r25000: Fix some more C++ compatibility warnings.
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_includes.h"
35
36 #include "ldb_tdb.h"
37
38 /*
39   find an element in a list, using the given comparison function and
40   assuming that the list is already sorted using comp_fn
41
42   return -1 if not found, or the index of the first occurance of needle if found
43 */
44 static int ldb_list_find(const void *needle, 
45                          const void *base, size_t nmemb, size_t size, 
46                          comparison_fn_t comp_fn)
47 {
48         const char *base_p = (const char *)base;
49         size_t min_i, max_i, test_i;
50
51         if (nmemb == 0) {
52                 return -1;
53         }
54
55         min_i = 0;
56         max_i = nmemb-1;
57
58         while (min_i < max_i) {
59                 int r;
60
61                 test_i = (min_i + max_i) / 2;
62                 /* the following cast looks strange, but is
63                  correct. The key to understanding it is that base_p
64                  is a pointer to an array of pointers, so we have to
65                  dereference it after casting to void **. The strange
66                  const in the middle gives us the right type of pointer
67                  after the dereference  (tridge) */
68                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
69                 if (r == 0) {
70                         /* scan back for first element */
71                         while (test_i > 0 &&
72                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
73                                 test_i--;
74                         }
75                         return test_i;
76                 }
77                 if (r < 0) {
78                         if (test_i == 0) {
79                                 return -1;
80                         }
81                         max_i = test_i - 1;
82                 }
83                 if (r > 0) {
84                         min_i = test_i + 1;
85                 }
86         }
87
88         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
89                 return min_i;
90         }
91
92         return -1;
93 }
94
95 struct dn_list {
96         unsigned int count;
97         char **dn;
98 };
99
100 /*
101   return the dn key to be used for an index
102   caller frees
103 */
104 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
105                                      const char *attr, const struct ldb_val *value)
106 {
107         struct ldb_dn *ret;
108         struct ldb_val v;
109         const struct ldb_schema_attribute *a;
110         char *attr_folded;
111         int r;
112
113         attr_folded = ldb_attr_casefold(ldb, attr);
114         if (!attr_folded) {
115                 return NULL;
116         }
117
118         a = ldb_schema_attribute_by_name(ldb, attr);
119         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
120         if (r != LDB_SUCCESS) {
121                 const char *errstr = ldb_errstring(ldb);
122                 /* canonicalisation can be refused. For example, 
123                    a attribute that takes wildcards will refuse to canonicalise
124                    if the value contains a wildcard */
125                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
126                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
127                 talloc_free(attr_folded);
128                 return NULL;
129         }
130         if (ldb_should_b64_encode(&v)) {
131                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
132                 if (!vstr) return NULL;
133                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
134                 talloc_free(vstr);
135         } else {
136                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
137         }
138
139         if (v.data != value->data) {
140                 talloc_free(v.data);
141         }
142         talloc_free(attr_folded);
143
144         return ret;
145 }
146
147 /*
148   see if a attribute value is in the list of indexed attributes
149 */
150 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
151                             unsigned int *v_idx, const char *key)
152 {
153         unsigned int i, j;
154         for (i=0;i<msg->num_elements;i++) {
155                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
156                         const struct ldb_message_element *el = &msg->elements[i];
157
158                         if (attr == NULL) {
159                                 /* in this case we are just looking to see if key is present,
160                                    we are not spearching for a specific index */
161                                 return 0;
162                         }
163
164                         for (j=0;j<el->num_values;j++) {
165                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
166                                         if (v_idx) {
167                                                 *v_idx = j;
168                                         }
169                                         return i;
170                                 }
171                         }
172                 }
173         }
174         return -1;
175 }
176
177 /* used in sorting dn lists */
178 static int list_cmp(const char **s1, const char **s2)
179 {
180         return strcmp(*s1, *s2);
181 }
182
183 /*
184   return a list of dn's that might match a simple indexed search or
185  */
186 static int ltdb_index_dn_simple(struct ldb_module *module, 
187                                 const struct ldb_parse_tree *tree,
188                                 const struct ldb_message *index_list,
189                                 struct dn_list *list)
190 {
191         struct ldb_context *ldb = module->ldb;
192         struct ldb_dn *dn;
193         int ret;
194         unsigned int i, j;
195         struct ldb_message *msg;
196
197         list->count = 0;
198         list->dn = NULL;
199
200         /* if the attribute isn't in the list of indexed attributes then
201            this node needs a full search */
202         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
203                 return LDB_ERR_OPERATIONS_ERROR;
204         }
205
206         /* the attribute is indexed. Pull the list of DNs that match the 
207            search criterion */
208         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
209         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
210
211         msg = talloc(list, struct ldb_message);
212         if (msg == NULL) {
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ltdb_search_dn1(module, dn, msg);
217         talloc_free(dn);
218         if (ret != LDB_SUCCESS) {
219                 return ret;
220         }
221
222         for (i=0;i<msg->num_elements;i++) {
223                 struct ldb_message_element *el;
224
225                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
226                         continue;
227                 }
228
229                 el = &msg->elements[i];
230
231                 list->dn = talloc_array(list, char *, el->num_values);
232                 if (!list->dn) {
233                         talloc_free(msg);
234                         return LDB_ERR_OPERATIONS_ERROR;
235                 }
236
237                 for (j=0;j<el->num_values;j++) {
238                         list->dn[list->count] = 
239                                 talloc_strdup(list->dn, (char *)el->values[j].data);
240                         if (!list->dn[list->count]) {
241                                 talloc_free(msg);
242                                 return LDB_ERR_OPERATIONS_ERROR;
243                         }
244                         list->count++;
245                 }
246         }
247
248         talloc_free(msg);
249
250         if (list->count > 1) {
251                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
252         }
253
254         return LDB_SUCCESS;
255 }
256
257
258 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
259
260 /*
261   return a list of dn's that might match a simple indexed search on
262   the special objectclass attribute
263  */
264 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
265                                      const struct ldb_parse_tree *tree,
266                                      const struct ldb_message *index_list,
267                                      struct dn_list *list)
268 {
269         struct ldb_context *ldb = module->ldb;
270         unsigned int i;
271         int ret;
272         const char *target = (const char *)tree->u.equality.value.data;
273         const char **subclasses;
274
275         list->count = 0;
276         list->dn = NULL;
277
278         ret = ltdb_index_dn_simple(module, tree, index_list, list);
279
280         subclasses = ldb_subclass_list(module->ldb, target);
281
282         if (subclasses == NULL) {
283                 return ret;
284         }
285
286         for (i=0;subclasses[i];i++) {
287                 struct ldb_parse_tree tree2;
288                 struct dn_list *list2;
289                 tree2.operation = LDB_OP_EQUALITY;
290                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
291                 if (!tree2.u.equality.attr) {
292                         return LDB_ERR_OPERATIONS_ERROR;
293                 }
294                 tree2.u.equality.value.data = 
295                         (uint8_t *)talloc_strdup(list, subclasses[i]);
296                 if (tree2.u.equality.value.data == NULL) {
297                         return LDB_ERR_OPERATIONS_ERROR;                        
298                 }
299                 tree2.u.equality.value.length = strlen(subclasses[i]);
300                 list2 = talloc(list, struct dn_list);
301                 if (list2 == NULL) {
302                         talloc_free(tree2.u.equality.value.data);
303                         return LDB_ERR_OPERATIONS_ERROR;
304                 }
305                 if (ltdb_index_dn_objectclass(module, &tree2, 
306                                               index_list, list2) == LDB_SUCCESS) {
307                         if (list->count == 0) {
308                                 *list = *list2;
309                                 ret = LDB_SUCCESS;
310                         } else {
311                                 list_union(ldb, list, list2);
312                                 talloc_free(list2);
313                         }
314                 }
315                 talloc_free(tree2.u.equality.value.data);
316         }
317
318         return ret;
319 }
320
321 /*
322   return a list of dn's that might match a leaf indexed search
323  */
324 static int ltdb_index_dn_leaf(struct ldb_module *module, 
325                               const struct ldb_parse_tree *tree,
326                               const struct ldb_message *index_list,
327                               struct dn_list *list)
328 {
329         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
330                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
331         }
332         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
333                 list->dn = talloc_array(list, char *, 1);
334                 if (list->dn == NULL) {
335                         ldb_oom(module->ldb);
336                         return LDB_ERR_OPERATIONS_ERROR;
337                 }
338                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
339                 if (list->dn[0] == NULL) {
340                         ldb_oom(module->ldb);
341                         return LDB_ERR_OPERATIONS_ERROR;
342                 }
343                 list->count = 1;
344                 return LDB_SUCCESS;
345         }
346         return ltdb_index_dn_simple(module, tree, index_list, list);
347 }
348
349
350 /*
351   list intersection
352   list = list & list2
353   relies on the lists being sorted
354 */
355 static int list_intersect(struct ldb_context *ldb,
356                           struct dn_list *list, const struct dn_list *list2)
357 {
358         struct dn_list *list3;
359         unsigned int i;
360
361         if (list->count == 0 || list2->count == 0) {
362                 /* 0 & X == 0 */
363                 return LDB_ERR_NO_SUCH_OBJECT;
364         }
365
366         list3 = talloc(ldb, struct dn_list);
367         if (list3 == NULL) {
368                 return LDB_ERR_OPERATIONS_ERROR;
369         }
370
371         list3->dn = talloc_array(list3, char *, list->count);
372         if (!list3->dn) {
373                 talloc_free(list3);
374                 return LDB_ERR_OPERATIONS_ERROR;
375         }
376         list3->count = 0;
377
378         for (i=0;i<list->count;i++) {
379                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
380                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
381                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
382                         list3->count++;
383                 } else {
384                         talloc_free(list->dn[i]);
385                 }               
386         }
387
388         talloc_free(list->dn);
389         list->dn = talloc_move(list, &list3->dn);
390         list->count = list3->count;
391         talloc_free(list3);
392
393         return LDB_ERR_NO_SUCH_OBJECT;
394 }
395
396
397 /*
398   list union
399   list = list | list2
400   relies on the lists being sorted
401 */
402 static int list_union(struct ldb_context *ldb, 
403                       struct dn_list *list, const struct dn_list *list2)
404 {
405         unsigned int i;
406         char **d;
407         unsigned int count = list->count;
408
409         if (list->count == 0 && list2->count == 0) {
410                 /* 0 | 0 == 0 */
411                 return LDB_ERR_NO_SUCH_OBJECT;
412         }
413
414         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
415         if (!d) {
416                 return LDB_ERR_OPERATIONS_ERROR;
417         }
418         list->dn = d;
419
420         for (i=0;i<list2->count;i++) {
421                 if (ldb_list_find(list2->dn[i], list->dn, count, 
422                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
423                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
424                         if (!list->dn[list->count]) {
425                                 return LDB_ERR_OPERATIONS_ERROR;
426                         }
427                         list->count++;
428                 }               
429         }
430
431         if (list->count != count) {
432                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
433         }
434
435         return LDB_ERR_NO_SUCH_OBJECT;
436 }
437
438 static int ltdb_index_dn(struct ldb_module *module, 
439                          const struct ldb_parse_tree *tree,
440                          const struct ldb_message *index_list,
441                          struct dn_list *list);
442
443
444 /*
445   OR two index results
446  */
447 static int ltdb_index_dn_or(struct ldb_module *module, 
448                             const struct ldb_parse_tree *tree,
449                             const struct ldb_message *index_list,
450                             struct dn_list *list)
451 {
452         struct ldb_context *ldb = module->ldb;
453         unsigned int i;
454         int ret;
455         
456         ret = LDB_ERR_OPERATIONS_ERROR;
457         list->dn = NULL;
458         list->count = 0;
459
460         for (i=0;i<tree->u.list.num_elements;i++) {
461                 struct dn_list *list2;
462                 int v;
463
464                 list2 = talloc(module, struct dn_list);
465                 if (list2 == NULL) {
466                         return LDB_ERR_OPERATIONS_ERROR;
467                 }
468
469                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
470
471                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
472                         /* 0 || X == X */
473                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
474                                 ret = v;
475                         }
476                         talloc_free(list2);
477                         continue;
478                 }
479
480                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
481                         /* 1 || X == 1 */
482                         talloc_free(list->dn);
483                         talloc_free(list2);
484                         return v;
485                 }
486
487                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
488                         ret = LDB_SUCCESS;
489                         list->dn = talloc_move(list, &list2->dn);
490                         list->count = list2->count;
491                 } else {
492                         if (list_union(ldb, list, list2) == -1) {
493                                 talloc_free(list2);
494                                 return LDB_ERR_OPERATIONS_ERROR;
495                         }
496                         ret = LDB_SUCCESS;
497                 }
498                 talloc_free(list2);
499         }
500
501         if (list->count == 0) {
502                 return LDB_ERR_NO_SUCH_OBJECT;
503         }
504
505         return ret;
506 }
507
508
509 /*
510   NOT an index results
511  */
512 static int ltdb_index_dn_not(struct ldb_module *module, 
513                              const struct ldb_parse_tree *tree,
514                              const struct ldb_message *index_list,
515                              struct dn_list *list)
516 {
517         /* the only way to do an indexed not would be if we could
518            negate the not via another not or if we knew the total
519            number of database elements so we could know that the
520            existing expression covered the whole database. 
521            
522            instead, we just give up, and rely on a full index scan
523            (unless an outer & manages to reduce the list)
524         */
525         return LDB_ERR_OPERATIONS_ERROR;
526 }
527
528 /*
529   AND two index results
530  */
531 static int ltdb_index_dn_and(struct ldb_module *module, 
532                              const struct ldb_parse_tree *tree,
533                              const struct ldb_message *index_list,
534                              struct dn_list *list)
535 {
536         struct ldb_context *ldb = module->ldb;
537         unsigned int i;
538         int ret;
539         
540         ret = LDB_ERR_OPERATIONS_ERROR;
541         list->dn = NULL;
542         list->count = 0;
543
544         for (i=0;i<tree->u.list.num_elements;i++) {
545                 struct dn_list *list2;
546                 int v;
547
548                 list2 = talloc(module, struct dn_list);
549                 if (list2 == NULL) {
550                         return LDB_ERR_OPERATIONS_ERROR;
551                 }
552
553                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
554
555                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
556                         /* 0 && X == 0 */
557                         talloc_free(list->dn);
558                         talloc_free(list2);
559                         return LDB_ERR_NO_SUCH_OBJECT;
560                 }
561
562                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
563                         talloc_free(list2);
564                         continue;
565                 }
566
567                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
568                         ret = LDB_SUCCESS;
569                         talloc_free(list->dn);
570                         list->dn = talloc_move(list, &list2->dn);
571                         list->count = list2->count;
572                 } else {
573                         if (list_intersect(ldb, list, list2) == -1) {
574                                 talloc_free(list2);
575                                 return LDB_ERR_OPERATIONS_ERROR;
576                         }
577                 }
578
579                 talloc_free(list2);
580
581                 if (list->count == 0) {
582                         talloc_free(list->dn);
583                         return LDB_ERR_NO_SUCH_OBJECT;
584                 }
585         }
586
587         return ret;
588 }
589
590 /*
591   AND index results and ONE level special index
592  */
593 static int ltdb_index_dn_one(struct ldb_module *module, 
594                              struct ldb_dn *parent_dn,
595                              struct dn_list *list)
596 {
597         struct ldb_context *ldb = module->ldb;
598         struct dn_list *list2;
599         struct ldb_message *msg;
600         struct ldb_dn *key;
601         struct ldb_val val;
602         unsigned int i, j;
603         int ret;
604         
605         list2 = talloc_zero(module, struct dn_list);
606         if (list2 == NULL) {
607                 return LDB_ERR_OPERATIONS_ERROR;
608         }
609
610         /* the attribute is indexed. Pull the list of DNs that match the 
611            search criterion */
612         val.data = (uint8_t *)((intptr_t)ldb_dn_get_casefold(parent_dn));
613         val.length = strlen((char *)val.data);
614         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
615         if (!key) {
616                 talloc_free(list2);
617                 return LDB_ERR_OPERATIONS_ERROR;
618         }
619
620         msg = talloc(list2, struct ldb_message);
621         if (msg == NULL) {
622                 talloc_free(list2);
623                 return LDB_ERR_OPERATIONS_ERROR;
624         }
625
626         ret = ltdb_search_dn1(module, key, msg);
627         talloc_free(key);
628         if (ret != LDB_SUCCESS) {
629                 return ret;
630         }
631
632         for (i = 0; i < msg->num_elements; i++) {
633                 struct ldb_message_element *el;
634
635                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
636                         continue;
637                 }
638
639                 el = &msg->elements[i];
640
641                 list2->dn = talloc_array(list2, char *, el->num_values);
642                 if (!list2->dn) {
643                         talloc_free(list2);
644                         return LDB_ERR_OPERATIONS_ERROR;
645                 }
646
647                 for (j = 0; j < el->num_values; j++) {
648                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
649                         if (!list2->dn[list2->count]) {
650                                 talloc_free(list2);
651                                 return LDB_ERR_OPERATIONS_ERROR;
652                         }
653                         list2->count++;
654                 }
655         }
656
657         if (list2->count == 0) {
658                 talloc_free(list2);
659                 return LDB_ERR_NO_SUCH_OBJECT;
660         }
661
662         if (list2->count > 1) {
663                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
664         }
665
666         if (list->count > 0) {
667                 if (list_intersect(ldb, list, list2) == -1) {
668                         talloc_free(list2);
669                         return LDB_ERR_OPERATIONS_ERROR;
670                 }
671
672                 if (list->count == 0) {
673                         talloc_free(list->dn);
674                         talloc_free(list2);
675                         return LDB_ERR_NO_SUCH_OBJECT;
676                 }
677         } else {
678                 list->dn = talloc_move(list, &list2->dn);
679                 list->count = list2->count;
680         }
681
682         talloc_free(list2);
683
684         return LDB_SUCCESS;
685 }
686
687 /*
688   return a list of dn's that might match a indexed search or
689   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
690  */
691 static int ltdb_index_dn(struct ldb_module *module, 
692                          const struct ldb_parse_tree *tree,
693                          const struct ldb_message *index_list,
694                          struct dn_list *list)
695 {
696         int ret = LDB_ERR_OPERATIONS_ERROR;
697
698         switch (tree->operation) {
699         case LDB_OP_AND:
700                 ret = ltdb_index_dn_and(module, tree, index_list, list);
701                 break;
702
703         case LDB_OP_OR:
704                 ret = ltdb_index_dn_or(module, tree, index_list, list);
705                 break;
706
707         case LDB_OP_NOT:
708                 ret = ltdb_index_dn_not(module, tree, index_list, list);
709                 break;
710
711         case LDB_OP_EQUALITY:
712                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
713                 break;
714
715         case LDB_OP_SUBSTRING:
716         case LDB_OP_GREATER:
717         case LDB_OP_LESS:
718         case LDB_OP_PRESENT:
719         case LDB_OP_APPROX:
720         case LDB_OP_EXTENDED:
721                 /* we can't index with fancy bitops yet */
722                 ret = LDB_ERR_OPERATIONS_ERROR;
723                 break;
724         }
725
726         return ret;
727 }
728
729 /*
730   filter a candidate dn_list from an indexed search into a set of results
731   extracting just the given attributes
732 */
733 static int ltdb_index_filter(const struct dn_list *dn_list, 
734                              struct ldb_handle *handle)
735 {
736         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
737         struct ldb_reply *ares = NULL;
738         unsigned int i;
739
740         for (i = 0; i < dn_list->count; i++) {
741                 struct ldb_dn *dn;
742                 int ret;
743
744                 ares = talloc_zero(ac, struct ldb_reply);
745                 if (!ares) {
746                         handle->status = LDB_ERR_OPERATIONS_ERROR;
747                         handle->state = LDB_ASYNC_DONE;
748                         return LDB_ERR_OPERATIONS_ERROR;
749                 }
750
751                 ares->message = ldb_msg_new(ares);
752                 if (!ares->message) {
753                         handle->status = LDB_ERR_OPERATIONS_ERROR;
754                         handle->state = LDB_ASYNC_DONE;
755                         talloc_free(ares);
756                         return LDB_ERR_OPERATIONS_ERROR;
757                 }
758
759
760                 dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
761                 if (dn == NULL) {
762                         talloc_free(ares);
763                         return LDB_ERR_OPERATIONS_ERROR;
764                 }
765
766                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
767                 talloc_free(dn);
768                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
769                         /* the record has disappeared? yes, this can happen */
770                         talloc_free(ares);
771                         continue;
772                 }
773
774                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
775                         /* an internal error */
776                         talloc_free(ares);
777                         return LDB_ERR_OPERATIONS_ERROR;
778                 }
779
780                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
781                         talloc_free(ares);
782                         continue;
783                 }
784
785                 /* filter the attributes that the user wants */
786                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
787
788                 if (ret == -1) {
789                         handle->status = LDB_ERR_OPERATIONS_ERROR;
790                         handle->state = LDB_ASYNC_DONE;
791                         talloc_free(ares);
792                         return LDB_ERR_OPERATIONS_ERROR;
793                 }
794
795                 ares->type = LDB_REPLY_ENTRY;
796                 handle->state = LDB_ASYNC_PENDING;
797                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
798
799                 if (handle->status != LDB_SUCCESS) {
800                         handle->state = LDB_ASYNC_DONE;
801                         return handle->status;
802                 }
803         }
804
805         return LDB_SUCCESS;
806 }
807
808 /*
809   search the database with a LDAP-like expression using indexes
810   returns -1 if an indexed search is not possible, in which
811   case the caller should call ltdb_search_full() 
812 */
813 int ltdb_search_indexed(struct ldb_handle *handle)
814 {
815         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
816         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
817         struct dn_list *dn_list;
818         int ret, idxattr, idxone;
819
820         idxattr = idxone = 0;
821         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
822         if (ret == 0 ) {
823                 idxattr = 1;
824         }
825
826         /* We do one level indexing only if requested */
827         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
828         if (ret == 0 ) {
829                 idxone = 1;
830         }
831
832         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
833             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
834                 /* no indexs? must do full search */
835                 return LDB_ERR_OPERATIONS_ERROR;
836         }
837
838         ret = LDB_ERR_OPERATIONS_ERROR;
839
840         dn_list = talloc_zero(handle, struct dn_list);
841         if (dn_list == NULL) {
842                 return LDB_ERR_OPERATIONS_ERROR;
843         }
844
845         if (ac->scope == LDB_SCOPE_BASE) {
846                 /* with BASE searches only one DN can match */
847                 dn_list->dn = talloc_array(dn_list, char *, 1);
848                 if (dn_list->dn == NULL) {
849                         ldb_oom(ac->module->ldb);
850                         return LDB_ERR_OPERATIONS_ERROR;
851                 }
852                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
853                 if (dn_list->dn[0] == NULL) {
854                         ldb_oom(ac->module->ldb);
855                         return LDB_ERR_OPERATIONS_ERROR;
856                 }
857                 dn_list->count = 1;
858                 ret = LDB_SUCCESS;
859         }
860
861         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
862                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
863
864                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
865                         talloc_free(dn_list);
866                         return ret;
867                 }
868         }
869
870         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
871                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
872         }
873
874         if (ret == LDB_SUCCESS) {
875                 /* we've got a candidate list - now filter by the full tree
876                    and extract the needed attributes */
877                 ret = ltdb_index_filter(dn_list, handle);
878                 handle->status = ret;
879                 handle->state = LDB_ASYNC_DONE;
880         }
881
882         talloc_free(dn_list);
883
884         return ret;
885 }
886
887 /*
888   add a index element where this is the first indexed DN for this value
889 */
890 static int ltdb_index_add1_new(struct ldb_context *ldb, 
891                                struct ldb_message *msg,
892                                const char *dn)
893 {
894         struct ldb_message_element *el;
895
896         /* add another entry */
897         el = talloc_realloc(msg, msg->elements, 
898                                struct ldb_message_element, msg->num_elements+1);
899         if (!el) {
900                 return LDB_ERR_OPERATIONS_ERROR;
901         }
902
903         msg->elements = el;
904         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
905         if (!msg->elements[msg->num_elements].name) {
906                 return LDB_ERR_OPERATIONS_ERROR;
907         }
908         msg->elements[msg->num_elements].num_values = 0;
909         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
910         if (!msg->elements[msg->num_elements].values) {
911                 return LDB_ERR_OPERATIONS_ERROR;
912         }
913         msg->elements[msg->num_elements].values[0].length = strlen(dn);
914         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
915         msg->elements[msg->num_elements].num_values = 1;
916         msg->num_elements++;
917
918         return LDB_SUCCESS;
919 }
920
921
922 /*
923   add a index element where this is not the first indexed DN for this
924   value
925 */
926 static int ltdb_index_add1_add(struct ldb_context *ldb, 
927                                struct ldb_message *msg,
928                                int idx,
929                                const char *dn)
930 {
931         struct ldb_val *v2;
932         unsigned int i;
933
934         /* for multi-valued attributes we can end up with repeats */
935         for (i=0;i<msg->elements[idx].num_values;i++) {
936                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
937                         return LDB_SUCCESS;
938                 }
939         }
940
941         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
942                               struct ldb_val, 
943                               msg->elements[idx].num_values+1);
944         if (!v2) {
945                 return LDB_ERR_OPERATIONS_ERROR;
946         }
947         msg->elements[idx].values = v2;
948
949         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
950         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
951         msg->elements[idx].num_values++;
952
953         return 0;
954 }
955
956 /*
957   add an index entry for one message element
958 */
959 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
960                            struct ldb_message_element *el, int v_idx)
961 {
962         struct ldb_context *ldb = module->ldb;
963         struct ldb_message *msg;
964         struct ldb_dn *dn_key;
965         int ret;
966         unsigned int i;
967
968         msg = talloc(module, struct ldb_message);
969         if (msg == NULL) {
970                 errno = ENOMEM;
971                 return LDB_ERR_OPERATIONS_ERROR;
972         }
973
974         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
975         if (!dn_key) {
976                 talloc_free(msg);
977                 return LDB_ERR_OPERATIONS_ERROR;
978         }
979         talloc_steal(msg, dn_key);
980
981         ret = ltdb_search_dn1(module, dn_key, msg);
982         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
983                 talloc_free(msg);
984                 return ret;
985         }
986
987         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
988                 msg->dn = dn_key;
989                 msg->num_elements = 0;
990                 msg->elements = NULL;
991         }
992
993         for (i=0;i<msg->num_elements;i++) {
994                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
995                         break;
996                 }
997         }
998
999         if (i == msg->num_elements) {
1000                 ret = ltdb_index_add1_new(ldb, msg, dn);
1001         } else {
1002                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
1003         }
1004
1005         if (ret == LDB_SUCCESS) {
1006                 ret = ltdb_store(module, msg, TDB_REPLACE);
1007         }
1008
1009         talloc_free(msg);
1010
1011         return ret;
1012 }
1013
1014 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1015                            struct ldb_message_element *elements, int num_el)
1016 {
1017         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1018         int ret;
1019         unsigned int i, j;
1020
1021         if (dn[0] == '@') {
1022                 return LDB_SUCCESS;
1023         }
1024
1025         if (ltdb->cache->indexlist->num_elements == 0) {
1026                 /* no indexed fields */
1027                 return LDB_SUCCESS;
1028         }
1029
1030         for (i = 0; i < num_el; i++) {
1031                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
1032                                        NULL, LTDB_IDXATTR);
1033                 if (ret == -1) {
1034                         continue;
1035                 }
1036                 for (j = 0; j < elements[i].num_values; j++) {
1037                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1038                         if (ret != LDB_SUCCESS) {
1039                                 return ret;
1040                         }
1041                 }
1042         }
1043
1044         return LDB_SUCCESS;
1045 }
1046
1047 /*
1048   add the index entries for a new record
1049 */
1050 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1051 {
1052         const char *dn;
1053         int ret;
1054
1055         dn = ldb_dn_get_linearized(msg->dn);
1056         if (dn == NULL) {
1057                 return LDB_ERR_OPERATIONS_ERROR;
1058         }
1059
1060         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1061
1062         return ret;
1063 }
1064
1065
1066 /*
1067   delete an index entry for one message element
1068 */
1069 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
1070                          struct ldb_message_element *el, int v_idx)
1071 {
1072         struct ldb_context *ldb = module->ldb;
1073         struct ldb_message *msg;
1074         struct ldb_dn *dn_key;
1075         int ret, i;
1076         unsigned int j;
1077
1078         if (dn[0] == '@') {
1079                 return LDB_SUCCESS;
1080         }
1081
1082         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1083         if (!dn_key) {
1084                 return LDB_ERR_OPERATIONS_ERROR;
1085         }
1086
1087         msg = talloc(dn_key, struct ldb_message);
1088         if (msg == NULL) {
1089                 talloc_free(dn_key);
1090                 return LDB_ERR_OPERATIONS_ERROR;
1091         }
1092
1093         ret = ltdb_search_dn1(module, dn_key, msg);
1094         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1095                 talloc_free(dn_key);
1096                 return ret;
1097         }
1098
1099         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1100                 /* it wasn't indexed. Did we have an earlier error? If we did then
1101                    its gone now */
1102                 talloc_free(dn_key);
1103                 return LDB_SUCCESS;
1104         }
1105
1106         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1107         if (i == -1) {
1108                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1109                                 "ERROR: dn %s not found in %s\n", dn,
1110                                 ldb_dn_get_linearized(dn_key));
1111                 /* it ain't there. hmmm */
1112                 talloc_free(dn_key);
1113                 return LDB_SUCCESS;
1114         }
1115
1116         if (j != msg->elements[i].num_values - 1) {
1117                 memmove(&msg->elements[i].values[j], 
1118                         &msg->elements[i].values[j+1], 
1119                         (msg->elements[i].num_values-(j+1)) * 
1120                         sizeof(msg->elements[i].values[0]));
1121         }
1122         msg->elements[i].num_values--;
1123
1124         if (msg->elements[i].num_values == 0) {
1125                 ret = ltdb_delete_noindex(module, dn_key);
1126         } else {
1127                 ret = ltdb_store(module, msg, TDB_REPLACE);
1128         }
1129
1130         talloc_free(dn_key);
1131
1132         return ret;
1133 }
1134
1135 /*
1136   delete the index entries for a record
1137   return -1 on failure
1138 */
1139 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1140 {
1141         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1142         int ret;
1143         const char *dn;
1144         unsigned int i, j;
1145
1146         /* find the list of indexed fields */   
1147         if (ltdb->cache->indexlist->num_elements == 0) {
1148                 /* no indexed fields */
1149                 return LDB_SUCCESS;
1150         }
1151
1152         if (ldb_dn_is_special(msg->dn)) {
1153                 return LDB_SUCCESS;
1154         }
1155
1156         dn = ldb_dn_get_linearized(msg->dn);
1157         if (dn == NULL) {
1158                 return LDB_ERR_OPERATIONS_ERROR;
1159         }
1160
1161         for (i = 0; i < msg->num_elements; i++) {
1162                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1163                                        NULL, LTDB_IDXATTR);
1164                 if (ret == -1) {
1165                         continue;
1166                 }
1167                 for (j = 0; j < msg->elements[i].num_values; j++) {
1168                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1169                         if (ret != LDB_SUCCESS) {
1170                                 return ret;
1171                         }
1172                 }
1173         }
1174
1175         return LDB_SUCCESS;
1176 }
1177
1178 /* 
1179   handle special index for one level searches
1180 */
1181 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1182 {
1183         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1184         struct ldb_message_element el;
1185         struct ldb_val val;
1186         struct ldb_dn *pdn;
1187         const char *dn;
1188         int ret;
1189
1190         /* We index for ONE Level only if requested */
1191         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1192         if (ret != 0) {
1193                 return LDB_SUCCESS;
1194         }
1195
1196         pdn = ldb_dn_get_parent(module, msg->dn);
1197         if (pdn == NULL) {
1198                 return LDB_ERR_OPERATIONS_ERROR;
1199         }
1200
1201         dn = ldb_dn_get_linearized(msg->dn);
1202         if (dn == NULL) {
1203                 talloc_free(pdn);
1204                 return LDB_ERR_OPERATIONS_ERROR;
1205         }
1206
1207         val.data = (uint8_t *)((intptr_t)ldb_dn_get_casefold(pdn));
1208         if (val.data == NULL) {
1209                 talloc_free(pdn);
1210                 return LDB_ERR_OPERATIONS_ERROR;
1211         }
1212
1213         val.length = strlen((char *)val.data);
1214         el.name = LTDB_IDXONE;
1215         el.values = &val;
1216         el.num_values = 1;
1217
1218         if (add) {
1219                 ret = ltdb_index_add1(module, dn, &el, 0);
1220         } else { /* delete */
1221                 ret = ltdb_index_del_value(module, dn, &el, 0);
1222         }
1223
1224         talloc_free(pdn);
1225
1226         return ret;
1227 }
1228
1229
1230 /*
1231   traversal function that deletes all @INDEX records
1232 */
1233 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1234 {
1235         const char *dn = "DN=" LTDB_INDEX ":";
1236         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1237                 return tdb_delete(tdb, key);
1238         }
1239         return 0;
1240 }
1241
1242 /*
1243   traversal function that adds @INDEX records during a re index
1244 */
1245 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1246 {
1247         struct ldb_module *module = (struct ldb_module *)state;
1248         struct ldb_message *msg;
1249         const char *dn = NULL;
1250         int ret;
1251         TDB_DATA key2;
1252
1253         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1254             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1255                 return 0;
1256         }
1257
1258         msg = talloc(module, struct ldb_message);
1259         if (msg == NULL) {
1260                 return -1;
1261         }
1262
1263         ret = ltdb_unpack_data(module, &data, msg);
1264         if (ret != 0) {
1265                 talloc_free(msg);
1266                 return -1;
1267         }
1268
1269         /* check if the DN key has changed, perhaps due to the 
1270            case insensitivity of an element changing */
1271         key2 = ltdb_key(module, msg->dn);
1272         if (key2.dptr == NULL) {
1273                 /* probably a corrupt record ... darn */
1274                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1275                                                         ldb_dn_get_linearized(msg->dn));
1276                 talloc_free(msg);
1277                 return 0;
1278         }
1279         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1280                 tdb_delete(tdb, key);
1281                 tdb_store(tdb, key2, data, 0);
1282         }
1283         talloc_free(key2.dptr);
1284
1285         if (msg->dn == NULL) {
1286                 dn = (char *)key.dptr + 3;
1287         } else {
1288                 dn = ldb_dn_get_linearized(msg->dn);
1289         }
1290
1291         ret = ltdb_index_one(module, msg, 1);
1292         if (ret == LDB_SUCCESS) {
1293                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1294         } else {
1295                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1296                         "Adding special ONE LEVEL index failed (%s)!\n",
1297                         ldb_dn_get_linearized(msg->dn));
1298         }
1299
1300         talloc_free(msg);
1301
1302         if (ret != LDB_SUCCESS) return -1;
1303
1304         return 0;
1305 }
1306
1307 /*
1308   force a complete reindex of the database
1309 */
1310 int ltdb_reindex(struct ldb_module *module)
1311 {
1312         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1313         int ret;
1314
1315         if (ltdb_cache_reload(module) != 0) {
1316                 return LDB_ERR_OPERATIONS_ERROR;
1317         }
1318
1319         /* first traverse the database deleting any @INDEX records */
1320         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1321         if (ret == -1) {
1322                 return LDB_ERR_OPERATIONS_ERROR;
1323         }
1324
1325         /* if we don't have indexes we have nothing todo */
1326         if (ltdb->cache->indexlist->num_elements == 0) {
1327                 return LDB_SUCCESS;
1328         }
1329
1330         /* now traverse adding any indexes for normal LDB records */
1331         ret = tdb_traverse(ltdb->tdb, re_index, module);
1332         if (ret == -1) {
1333                 return LDB_ERR_OPERATIONS_ERROR;
1334         }
1335
1336         return LDB_SUCCESS;
1337 }