fixed a speellling erra
[kai/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_includes.h"
35
36 #include "ldb_tdb.h"
37
38 /*
39   find an element in a list, using the given comparison function and
40   assuming that the list is already sorted using comp_fn
41
42   return -1 if not found, or the index of the first occurance of needle if found
43 */
44 static int ldb_list_find(const void *needle, 
45                          const void *base, size_t nmemb, size_t size, 
46                          comparison_fn_t comp_fn)
47 {
48         const char *base_p = (const char *)base;
49         size_t min_i, max_i, test_i;
50
51         if (nmemb == 0) {
52                 return -1;
53         }
54
55         min_i = 0;
56         max_i = nmemb-1;
57
58         while (min_i < max_i) {
59                 int r;
60
61                 test_i = (min_i + max_i) / 2;
62                 /* the following cast looks strange, but is
63                  correct. The key to understanding it is that base_p
64                  is a pointer to an array of pointers, so we have to
65                  dereference it after casting to void **. The strange
66                  const in the middle gives us the right type of pointer
67                  after the dereference  (tridge) */
68                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
69                 if (r == 0) {
70                         /* scan back for first element */
71                         while (test_i > 0 &&
72                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
73                                 test_i--;
74                         }
75                         return test_i;
76                 }
77                 if (r < 0) {
78                         if (test_i == 0) {
79                                 return -1;
80                         }
81                         max_i = test_i - 1;
82                 }
83                 if (r > 0) {
84                         min_i = test_i + 1;
85                 }
86         }
87
88         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
89                 return min_i;
90         }
91
92         return -1;
93 }
94
95 struct dn_list {
96         unsigned int count;
97         char **dn;
98 };
99
100 /*
101   return the dn key to be used for an index
102   caller frees
103 */
104 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
105                                      const char *attr, const struct ldb_val *value)
106 {
107         struct ldb_dn *ret;
108         struct ldb_val v;
109         const struct ldb_schema_attribute *a;
110         char *attr_folded;
111         int r;
112
113         attr_folded = ldb_attr_casefold(ldb, attr);
114         if (!attr_folded) {
115                 return NULL;
116         }
117
118         a = ldb_schema_attribute_by_name(ldb, attr);
119         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
120         if (r != LDB_SUCCESS) {
121                 const char *errstr = ldb_errstring(ldb);
122                 /* canonicalisation can be refused. For example, 
123                    a attribute that takes wildcards will refuse to canonicalise
124                    if the value contains a wildcard */
125                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
126                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
127                 talloc_free(attr_folded);
128                 return NULL;
129         }
130         if (ldb_should_b64_encode(&v)) {
131                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
132                 if (!vstr) return NULL;
133                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
134                 talloc_free(vstr);
135         } else {
136                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
137         }
138
139         if (v.data != value->data) {
140                 talloc_free(v.data);
141         }
142         talloc_free(attr_folded);
143
144         return ret;
145 }
146
147 /*
148   see if a attribute value is in the list of indexed attributes
149 */
150 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
151                             unsigned int *v_idx, const char *key)
152 {
153         unsigned int i, j;
154         for (i=0;i<msg->num_elements;i++) {
155                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
156                         const struct ldb_message_element *el = &msg->elements[i];
157
158                         if (attr == NULL) {
159                                 /* in this case we are just looking to see if key is present,
160                                    we are not spearching for a specific index */
161                                 return 0;
162                         }
163
164                         for (j=0;j<el->num_values;j++) {
165                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
166                                         if (v_idx) {
167                                                 *v_idx = j;
168                                         }
169                                         return i;
170                                 }
171                         }
172                 }
173         }
174         return -1;
175 }
176
177 /* used in sorting dn lists */
178 static int list_cmp(const char **s1, const char **s2)
179 {
180         return strcmp(*s1, *s2);
181 }
182
183 /*
184   return a list of dn's that might match a simple indexed search or
185  */
186 static int ltdb_index_dn_simple(struct ldb_module *module, 
187                                 const struct ldb_parse_tree *tree,
188                                 const struct ldb_message *index_list,
189                                 struct dn_list *list)
190 {
191         struct ldb_context *ldb = module->ldb;
192         struct ldb_dn *dn;
193         int ret;
194         unsigned int i, j;
195         struct ldb_message *msg;
196
197         list->count = 0;
198         list->dn = NULL;
199
200         /* if the attribute isn't in the list of indexed attributes then
201            this node needs a full search */
202         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
203                 return LDB_ERR_OPERATIONS_ERROR;
204         }
205
206         /* the attribute is indexed. Pull the list of DNs that match the 
207            search criterion */
208         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
209         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
210
211         msg = talloc(list, struct ldb_message);
212         if (msg == NULL) {
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ltdb_search_dn1(module, dn, msg);
217         talloc_free(dn);
218         if (ret != LDB_SUCCESS) {
219                 return ret;
220         }
221
222         for (i=0;i<msg->num_elements;i++) {
223                 struct ldb_message_element *el;
224
225                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
226                         continue;
227                 }
228
229                 el = &msg->elements[i];
230
231                 list->dn = talloc_array(list, char *, el->num_values);
232                 if (!list->dn) {
233                         talloc_free(msg);
234                         return LDB_ERR_OPERATIONS_ERROR;
235                 }
236
237                 for (j=0;j<el->num_values;j++) {
238                         list->dn[list->count] = 
239                                 talloc_strdup(list->dn, (char *)el->values[j].data);
240                         if (!list->dn[list->count]) {
241                                 talloc_free(msg);
242                                 return LDB_ERR_OPERATIONS_ERROR;
243                         }
244                         list->count++;
245                 }
246         }
247
248         talloc_free(msg);
249
250         if (list->count > 1) {
251                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
252         }
253
254         return LDB_SUCCESS;
255 }
256
257
258 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
259
260 /*
261   return a list of dn's that might match a leaf indexed search
262  */
263 static int ltdb_index_dn_leaf(struct ldb_module *module, 
264                               const struct ldb_parse_tree *tree,
265                               const struct ldb_message *index_list,
266                               struct dn_list *list)
267 {
268         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
269                 list->dn = talloc_array(list, char *, 1);
270                 if (list->dn == NULL) {
271                         ldb_oom(module->ldb);
272                         return LDB_ERR_OPERATIONS_ERROR;
273                 }
274                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
275                 if (list->dn[0] == NULL) {
276                         ldb_oom(module->ldb);
277                         return LDB_ERR_OPERATIONS_ERROR;
278                 }
279                 list->count = 1;
280                 return LDB_SUCCESS;
281         }
282         return ltdb_index_dn_simple(module, tree, index_list, list);
283 }
284
285
286 /*
287   list intersection
288   list = list & list2
289   relies on the lists being sorted
290 */
291 static int list_intersect(struct ldb_context *ldb,
292                           struct dn_list *list, const struct dn_list *list2)
293 {
294         struct dn_list *list3;
295         unsigned int i;
296
297         if (list->count == 0 || list2->count == 0) {
298                 /* 0 & X == 0 */
299                 return LDB_ERR_NO_SUCH_OBJECT;
300         }
301
302         list3 = talloc(ldb, struct dn_list);
303         if (list3 == NULL) {
304                 return LDB_ERR_OPERATIONS_ERROR;
305         }
306
307         list3->dn = talloc_array(list3, char *, list->count);
308         if (!list3->dn) {
309                 talloc_free(list3);
310                 return LDB_ERR_OPERATIONS_ERROR;
311         }
312         list3->count = 0;
313
314         for (i=0;i<list->count;i++) {
315                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
316                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
317                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
318                         list3->count++;
319                 } else {
320                         talloc_free(list->dn[i]);
321                 }               
322         }
323
324         talloc_free(list->dn);
325         list->dn = talloc_move(list, &list3->dn);
326         list->count = list3->count;
327         talloc_free(list3);
328
329         return LDB_ERR_NO_SUCH_OBJECT;
330 }
331
332
333 /*
334   list union
335   list = list | list2
336   relies on the lists being sorted
337 */
338 static int list_union(struct ldb_context *ldb, 
339                       struct dn_list *list, const struct dn_list *list2)
340 {
341         unsigned int i;
342         char **d;
343         unsigned int count = list->count;
344
345         if (list->count == 0 && list2->count == 0) {
346                 /* 0 | 0 == 0 */
347                 return LDB_ERR_NO_SUCH_OBJECT;
348         }
349
350         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
351         if (!d) {
352                 return LDB_ERR_OPERATIONS_ERROR;
353         }
354         list->dn = d;
355
356         for (i=0;i<list2->count;i++) {
357                 if (ldb_list_find(list2->dn[i], list->dn, count, 
358                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
359                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
360                         if (!list->dn[list->count]) {
361                                 return LDB_ERR_OPERATIONS_ERROR;
362                         }
363                         list->count++;
364                 }               
365         }
366
367         if (list->count != count) {
368                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
369         }
370
371         return LDB_ERR_NO_SUCH_OBJECT;
372 }
373
374 static int ltdb_index_dn(struct ldb_module *module, 
375                          const struct ldb_parse_tree *tree,
376                          const struct ldb_message *index_list,
377                          struct dn_list *list);
378
379
380 /*
381   OR two index results
382  */
383 static int ltdb_index_dn_or(struct ldb_module *module, 
384                             const struct ldb_parse_tree *tree,
385                             const struct ldb_message *index_list,
386                             struct dn_list *list)
387 {
388         struct ldb_context *ldb = module->ldb;
389         unsigned int i;
390         int ret;
391         
392         ret = LDB_ERR_OPERATIONS_ERROR;
393         list->dn = NULL;
394         list->count = 0;
395
396         for (i=0;i<tree->u.list.num_elements;i++) {
397                 struct dn_list *list2;
398                 int v;
399
400                 list2 = talloc(module, struct dn_list);
401                 if (list2 == NULL) {
402                         return LDB_ERR_OPERATIONS_ERROR;
403                 }
404
405                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
406
407                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
408                         /* 0 || X == X */
409                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
410                                 ret = v;
411                         }
412                         talloc_free(list2);
413                         continue;
414                 }
415
416                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
417                         /* 1 || X == 1 */
418                         talloc_free(list->dn);
419                         talloc_free(list2);
420                         return v;
421                 }
422
423                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
424                         ret = LDB_SUCCESS;
425                         list->dn = talloc_move(list, &list2->dn);
426                         list->count = list2->count;
427                 } else {
428                         if (list_union(ldb, list, list2) == -1) {
429                                 talloc_free(list2);
430                                 return LDB_ERR_OPERATIONS_ERROR;
431                         }
432                         ret = LDB_SUCCESS;
433                 }
434                 talloc_free(list2);
435         }
436
437         if (list->count == 0) {
438                 return LDB_ERR_NO_SUCH_OBJECT;
439         }
440
441         return ret;
442 }
443
444
445 /*
446   NOT an index results
447  */
448 static int ltdb_index_dn_not(struct ldb_module *module, 
449                              const struct ldb_parse_tree *tree,
450                              const struct ldb_message *index_list,
451                              struct dn_list *list)
452 {
453         /* the only way to do an indexed not would be if we could
454            negate the not via another not or if we knew the total
455            number of database elements so we could know that the
456            existing expression covered the whole database. 
457            
458            instead, we just give up, and rely on a full index scan
459            (unless an outer & manages to reduce the list)
460         */
461         return LDB_ERR_OPERATIONS_ERROR;
462 }
463
464 /*
465   AND two index results
466  */
467 static int ltdb_index_dn_and(struct ldb_module *module, 
468                              const struct ldb_parse_tree *tree,
469                              const struct ldb_message *index_list,
470                              struct dn_list *list)
471 {
472         struct ldb_context *ldb = module->ldb;
473         unsigned int i;
474         int ret;
475         
476         ret = LDB_ERR_OPERATIONS_ERROR;
477         list->dn = NULL;
478         list->count = 0;
479
480         for (i=0;i<tree->u.list.num_elements;i++) {
481                 struct dn_list *list2;
482                 int v;
483
484                 list2 = talloc(module, struct dn_list);
485                 if (list2 == NULL) {
486                         return LDB_ERR_OPERATIONS_ERROR;
487                 }
488
489                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
490
491                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
492                         /* 0 && X == 0 */
493                         talloc_free(list->dn);
494                         talloc_free(list2);
495                         return LDB_ERR_NO_SUCH_OBJECT;
496                 }
497
498                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
499                         talloc_free(list2);
500                         continue;
501                 }
502
503                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
504                         ret = LDB_SUCCESS;
505                         talloc_free(list->dn);
506                         list->dn = talloc_move(list, &list2->dn);
507                         list->count = list2->count;
508                 } else {
509                         if (list_intersect(ldb, list, list2) == -1) {
510                                 talloc_free(list2);
511                                 return LDB_ERR_OPERATIONS_ERROR;
512                         }
513                 }
514
515                 talloc_free(list2);
516
517                 if (list->count == 0) {
518                         talloc_free(list->dn);
519                         return LDB_ERR_NO_SUCH_OBJECT;
520                 }
521         }
522
523         return ret;
524 }
525
526 /*
527   AND index results and ONE level special index
528  */
529 static int ltdb_index_dn_one(struct ldb_module *module, 
530                              struct ldb_dn *parent_dn,
531                              struct dn_list *list)
532 {
533         struct ldb_context *ldb = module->ldb;
534         struct dn_list *list2;
535         struct ldb_message *msg;
536         struct ldb_dn *key;
537         struct ldb_val val;
538         unsigned int i, j;
539         int ret;
540         
541         list2 = talloc_zero(module, struct dn_list);
542         if (list2 == NULL) {
543                 return LDB_ERR_OPERATIONS_ERROR;
544         }
545
546         /* the attribute is indexed. Pull the list of DNs that match the 
547            search criterion */
548         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
549         val.length = strlen((char *)val.data);
550         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
551         if (!key) {
552                 talloc_free(list2);
553                 return LDB_ERR_OPERATIONS_ERROR;
554         }
555
556         msg = talloc(list2, struct ldb_message);
557         if (msg == NULL) {
558                 talloc_free(list2);
559                 return LDB_ERR_OPERATIONS_ERROR;
560         }
561
562         ret = ltdb_search_dn1(module, key, msg);
563         talloc_free(key);
564         if (ret != LDB_SUCCESS) {
565                 return ret;
566         }
567
568         for (i = 0; i < msg->num_elements; i++) {
569                 struct ldb_message_element *el;
570
571                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
572                         continue;
573                 }
574
575                 el = &msg->elements[i];
576
577                 list2->dn = talloc_array(list2, char *, el->num_values);
578                 if (!list2->dn) {
579                         talloc_free(list2);
580                         return LDB_ERR_OPERATIONS_ERROR;
581                 }
582
583                 for (j = 0; j < el->num_values; j++) {
584                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
585                         if (!list2->dn[list2->count]) {
586                                 talloc_free(list2);
587                                 return LDB_ERR_OPERATIONS_ERROR;
588                         }
589                         list2->count++;
590                 }
591         }
592
593         if (list2->count == 0) {
594                 talloc_free(list2);
595                 return LDB_ERR_NO_SUCH_OBJECT;
596         }
597
598         if (list2->count > 1) {
599                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
600         }
601
602         if (list->count > 0) {
603                 if (list_intersect(ldb, list, list2) == -1) {
604                         talloc_free(list2);
605                         return LDB_ERR_OPERATIONS_ERROR;
606                 }
607
608                 if (list->count == 0) {
609                         talloc_free(list->dn);
610                         talloc_free(list2);
611                         return LDB_ERR_NO_SUCH_OBJECT;
612                 }
613         } else {
614                 list->dn = talloc_move(list, &list2->dn);
615                 list->count = list2->count;
616         }
617
618         talloc_free(list2);
619
620         return LDB_SUCCESS;
621 }
622
623 /*
624   return a list of dn's that might match a indexed search or
625   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
626  */
627 static int ltdb_index_dn(struct ldb_module *module, 
628                          const struct ldb_parse_tree *tree,
629                          const struct ldb_message *index_list,
630                          struct dn_list *list)
631 {
632         int ret = LDB_ERR_OPERATIONS_ERROR;
633
634         switch (tree->operation) {
635         case LDB_OP_AND:
636                 ret = ltdb_index_dn_and(module, tree, index_list, list);
637                 break;
638
639         case LDB_OP_OR:
640                 ret = ltdb_index_dn_or(module, tree, index_list, list);
641                 break;
642
643         case LDB_OP_NOT:
644                 ret = ltdb_index_dn_not(module, tree, index_list, list);
645                 break;
646
647         case LDB_OP_EQUALITY:
648                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
649                 break;
650
651         case LDB_OP_SUBSTRING:
652         case LDB_OP_GREATER:
653         case LDB_OP_LESS:
654         case LDB_OP_PRESENT:
655         case LDB_OP_APPROX:
656         case LDB_OP_EXTENDED:
657                 /* we can't index with fancy bitops yet */
658                 ret = LDB_ERR_OPERATIONS_ERROR;
659                 break;
660         }
661
662         return ret;
663 }
664
665 /*
666   filter a candidate dn_list from an indexed search into a set of results
667   extracting just the given attributes
668 */
669 static int ltdb_index_filter(const struct dn_list *dn_list, 
670                              struct ldb_handle *handle)
671 {
672         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
673         struct ldb_reply *ares = NULL;
674         unsigned int i;
675
676         for (i = 0; i < dn_list->count; i++) {
677                 struct ldb_dn *dn;
678                 int ret;
679
680                 ares = talloc_zero(ac, struct ldb_reply);
681                 if (!ares) {
682                         handle->status = LDB_ERR_OPERATIONS_ERROR;
683                         handle->state = LDB_ASYNC_DONE;
684                         return LDB_ERR_OPERATIONS_ERROR;
685                 }
686
687                 ares->message = ldb_msg_new(ares);
688                 if (!ares->message) {
689                         handle->status = LDB_ERR_OPERATIONS_ERROR;
690                         handle->state = LDB_ASYNC_DONE;
691                         talloc_free(ares);
692                         return LDB_ERR_OPERATIONS_ERROR;
693                 }
694
695
696                 dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
697                 if (dn == NULL) {
698                         talloc_free(ares);
699                         return LDB_ERR_OPERATIONS_ERROR;
700                 }
701
702                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
703                 talloc_free(dn);
704                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
705                         /* the record has disappeared? yes, this can happen */
706                         talloc_free(ares);
707                         continue;
708                 }
709
710                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
711                         /* an internal error */
712                         talloc_free(ares);
713                         return LDB_ERR_OPERATIONS_ERROR;
714                 }
715
716                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
717                         talloc_free(ares);
718                         continue;
719                 }
720
721                 /* filter the attributes that the user wants */
722                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
723
724                 if (ret == -1) {
725                         handle->status = LDB_ERR_OPERATIONS_ERROR;
726                         handle->state = LDB_ASYNC_DONE;
727                         talloc_free(ares);
728                         return LDB_ERR_OPERATIONS_ERROR;
729                 }
730
731                 ares->type = LDB_REPLY_ENTRY;
732                 handle->state = LDB_ASYNC_PENDING;
733                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
734
735                 if (handle->status != LDB_SUCCESS) {
736                         handle->state = LDB_ASYNC_DONE;
737                         return handle->status;
738                 }
739         }
740
741         return LDB_SUCCESS;
742 }
743
744 /*
745   search the database with a LDAP-like expression using indexes
746   returns -1 if an indexed search is not possible, in which
747   case the caller should call ltdb_search_full() 
748 */
749 int ltdb_search_indexed(struct ldb_handle *handle)
750 {
751         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
752         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
753         struct dn_list *dn_list;
754         int ret, idxattr, idxone;
755
756         idxattr = idxone = 0;
757         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
758         if (ret == 0 ) {
759                 idxattr = 1;
760         }
761
762         /* We do one level indexing only if requested */
763         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
764         if (ret == 0 ) {
765                 idxone = 1;
766         }
767
768         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
769             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
770                 /* no indexes? must do full search */
771                 return LDB_ERR_OPERATIONS_ERROR;
772         }
773
774         ret = LDB_ERR_OPERATIONS_ERROR;
775
776         dn_list = talloc_zero(handle, struct dn_list);
777         if (dn_list == NULL) {
778                 return LDB_ERR_OPERATIONS_ERROR;
779         }
780
781         if (ac->scope == LDB_SCOPE_BASE) {
782                 /* with BASE searches only one DN can match */
783                 dn_list->dn = talloc_array(dn_list, char *, 1);
784                 if (dn_list->dn == NULL) {
785                         ldb_oom(ac->module->ldb);
786                         return LDB_ERR_OPERATIONS_ERROR;
787                 }
788                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
789                 if (dn_list->dn[0] == NULL) {
790                         ldb_oom(ac->module->ldb);
791                         return LDB_ERR_OPERATIONS_ERROR;
792                 }
793                 dn_list->count = 1;
794                 ret = LDB_SUCCESS;
795         }
796
797         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
798                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
799
800                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
801                         talloc_free(dn_list);
802                         return ret;
803                 }
804         }
805
806         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
807                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
808         }
809
810         if (ret == LDB_SUCCESS) {
811                 /* we've got a candidate list - now filter by the full tree
812                    and extract the needed attributes */
813                 ret = ltdb_index_filter(dn_list, handle);
814                 handle->status = ret;
815                 handle->state = LDB_ASYNC_DONE;
816         }
817
818         talloc_free(dn_list);
819
820         return ret;
821 }
822
823 /*
824   add a index element where this is the first indexed DN for this value
825 */
826 static int ltdb_index_add1_new(struct ldb_context *ldb, 
827                                struct ldb_message *msg,
828                                const char *dn)
829 {
830         struct ldb_message_element *el;
831
832         /* add another entry */
833         el = talloc_realloc(msg, msg->elements, 
834                                struct ldb_message_element, msg->num_elements+1);
835         if (!el) {
836                 return LDB_ERR_OPERATIONS_ERROR;
837         }
838
839         msg->elements = el;
840         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
841         if (!msg->elements[msg->num_elements].name) {
842                 return LDB_ERR_OPERATIONS_ERROR;
843         }
844         msg->elements[msg->num_elements].num_values = 0;
845         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
846         if (!msg->elements[msg->num_elements].values) {
847                 return LDB_ERR_OPERATIONS_ERROR;
848         }
849         msg->elements[msg->num_elements].values[0].length = strlen(dn);
850         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
851         msg->elements[msg->num_elements].num_values = 1;
852         msg->num_elements++;
853
854         return LDB_SUCCESS;
855 }
856
857
858 /*
859   add a index element where this is not the first indexed DN for this
860   value
861 */
862 static int ltdb_index_add1_add(struct ldb_context *ldb, 
863                                struct ldb_message *msg,
864                                int idx,
865                                const char *dn)
866 {
867         struct ldb_val *v2;
868         unsigned int i;
869
870         /* for multi-valued attributes we can end up with repeats */
871         for (i=0;i<msg->elements[idx].num_values;i++) {
872                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
873                         return LDB_SUCCESS;
874                 }
875         }
876
877         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
878                               struct ldb_val, 
879                               msg->elements[idx].num_values+1);
880         if (!v2) {
881                 return LDB_ERR_OPERATIONS_ERROR;
882         }
883         msg->elements[idx].values = v2;
884
885         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
886         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
887         msg->elements[idx].num_values++;
888
889         return LDB_SUCCESS;
890 }
891
892 /*
893   add an index entry for one message element
894 */
895 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
896                            struct ldb_message_element *el, int v_idx)
897 {
898         struct ldb_context *ldb = module->ldb;
899         struct ldb_message *msg;
900         struct ldb_dn *dn_key;
901         int ret;
902         unsigned int i;
903
904         msg = talloc(module, struct ldb_message);
905         if (msg == NULL) {
906                 errno = ENOMEM;
907                 return LDB_ERR_OPERATIONS_ERROR;
908         }
909
910         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
911         if (!dn_key) {
912                 talloc_free(msg);
913                 return LDB_ERR_OPERATIONS_ERROR;
914         }
915         talloc_steal(msg, dn_key);
916
917         ret = ltdb_search_dn1(module, dn_key, msg);
918         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
919                 talloc_free(msg);
920                 return ret;
921         }
922
923         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
924                 msg->dn = dn_key;
925                 msg->num_elements = 0;
926                 msg->elements = NULL;
927         }
928
929         for (i=0;i<msg->num_elements;i++) {
930                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
931                         break;
932                 }
933         }
934
935         if (i == msg->num_elements) {
936                 ret = ltdb_index_add1_new(ldb, msg, dn);
937         } else {
938                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
939         }
940
941         if (ret == LDB_SUCCESS) {
942                 ret = ltdb_store(module, msg, TDB_REPLACE);
943         }
944
945         talloc_free(msg);
946
947         return ret;
948 }
949
950 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
951                            struct ldb_message_element *elements, int num_el)
952 {
953         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
954         int ret;
955         unsigned int i, j;
956
957         if (dn[0] == '@') {
958                 return LDB_SUCCESS;
959         }
960
961         if (ltdb->cache->indexlist->num_elements == 0) {
962                 /* no indexed fields */
963                 return LDB_SUCCESS;
964         }
965
966         for (i = 0; i < num_el; i++) {
967                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
968                                        NULL, LTDB_IDXATTR);
969                 if (ret == -1) {
970                         continue;
971                 }
972                 for (j = 0; j < elements[i].num_values; j++) {
973                         ret = ltdb_index_add1(module, dn, &elements[i], j);
974                         if (ret != LDB_SUCCESS) {
975                                 return ret;
976                         }
977                 }
978         }
979
980         return LDB_SUCCESS;
981 }
982
983 /*
984   add the index entries for a new record
985 */
986 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
987 {
988         const char *dn;
989         int ret;
990
991         dn = ldb_dn_get_linearized(msg->dn);
992         if (dn == NULL) {
993                 return LDB_ERR_OPERATIONS_ERROR;
994         }
995
996         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
997
998         return ret;
999 }
1000
1001
1002 /*
1003   delete an index entry for one message element
1004 */
1005 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
1006                          struct ldb_message_element *el, int v_idx)
1007 {
1008         struct ldb_context *ldb = module->ldb;
1009         struct ldb_message *msg;
1010         struct ldb_dn *dn_key;
1011         int ret, i;
1012         unsigned int j;
1013
1014         if (dn[0] == '@') {
1015                 return LDB_SUCCESS;
1016         }
1017
1018         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1019         if (!dn_key) {
1020                 return LDB_ERR_OPERATIONS_ERROR;
1021         }
1022
1023         msg = talloc(dn_key, struct ldb_message);
1024         if (msg == NULL) {
1025                 talloc_free(dn_key);
1026                 return LDB_ERR_OPERATIONS_ERROR;
1027         }
1028
1029         ret = ltdb_search_dn1(module, dn_key, msg);
1030         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1031                 talloc_free(dn_key);
1032                 return ret;
1033         }
1034
1035         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1036                 /* it wasn't indexed. Did we have an earlier error? If we did then
1037                    its gone now */
1038                 talloc_free(dn_key);
1039                 return LDB_SUCCESS;
1040         }
1041
1042         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1043         if (i == -1) {
1044                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1045                                 "ERROR: dn %s not found in %s\n", dn,
1046                                 ldb_dn_get_linearized(dn_key));
1047                 /* it ain't there. hmmm */
1048                 talloc_free(dn_key);
1049                 return LDB_SUCCESS;
1050         }
1051
1052         if (j != msg->elements[i].num_values - 1) {
1053                 memmove(&msg->elements[i].values[j], 
1054                         &msg->elements[i].values[j+1], 
1055                         (msg->elements[i].num_values-(j+1)) * 
1056                         sizeof(msg->elements[i].values[0]));
1057         }
1058         msg->elements[i].num_values--;
1059
1060         if (msg->elements[i].num_values == 0) {
1061                 ret = ltdb_delete_noindex(module, dn_key);
1062         } else {
1063                 ret = ltdb_store(module, msg, TDB_REPLACE);
1064         }
1065
1066         talloc_free(dn_key);
1067
1068         return ret;
1069 }
1070
1071 /*
1072   delete the index entries for a record
1073   return -1 on failure
1074 */
1075 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1076 {
1077         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1078         int ret;
1079         const char *dn;
1080         unsigned int i, j;
1081
1082         /* find the list of indexed fields */   
1083         if (ltdb->cache->indexlist->num_elements == 0) {
1084                 /* no indexed fields */
1085                 return LDB_SUCCESS;
1086         }
1087
1088         if (ldb_dn_is_special(msg->dn)) {
1089                 return LDB_SUCCESS;
1090         }
1091
1092         dn = ldb_dn_get_linearized(msg->dn);
1093         if (dn == NULL) {
1094                 return LDB_ERR_OPERATIONS_ERROR;
1095         }
1096
1097         for (i = 0; i < msg->num_elements; i++) {
1098                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1099                                        NULL, LTDB_IDXATTR);
1100                 if (ret == -1) {
1101                         continue;
1102                 }
1103                 for (j = 0; j < msg->elements[i].num_values; j++) {
1104                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1105                         if (ret != LDB_SUCCESS) {
1106                                 return ret;
1107                         }
1108                 }
1109         }
1110
1111         return LDB_SUCCESS;
1112 }
1113
1114 /* 
1115   handle special index for one level searches
1116 */
1117 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1118 {
1119         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1120         struct ldb_message_element el;
1121         struct ldb_val val;
1122         struct ldb_dn *pdn;
1123         const char *dn;
1124         int ret;
1125
1126         /* We index for ONE Level only if requested */
1127         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1128         if (ret != 0) {
1129                 return LDB_SUCCESS;
1130         }
1131
1132         pdn = ldb_dn_get_parent(module, msg->dn);
1133         if (pdn == NULL) {
1134                 return LDB_ERR_OPERATIONS_ERROR;
1135         }
1136
1137         dn = ldb_dn_get_linearized(msg->dn);
1138         if (dn == NULL) {
1139                 talloc_free(pdn);
1140                 return LDB_ERR_OPERATIONS_ERROR;
1141         }
1142
1143         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1144         if (val.data == NULL) {
1145                 talloc_free(pdn);
1146                 return LDB_ERR_OPERATIONS_ERROR;
1147         }
1148
1149         val.length = strlen((char *)val.data);
1150         el.name = LTDB_IDXONE;
1151         el.values = &val;
1152         el.num_values = 1;
1153
1154         if (add) {
1155                 ret = ltdb_index_add1(module, dn, &el, 0);
1156         } else { /* delete */
1157                 ret = ltdb_index_del_value(module, dn, &el, 0);
1158         }
1159
1160         talloc_free(pdn);
1161
1162         return ret;
1163 }
1164
1165
1166 /*
1167   traversal function that deletes all @INDEX records
1168 */
1169 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1170 {
1171         const char *dn = "DN=" LTDB_INDEX ":";
1172         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1173                 return tdb_delete(tdb, key);
1174         }
1175         return 0;
1176 }
1177
1178 /*
1179   traversal function that adds @INDEX records during a re index
1180 */
1181 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1182 {
1183         struct ldb_module *module = (struct ldb_module *)state;
1184         struct ldb_message *msg;
1185         const char *dn = NULL;
1186         int ret;
1187         TDB_DATA key2;
1188
1189         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1190             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1191                 return 0;
1192         }
1193
1194         msg = talloc(module, struct ldb_message);
1195         if (msg == NULL) {
1196                 return -1;
1197         }
1198
1199         ret = ltdb_unpack_data(module, &data, msg);
1200         if (ret != 0) {
1201                 talloc_free(msg);
1202                 return -1;
1203         }
1204
1205         /* check if the DN key has changed, perhaps due to the 
1206            case insensitivity of an element changing */
1207         key2 = ltdb_key(module, msg->dn);
1208         if (key2.dptr == NULL) {
1209                 /* probably a corrupt record ... darn */
1210                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1211                                                         ldb_dn_get_linearized(msg->dn));
1212                 talloc_free(msg);
1213                 return 0;
1214         }
1215         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1216                 tdb_delete(tdb, key);
1217                 tdb_store(tdb, key2, data, 0);
1218         }
1219         talloc_free(key2.dptr);
1220
1221         if (msg->dn == NULL) {
1222                 dn = (char *)key.dptr + 3;
1223         } else {
1224                 dn = ldb_dn_get_linearized(msg->dn);
1225         }
1226
1227         ret = ltdb_index_one(module, msg, 1);
1228         if (ret == LDB_SUCCESS) {
1229                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1230         } else {
1231                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1232                         "Adding special ONE LEVEL index failed (%s)!\n",
1233                         ldb_dn_get_linearized(msg->dn));
1234         }
1235
1236         talloc_free(msg);
1237
1238         if (ret != LDB_SUCCESS) return -1;
1239
1240         return 0;
1241 }
1242
1243 /*
1244   force a complete reindex of the database
1245 */
1246 int ltdb_reindex(struct ldb_module *module)
1247 {
1248         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1249         int ret;
1250
1251         if (ltdb_cache_reload(module) != 0) {
1252                 return LDB_ERR_OPERATIONS_ERROR;
1253         }
1254
1255         /* first traverse the database deleting any @INDEX records */
1256         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1257         if (ret == -1) {
1258                 return LDB_ERR_OPERATIONS_ERROR;
1259         }
1260
1261         /* if we don't have indexes we have nothing todo */
1262         if (ltdb->cache->indexlist->num_elements == 0) {
1263                 return LDB_SUCCESS;
1264         }
1265
1266         /* now traverse adding any indexes for normal LDB records */
1267         ret = tdb_traverse(ltdb->tdb, re_index, module);
1268         if (ret == -1) {
1269                 return LDB_ERR_OPERATIONS_ERROR;
1270         }
1271
1272         return LDB_SUCCESS;
1273 }