r23795: more v2->v3 conversion
[kai/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "ldb_includes.h"
36
37 #include "ldb_tdb.h"
38
39 /*
40   find an element in a list, using the given comparison function and
41   assuming that the list is already sorted using comp_fn
42
43   return -1 if not found, or the index of the first occurance of needle if found
44 */
45 static int ldb_list_find(const void *needle, 
46                          const void *base, size_t nmemb, size_t size, 
47                          comparison_fn_t comp_fn)
48 {
49         const char *base_p = base;
50         size_t min_i, max_i, test_i;
51
52         if (nmemb == 0) {
53                 return -1;
54         }
55
56         min_i = 0;
57         max_i = nmemb-1;
58
59         while (min_i < max_i) {
60                 int r;
61
62                 test_i = (min_i + max_i) / 2;
63                 /* the following cast looks strange, but is
64                  correct. The key to understanding it is that base_p
65                  is a pointer to an array of pointers, so we have to
66                  dereference it after casting to void **. The strange
67                  const in the middle gives us the right type of pointer
68                  after the dereference  (tridge) */
69                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
70                 if (r == 0) {
71                         /* scan back for first element */
72                         while (test_i > 0 &&
73                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
74                                 test_i--;
75                         }
76                         return test_i;
77                 }
78                 if (r < 0) {
79                         if (test_i == 0) {
80                                 return -1;
81                         }
82                         max_i = test_i - 1;
83                 }
84                 if (r > 0) {
85                         min_i = test_i + 1;
86                 }
87         }
88
89         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
90                 return min_i;
91         }
92
93         return -1;
94 }
95
96 struct dn_list {
97         unsigned int count;
98         char **dn;
99 };
100
101 /*
102   return the dn key to be used for an index
103   caller frees
104 */
105 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
106                                      const char *attr, const struct ldb_val *value)
107 {
108         struct ldb_dn *ret;
109         struct ldb_val v;
110         const struct ldb_schema_attribute *a;
111         char *attr_folded;
112         int r;
113
114         attr_folded = ldb_attr_casefold(ldb, attr);
115         if (!attr_folded) {
116                 return NULL;
117         }
118
119         a = ldb_schema_attribute_by_name(ldb, attr);
120         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
121         if (r != LDB_SUCCESS) {
122                 const char *errstr = ldb_errstring(ldb);
123                 /* canonicalisation can be refused. For example, 
124                    a attribute that takes wildcards will refuse to canonicalise
125                    if the value contains a wildcard */
126                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
127                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
128                 talloc_free(attr_folded);
129                 return NULL;
130         }
131         if (ldb_should_b64_encode(&v)) {
132                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
133                 if (!vstr) return NULL;
134                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
135                 talloc_free(vstr);
136         } else {
137                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
138         }
139
140         if (v.data != value->data) {
141                 talloc_free(v.data);
142         }
143         talloc_free(attr_folded);
144
145         return ret;
146 }
147
148 /*
149   see if a attribute value is in the list of indexed attributes
150 */
151 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
152                             unsigned int *v_idx, const char *key)
153 {
154         unsigned int i, j;
155         for (i=0;i<msg->num_elements;i++) {
156                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
157                         const struct ldb_message_element *el = &msg->elements[i];
158
159                         if (attr == NULL) {
160                                 /* in this case we are just looking to see if key is present,
161                                    we are not spearching for a specific index */
162                                 return 0;
163                         }
164
165                         for (j=0;j<el->num_values;j++) {
166                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
167                                         if (v_idx) {
168                                                 *v_idx = j;
169                                         }
170                                         return i;
171                                 }
172                         }
173                 }
174         }
175         return -1;
176 }
177
178 /* used in sorting dn lists */
179 static int list_cmp(const char **s1, const char **s2)
180 {
181         return strcmp(*s1, *s2);
182 }
183
184 /*
185   return a list of dn's that might match a simple indexed search or
186  */
187 static int ltdb_index_dn_simple(struct ldb_module *module, 
188                                 const struct ldb_parse_tree *tree,
189                                 const struct ldb_message *index_list,
190                                 struct dn_list *list)
191 {
192         struct ldb_context *ldb = module->ldb;
193         struct ldb_dn *dn;
194         int ret;
195         unsigned int i, j;
196         struct ldb_message *msg;
197
198         list->count = 0;
199         list->dn = NULL;
200
201         /* if the attribute isn't in the list of indexed attributes then
202            this node needs a full search */
203         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
204                 return LDB_ERR_OPERATIONS_ERROR;
205         }
206
207         /* the attribute is indexed. Pull the list of DNs that match the 
208            search criterion */
209         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
210         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
211
212         msg = talloc(list, struct ldb_message);
213         if (msg == NULL) {
214                 return LDB_ERR_OPERATIONS_ERROR;
215         }
216
217         ret = ltdb_search_dn1(module, dn, msg);
218         talloc_free(dn);
219         if (ret != LDB_SUCCESS) {
220                 return ret;
221         }
222
223         for (i=0;i<msg->num_elements;i++) {
224                 struct ldb_message_element *el;
225
226                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
227                         continue;
228                 }
229
230                 el = &msg->elements[i];
231
232                 list->dn = talloc_array(list, char *, el->num_values);
233                 if (!list->dn) {
234                         talloc_free(msg);
235                         return LDB_ERR_OPERATIONS_ERROR;
236                 }
237
238                 for (j=0;j<el->num_values;j++) {
239                         list->dn[list->count] = 
240                                 talloc_strdup(list->dn, (char *)el->values[j].data);
241                         if (!list->dn[list->count]) {
242                                 talloc_free(msg);
243                                 return LDB_ERR_OPERATIONS_ERROR;
244                         }
245                         list->count++;
246                 }
247         }
248
249         talloc_free(msg);
250
251         if (list->count > 1) {
252                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
253         }
254
255         return LDB_SUCCESS;
256 }
257
258
259 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
260
261 /*
262   return a list of dn's that might match a simple indexed search on
263   the special objectclass attribute
264  */
265 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
266                                      const struct ldb_parse_tree *tree,
267                                      const struct ldb_message *index_list,
268                                      struct dn_list *list)
269 {
270         struct ldb_context *ldb = module->ldb;
271         unsigned int i;
272         int ret;
273         const char *target = (const char *)tree->u.equality.value.data;
274         const char **subclasses;
275
276         list->count = 0;
277         list->dn = NULL;
278
279         ret = ltdb_index_dn_simple(module, tree, index_list, list);
280
281         subclasses = ldb_subclass_list(module->ldb, target);
282
283         if (subclasses == NULL) {
284                 return ret;
285         }
286
287         for (i=0;subclasses[i];i++) {
288                 struct ldb_parse_tree tree2;
289                 struct dn_list *list2;
290                 tree2.operation = LDB_OP_EQUALITY;
291                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
292                 if (!tree2.u.equality.attr) {
293                         return LDB_ERR_OPERATIONS_ERROR;
294                 }
295                 tree2.u.equality.value.data = 
296                         (uint8_t *)talloc_strdup(list, subclasses[i]);
297                 if (tree2.u.equality.value.data == NULL) {
298                         return LDB_ERR_OPERATIONS_ERROR;                        
299                 }
300                 tree2.u.equality.value.length = strlen(subclasses[i]);
301                 list2 = talloc(list, struct dn_list);
302                 if (list2 == NULL) {
303                         talloc_free(tree2.u.equality.value.data);
304                         return LDB_ERR_OPERATIONS_ERROR;
305                 }
306                 if (ltdb_index_dn_objectclass(module, &tree2, 
307                                               index_list, list2) == LDB_SUCCESS) {
308                         if (list->count == 0) {
309                                 *list = *list2;
310                                 ret = LDB_SUCCESS;
311                         } else {
312                                 list_union(ldb, list, list2);
313                                 talloc_free(list2);
314                         }
315                 }
316                 talloc_free(tree2.u.equality.value.data);
317         }
318
319         return ret;
320 }
321
322 /*
323   return a list of dn's that might match a leaf indexed search
324  */
325 static int ltdb_index_dn_leaf(struct ldb_module *module, 
326                               const struct ldb_parse_tree *tree,
327                               const struct ldb_message *index_list,
328                               struct dn_list *list)
329 {
330         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
331                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
332         }
333         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
334                 list->dn = talloc_array(list, char *, 1);
335                 if (list->dn == NULL) {
336                         ldb_oom(module->ldb);
337                         return LDB_ERR_OPERATIONS_ERROR;
338                 }
339                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
340                 if (list->dn[0] == NULL) {
341                         ldb_oom(module->ldb);
342                         return LDB_ERR_OPERATIONS_ERROR;
343                 }
344                 list->count = 1;
345                 return LDB_SUCCESS;
346         }
347         return ltdb_index_dn_simple(module, tree, index_list, list);
348 }
349
350
351 /*
352   list intersection
353   list = list & list2
354   relies on the lists being sorted
355 */
356 static int list_intersect(struct ldb_context *ldb,
357                           struct dn_list *list, const struct dn_list *list2)
358 {
359         struct dn_list *list3;
360         unsigned int i;
361
362         if (list->count == 0 || list2->count == 0) {
363                 /* 0 & X == 0 */
364                 return LDB_ERR_NO_SUCH_OBJECT;
365         }
366
367         list3 = talloc(ldb, struct dn_list);
368         if (list3 == NULL) {
369                 return LDB_ERR_OPERATIONS_ERROR;
370         }
371
372         list3->dn = talloc_array(list3, char *, list->count);
373         if (!list3->dn) {
374                 talloc_free(list3);
375                 return LDB_ERR_OPERATIONS_ERROR;
376         }
377         list3->count = 0;
378
379         for (i=0;i<list->count;i++) {
380                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
381                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
382                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
383                         list3->count++;
384                 } else {
385                         talloc_free(list->dn[i]);
386                 }               
387         }
388
389         talloc_free(list->dn);
390         list->dn = talloc_move(list, &list3->dn);
391         list->count = list3->count;
392         talloc_free(list3);
393
394         return LDB_ERR_NO_SUCH_OBJECT;
395 }
396
397
398 /*
399   list union
400   list = list | list2
401   relies on the lists being sorted
402 */
403 static int list_union(struct ldb_context *ldb, 
404                       struct dn_list *list, const struct dn_list *list2)
405 {
406         unsigned int i;
407         char **d;
408         unsigned int count = list->count;
409
410         if (list->count == 0 && list2->count == 0) {
411                 /* 0 | 0 == 0 */
412                 return LDB_ERR_NO_SUCH_OBJECT;
413         }
414
415         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
416         if (!d) {
417                 return LDB_ERR_OPERATIONS_ERROR;
418         }
419         list->dn = d;
420
421         for (i=0;i<list2->count;i++) {
422                 if (ldb_list_find(list2->dn[i], list->dn, count, 
423                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
424                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
425                         if (!list->dn[list->count]) {
426                                 return LDB_ERR_OPERATIONS_ERROR;
427                         }
428                         list->count++;
429                 }               
430         }
431
432         if (list->count != count) {
433                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
434         }
435
436         return LDB_ERR_NO_SUCH_OBJECT;
437 }
438
439 static int ltdb_index_dn(struct ldb_module *module, 
440                          const struct ldb_parse_tree *tree,
441                          const struct ldb_message *index_list,
442                          struct dn_list *list);
443
444
445 /*
446   OR two index results
447  */
448 static int ltdb_index_dn_or(struct ldb_module *module, 
449                             const struct ldb_parse_tree *tree,
450                             const struct ldb_message *index_list,
451                             struct dn_list *list)
452 {
453         struct ldb_context *ldb = module->ldb;
454         unsigned int i;
455         int ret;
456         
457         ret = LDB_ERR_OPERATIONS_ERROR;
458         list->dn = NULL;
459         list->count = 0;
460
461         for (i=0;i<tree->u.list.num_elements;i++) {
462                 struct dn_list *list2;
463                 int v;
464
465                 list2 = talloc(module, struct dn_list);
466                 if (list2 == NULL) {
467                         return LDB_ERR_OPERATIONS_ERROR;
468                 }
469
470                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
471
472                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
473                         /* 0 || X == X */
474                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
475                                 ret = v;
476                         }
477                         talloc_free(list2);
478                         continue;
479                 }
480
481                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
482                         /* 1 || X == 1 */
483                         talloc_free(list->dn);
484                         talloc_free(list2);
485                         return v;
486                 }
487
488                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
489                         ret = LDB_SUCCESS;
490                         list->dn = talloc_move(list, &list2->dn);
491                         list->count = list2->count;
492                 } else {
493                         if (list_union(ldb, list, list2) == -1) {
494                                 talloc_free(list2);
495                                 return LDB_ERR_OPERATIONS_ERROR;
496                         }
497                         ret = LDB_SUCCESS;
498                 }
499                 talloc_free(list2);
500         }
501
502         if (list->count == 0) {
503                 return LDB_ERR_NO_SUCH_OBJECT;
504         }
505
506         return ret;
507 }
508
509
510 /*
511   NOT an index results
512  */
513 static int ltdb_index_dn_not(struct ldb_module *module, 
514                              const struct ldb_parse_tree *tree,
515                              const struct ldb_message *index_list,
516                              struct dn_list *list)
517 {
518         /* the only way to do an indexed not would be if we could
519            negate the not via another not or if we knew the total
520            number of database elements so we could know that the
521            existing expression covered the whole database. 
522            
523            instead, we just give up, and rely on a full index scan
524            (unless an outer & manages to reduce the list)
525         */
526         return LDB_ERR_OPERATIONS_ERROR;
527 }
528
529 /*
530   AND two index results
531  */
532 static int ltdb_index_dn_and(struct ldb_module *module, 
533                              const struct ldb_parse_tree *tree,
534                              const struct ldb_message *index_list,
535                              struct dn_list *list)
536 {
537         struct ldb_context *ldb = module->ldb;
538         unsigned int i;
539         int ret;
540         
541         ret = LDB_ERR_OPERATIONS_ERROR;
542         list->dn = NULL;
543         list->count = 0;
544
545         for (i=0;i<tree->u.list.num_elements;i++) {
546                 struct dn_list *list2;
547                 int v;
548
549                 list2 = talloc(module, struct dn_list);
550                 if (list2 == NULL) {
551                         return LDB_ERR_OPERATIONS_ERROR;
552                 }
553
554                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
555
556                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
557                         /* 0 && X == 0 */
558                         talloc_free(list->dn);
559                         talloc_free(list2);
560                         return LDB_ERR_NO_SUCH_OBJECT;
561                 }
562
563                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
564                         talloc_free(list2);
565                         continue;
566                 }
567
568                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
569                         ret = LDB_SUCCESS;
570                         talloc_free(list->dn);
571                         list->dn = talloc_move(list, &list2->dn);
572                         list->count = list2->count;
573                 } else {
574                         if (list_intersect(ldb, list, list2) == -1) {
575                                 talloc_free(list2);
576                                 return LDB_ERR_OPERATIONS_ERROR;
577                         }
578                 }
579
580                 talloc_free(list2);
581
582                 if (list->count == 0) {
583                         talloc_free(list->dn);
584                         return LDB_ERR_NO_SUCH_OBJECT;
585                 }
586         }
587
588         return ret;
589 }
590
591 /*
592   AND index results and ONE level special index
593  */
594 static int ltdb_index_dn_one(struct ldb_module *module, 
595                              struct ldb_dn *parent_dn,
596                              struct dn_list *list)
597 {
598         struct ldb_context *ldb = module->ldb;
599         struct dn_list *list2;
600         struct ldb_message *msg;
601         struct ldb_dn *key;
602         struct ldb_val val;
603         unsigned int i, j;
604         int ret;
605         
606         list2 = talloc_zero(module, struct dn_list);
607         if (list2 == NULL) {
608                 return LDB_ERR_OPERATIONS_ERROR;
609         }
610
611         /* the attribute is indexed. Pull the list of DNs that match the 
612            search criterion */
613         val.data = (uint8_t *)((intptr_t)ldb_dn_get_casefold(parent_dn));
614         val.length = strlen((char *)val.data);
615         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
616         if (!key) {
617                 talloc_free(list2);
618                 return LDB_ERR_OPERATIONS_ERROR;
619         }
620
621         msg = talloc(list2, struct ldb_message);
622         if (msg == NULL) {
623                 talloc_free(list2);
624                 return LDB_ERR_OPERATIONS_ERROR;
625         }
626
627         ret = ltdb_search_dn1(module, key, msg);
628         talloc_free(key);
629         if (ret != LDB_SUCCESS) {
630                 return ret;
631         }
632
633         for (i = 0; i < msg->num_elements; i++) {
634                 struct ldb_message_element *el;
635
636                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
637                         continue;
638                 }
639
640                 el = &msg->elements[i];
641
642                 list2->dn = talloc_array(list2, char *, el->num_values);
643                 if (!list2->dn) {
644                         talloc_free(list2);
645                         return LDB_ERR_OPERATIONS_ERROR;
646                 }
647
648                 for (j = 0; j < el->num_values; j++) {
649                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
650                         if (!list2->dn[list2->count]) {
651                                 talloc_free(list2);
652                                 return LDB_ERR_OPERATIONS_ERROR;
653                         }
654                         list2->count++;
655                 }
656         }
657
658         if (list2->count == 0) {
659                 talloc_free(list2);
660                 return LDB_ERR_NO_SUCH_OBJECT;
661         }
662
663         if (list2->count > 1) {
664                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
665         }
666
667         if (list->count > 0) {
668                 if (list_intersect(ldb, list, list2) == -1) {
669                         talloc_free(list2);
670                         return LDB_ERR_OPERATIONS_ERROR;
671                 }
672
673                 if (list->count == 0) {
674                         talloc_free(list->dn);
675                         talloc_free(list2);
676                         return LDB_ERR_NO_SUCH_OBJECT;
677                 }
678         } else {
679                 list->dn = talloc_move(list, &list2->dn);
680                 list->count = list2->count;
681         }
682
683         talloc_free(list2);
684
685         return LDB_SUCCESS;
686 }
687
688 /*
689   return a list of dn's that might match a indexed search or
690   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
691  */
692 static int ltdb_index_dn(struct ldb_module *module, 
693                          const struct ldb_parse_tree *tree,
694                          const struct ldb_message *index_list,
695                          struct dn_list *list)
696 {
697         int ret = LDB_ERR_OPERATIONS_ERROR;
698
699         switch (tree->operation) {
700         case LDB_OP_AND:
701                 ret = ltdb_index_dn_and(module, tree, index_list, list);
702                 break;
703
704         case LDB_OP_OR:
705                 ret = ltdb_index_dn_or(module, tree, index_list, list);
706                 break;
707
708         case LDB_OP_NOT:
709                 ret = ltdb_index_dn_not(module, tree, index_list, list);
710                 break;
711
712         case LDB_OP_EQUALITY:
713                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
714                 break;
715
716         case LDB_OP_SUBSTRING:
717         case LDB_OP_GREATER:
718         case LDB_OP_LESS:
719         case LDB_OP_PRESENT:
720         case LDB_OP_APPROX:
721         case LDB_OP_EXTENDED:
722                 /* we can't index with fancy bitops yet */
723                 ret = LDB_ERR_OPERATIONS_ERROR;
724                 break;
725         }
726
727         return ret;
728 }
729
730 /*
731   filter a candidate dn_list from an indexed search into a set of results
732   extracting just the given attributes
733 */
734 static int ltdb_index_filter(const struct dn_list *dn_list, 
735                              struct ldb_handle *handle)
736 {
737         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
738         struct ldb_reply *ares = NULL;
739         unsigned int i;
740
741         for (i = 0; i < dn_list->count; i++) {
742                 struct ldb_dn *dn;
743                 int ret;
744
745                 ares = talloc_zero(ac, struct ldb_reply);
746                 if (!ares) {
747                         handle->status = LDB_ERR_OPERATIONS_ERROR;
748                         handle->state = LDB_ASYNC_DONE;
749                         return LDB_ERR_OPERATIONS_ERROR;
750                 }
751
752                 ares->message = ldb_msg_new(ares);
753                 if (!ares->message) {
754                         handle->status = LDB_ERR_OPERATIONS_ERROR;
755                         handle->state = LDB_ASYNC_DONE;
756                         talloc_free(ares);
757                         return LDB_ERR_OPERATIONS_ERROR;
758                 }
759
760
761                 dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
762                 if (dn == NULL) {
763                         talloc_free(ares);
764                         return LDB_ERR_OPERATIONS_ERROR;
765                 }
766
767                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
768                 talloc_free(dn);
769                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
770                         /* the record has disappeared? yes, this can happen */
771                         talloc_free(ares);
772                         continue;
773                 }
774
775                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
776                         /* an internal error */
777                         talloc_free(ares);
778                         return LDB_ERR_OPERATIONS_ERROR;
779                 }
780
781                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
782                         talloc_free(ares);
783                         continue;
784                 }
785
786                 /* filter the attributes that the user wants */
787                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
788
789                 if (ret == -1) {
790                         handle->status = LDB_ERR_OPERATIONS_ERROR;
791                         handle->state = LDB_ASYNC_DONE;
792                         talloc_free(ares);
793                         return LDB_ERR_OPERATIONS_ERROR;
794                 }
795
796                 ares->type = LDB_REPLY_ENTRY;
797                 handle->state = LDB_ASYNC_PENDING;
798                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
799
800                 if (handle->status != LDB_SUCCESS) {
801                         handle->state = LDB_ASYNC_DONE;
802                         return handle->status;
803                 }
804         }
805
806         return LDB_SUCCESS;
807 }
808
809 /*
810   search the database with a LDAP-like expression using indexes
811   returns -1 if an indexed search is not possible, in which
812   case the caller should call ltdb_search_full() 
813 */
814 int ltdb_search_indexed(struct ldb_handle *handle)
815 {
816         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
817         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
818         struct dn_list *dn_list;
819         int ret, idxattr, idxone;
820
821         idxattr = idxone = 0;
822         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
823         if (ret == 0 ) {
824                 idxattr = 1;
825         }
826
827         /* We do one level indexing only if requested */
828         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
829         if (ret == 0 ) {
830                 idxone = 1;
831         }
832
833         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
834             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
835                 /* no indexs? must do full search */
836                 return LDB_ERR_OPERATIONS_ERROR;
837         }
838
839         ret = LDB_ERR_OPERATIONS_ERROR;
840
841         dn_list = talloc_zero(handle, struct dn_list);
842         if (dn_list == NULL) {
843                 return LDB_ERR_OPERATIONS_ERROR;
844         }
845
846         if (ac->scope == LDB_SCOPE_BASE) {
847                 /* with BASE searches only one DN can match */
848                 dn_list->dn = talloc_array(dn_list, char *, 1);
849                 if (dn_list->dn == NULL) {
850                         ldb_oom(ac->module->ldb);
851                         return LDB_ERR_OPERATIONS_ERROR;
852                 }
853                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
854                 if (dn_list->dn[0] == NULL) {
855                         ldb_oom(ac->module->ldb);
856                         return LDB_ERR_OPERATIONS_ERROR;
857                 }
858                 dn_list->count = 1;
859                 ret = LDB_SUCCESS;
860         }
861
862         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
863                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
864
865                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
866                         talloc_free(dn_list);
867                         return ret;
868                 }
869         }
870
871         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
872                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
873         }
874
875         if (ret == LDB_SUCCESS) {
876                 /* we've got a candidate list - now filter by the full tree
877                    and extract the needed attributes */
878                 ret = ltdb_index_filter(dn_list, handle);
879                 handle->status = ret;
880                 handle->state = LDB_ASYNC_DONE;
881         }
882
883         talloc_free(dn_list);
884
885         return ret;
886 }
887
888 /*
889   add a index element where this is the first indexed DN for this value
890 */
891 static int ltdb_index_add1_new(struct ldb_context *ldb, 
892                                struct ldb_message *msg,
893                                const char *dn)
894 {
895         struct ldb_message_element *el;
896
897         /* add another entry */
898         el = talloc_realloc(msg, msg->elements, 
899                                struct ldb_message_element, msg->num_elements+1);
900         if (!el) {
901                 return LDB_ERR_OPERATIONS_ERROR;
902         }
903
904         msg->elements = el;
905         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
906         if (!msg->elements[msg->num_elements].name) {
907                 return LDB_ERR_OPERATIONS_ERROR;
908         }
909         msg->elements[msg->num_elements].num_values = 0;
910         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
911         if (!msg->elements[msg->num_elements].values) {
912                 return LDB_ERR_OPERATIONS_ERROR;
913         }
914         msg->elements[msg->num_elements].values[0].length = strlen(dn);
915         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
916         msg->elements[msg->num_elements].num_values = 1;
917         msg->num_elements++;
918
919         return LDB_SUCCESS;
920 }
921
922
923 /*
924   add a index element where this is not the first indexed DN for this
925   value
926 */
927 static int ltdb_index_add1_add(struct ldb_context *ldb, 
928                                struct ldb_message *msg,
929                                int idx,
930                                const char *dn)
931 {
932         struct ldb_val *v2;
933         unsigned int i;
934
935         /* for multi-valued attributes we can end up with repeats */
936         for (i=0;i<msg->elements[idx].num_values;i++) {
937                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
938                         return LDB_SUCCESS;
939                 }
940         }
941
942         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
943                               struct ldb_val, 
944                               msg->elements[idx].num_values+1);
945         if (!v2) {
946                 return LDB_ERR_OPERATIONS_ERROR;
947         }
948         msg->elements[idx].values = v2;
949
950         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
951         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
952         msg->elements[idx].num_values++;
953
954         return 0;
955 }
956
957 /*
958   add an index entry for one message element
959 */
960 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
961                            struct ldb_message_element *el, int v_idx)
962 {
963         struct ldb_context *ldb = module->ldb;
964         struct ldb_message *msg;
965         struct ldb_dn *dn_key;
966         int ret;
967         unsigned int i;
968
969         msg = talloc(module, struct ldb_message);
970         if (msg == NULL) {
971                 errno = ENOMEM;
972                 return LDB_ERR_OPERATIONS_ERROR;
973         }
974
975         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
976         if (!dn_key) {
977                 talloc_free(msg);
978                 return LDB_ERR_OPERATIONS_ERROR;
979         }
980         talloc_steal(msg, dn_key);
981
982         ret = ltdb_search_dn1(module, dn_key, msg);
983         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
984                 talloc_free(msg);
985                 return ret;
986         }
987
988         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
989                 msg->dn = dn_key;
990                 msg->num_elements = 0;
991                 msg->elements = NULL;
992         }
993
994         for (i=0;i<msg->num_elements;i++) {
995                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
996                         break;
997                 }
998         }
999
1000         if (i == msg->num_elements) {
1001                 ret = ltdb_index_add1_new(ldb, msg, dn);
1002         } else {
1003                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
1004         }
1005
1006         if (ret == LDB_SUCCESS) {
1007                 ret = ltdb_store(module, msg, TDB_REPLACE);
1008         }
1009
1010         talloc_free(msg);
1011
1012         return ret;
1013 }
1014
1015 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1016                            struct ldb_message_element *elements, int num_el)
1017 {
1018         struct ltdb_private *ltdb = module->private_data;
1019         int ret;
1020         unsigned int i, j;
1021
1022         if (dn[0] == '@') {
1023                 return LDB_SUCCESS;
1024         }
1025
1026         if (ltdb->cache->indexlist->num_elements == 0) {
1027                 /* no indexed fields */
1028                 return LDB_SUCCESS;
1029         }
1030
1031         for (i = 0; i < num_el; i++) {
1032                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
1033                                        NULL, LTDB_IDXATTR);
1034                 if (ret == -1) {
1035                         continue;
1036                 }
1037                 for (j = 0; j < elements[i].num_values; j++) {
1038                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1039                         if (ret != LDB_SUCCESS) {
1040                                 return ret;
1041                         }
1042                 }
1043         }
1044
1045         return LDB_SUCCESS;
1046 }
1047
1048 /*
1049   add the index entries for a new record
1050 */
1051 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1052 {
1053         const char *dn;
1054         int ret;
1055
1056         dn = ldb_dn_get_linearized(msg->dn);
1057         if (dn == NULL) {
1058                 return LDB_ERR_OPERATIONS_ERROR;
1059         }
1060
1061         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1062
1063         return ret;
1064 }
1065
1066
1067 /*
1068   delete an index entry for one message element
1069 */
1070 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
1071                          struct ldb_message_element *el, int v_idx)
1072 {
1073         struct ldb_context *ldb = module->ldb;
1074         struct ldb_message *msg;
1075         struct ldb_dn *dn_key;
1076         int ret, i;
1077         unsigned int j;
1078
1079         if (dn[0] == '@') {
1080                 return LDB_SUCCESS;
1081         }
1082
1083         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1084         if (!dn_key) {
1085                 return LDB_ERR_OPERATIONS_ERROR;
1086         }
1087
1088         msg = talloc(dn_key, struct ldb_message);
1089         if (msg == NULL) {
1090                 talloc_free(dn_key);
1091                 return LDB_ERR_OPERATIONS_ERROR;
1092         }
1093
1094         ret = ltdb_search_dn1(module, dn_key, msg);
1095         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1096                 talloc_free(dn_key);
1097                 return ret;
1098         }
1099
1100         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1101                 /* it wasn't indexed. Did we have an earlier error? If we did then
1102                    its gone now */
1103                 talloc_free(dn_key);
1104                 return LDB_SUCCESS;
1105         }
1106
1107         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1108         if (i == -1) {
1109                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1110                                 "ERROR: dn %s not found in %s\n", dn,
1111                                 ldb_dn_get_linearized(dn_key));
1112                 /* it ain't there. hmmm */
1113                 talloc_free(dn_key);
1114                 return LDB_SUCCESS;
1115         }
1116
1117         if (j != msg->elements[i].num_values - 1) {
1118                 memmove(&msg->elements[i].values[j], 
1119                         &msg->elements[i].values[j+1], 
1120                         (msg->elements[i].num_values-(j+1)) * 
1121                         sizeof(msg->elements[i].values[0]));
1122         }
1123         msg->elements[i].num_values--;
1124
1125         if (msg->elements[i].num_values == 0) {
1126                 ret = ltdb_delete_noindex(module, dn_key);
1127         } else {
1128                 ret = ltdb_store(module, msg, TDB_REPLACE);
1129         }
1130
1131         talloc_free(dn_key);
1132
1133         return ret;
1134 }
1135
1136 /*
1137   delete the index entries for a record
1138   return -1 on failure
1139 */
1140 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1141 {
1142         struct ltdb_private *ltdb = module->private_data;
1143         int ret;
1144         const char *dn;
1145         unsigned int i, j;
1146
1147         /* find the list of indexed fields */   
1148         if (ltdb->cache->indexlist->num_elements == 0) {
1149                 /* no indexed fields */
1150                 return LDB_SUCCESS;
1151         }
1152
1153         if (ldb_dn_is_special(msg->dn)) {
1154                 return LDB_SUCCESS;
1155         }
1156
1157         dn = ldb_dn_get_linearized(msg->dn);
1158         if (dn == NULL) {
1159                 return LDB_ERR_OPERATIONS_ERROR;
1160         }
1161
1162         for (i = 0; i < msg->num_elements; i++) {
1163                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1164                                        NULL, LTDB_IDXATTR);
1165                 if (ret == -1) {
1166                         continue;
1167                 }
1168                 for (j = 0; j < msg->elements[i].num_values; j++) {
1169                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1170                         if (ret != LDB_SUCCESS) {
1171                                 return ret;
1172                         }
1173                 }
1174         }
1175
1176         return LDB_SUCCESS;
1177 }
1178
1179 /* 
1180   handle special index for one level searches
1181 */
1182 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1183 {
1184         struct ltdb_private *ltdb = module->private_data;
1185         struct ldb_message_element el;
1186         struct ldb_val val;
1187         struct ldb_dn *pdn;
1188         const char *dn;
1189         int ret;
1190
1191         /* We index for ONE Level only if requested */
1192         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1193         if (ret != 0) {
1194                 return LDB_SUCCESS;
1195         }
1196
1197         pdn = ldb_dn_get_parent(module, msg->dn);
1198         if (pdn == NULL) {
1199                 return LDB_ERR_OPERATIONS_ERROR;
1200         }
1201
1202         dn = ldb_dn_get_linearized(msg->dn);
1203         if (dn == NULL) {
1204                 talloc_free(pdn);
1205                 return LDB_ERR_OPERATIONS_ERROR;
1206         }
1207
1208         val.data = (uint8_t *)((intptr_t)ldb_dn_get_casefold(pdn));
1209         if (val.data == NULL) {
1210                 talloc_free(pdn);
1211                 return LDB_ERR_OPERATIONS_ERROR;
1212         }
1213
1214         val.length = strlen((char *)val.data);
1215         el.name = LTDB_IDXONE;
1216         el.values = &val;
1217         el.num_values = 1;
1218
1219         if (add) {
1220                 ret = ltdb_index_add1(module, dn, &el, 0);
1221         } else { /* delete */
1222                 ret = ltdb_index_del_value(module, dn, &el, 0);
1223         }
1224
1225         talloc_free(pdn);
1226
1227         return ret;
1228 }
1229
1230
1231 /*
1232   traversal function that deletes all @INDEX records
1233 */
1234 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1235 {
1236         const char *dn = "DN=" LTDB_INDEX ":";
1237         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1238                 return tdb_delete(tdb, key);
1239         }
1240         return 0;
1241 }
1242
1243 /*
1244   traversal function that adds @INDEX records during a re index
1245 */
1246 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1247 {
1248         struct ldb_module *module = state;
1249         struct ldb_message *msg;
1250         const char *dn = NULL;
1251         int ret;
1252         TDB_DATA key2;
1253
1254         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1255             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1256                 return 0;
1257         }
1258
1259         msg = talloc(module, struct ldb_message);
1260         if (msg == NULL) {
1261                 return -1;
1262         }
1263
1264         ret = ltdb_unpack_data(module, &data, msg);
1265         if (ret != 0) {
1266                 talloc_free(msg);
1267                 return -1;
1268         }
1269
1270         /* check if the DN key has changed, perhaps due to the 
1271            case insensitivity of an element changing */
1272         key2 = ltdb_key(module, msg->dn);
1273         if (key2.dptr == NULL) {
1274                 /* probably a corrupt record ... darn */
1275                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1276                                                         ldb_dn_get_linearized(msg->dn));
1277                 talloc_free(msg);
1278                 return 0;
1279         }
1280         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1281                 tdb_delete(tdb, key);
1282                 tdb_store(tdb, key2, data, 0);
1283         }
1284         talloc_free(key2.dptr);
1285
1286         if (msg->dn == NULL) {
1287                 dn = (char *)key.dptr + 3;
1288         } else {
1289                 dn = ldb_dn_get_linearized(msg->dn);
1290         }
1291
1292         ret = ltdb_index_one(module, msg, 1);
1293         if (ret == LDB_SUCCESS) {
1294                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1295         } else {
1296                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1297                         "Adding special ONE LEVEL index failed (%s)!\n",
1298                         ldb_dn_get_linearized(msg->dn));
1299         }
1300
1301         talloc_free(msg);
1302
1303         if (ret != LDB_SUCCESS) return -1;
1304
1305         return 0;
1306 }
1307
1308 /*
1309   force a complete reindex of the database
1310 */
1311 int ltdb_reindex(struct ldb_module *module)
1312 {
1313         struct ltdb_private *ltdb = module->private_data;
1314         int ret;
1315
1316         if (ltdb_cache_reload(module) != 0) {
1317                 return LDB_ERR_OPERATIONS_ERROR;
1318         }
1319
1320         /* first traverse the database deleting any @INDEX records */
1321         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1322         if (ret == -1) {
1323                 return LDB_ERR_OPERATIONS_ERROR;
1324         }
1325
1326         /* if we don't have indexes we have nothing todo */
1327         if (ltdb->cache->indexlist->num_elements == 0) {
1328                 return LDB_SUCCESS;
1329         }
1330
1331         /* now traverse adding any indexes for normal LDB records */
1332         ret = tdb_traverse(ltdb->tdb, re_index, module);
1333         if (ret == -1) {
1334                 return LDB_ERR_OPERATIONS_ERROR;
1335         }
1336
1337         return LDB_SUCCESS;
1338 }