r20684: if we don't have any indexes, then we should not waste time
[gd/samba/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 /* the following cast looks strange, but is
65                  correct. The key to understanding it is that base_p
66                  is a pointer to an array of pointers, so we have to
67                  dereference it after casting to void **. The strange
68                  const in the middle gives us the right type of pointer
69                  after the dereference  (tridge) */
70                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
71                 if (r == 0) {
72                         /* scan back for first element */
73                         while (test_i > 0 &&
74                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
75                                 test_i--;
76                         }
77                         return test_i;
78                 }
79                 if (r < 0) {
80                         if (test_i == 0) {
81                                 return -1;
82                         }
83                         max_i = test_i - 1;
84                 }
85                 if (r > 0) {
86                         min_i = test_i + 1;
87                 }
88         }
89
90         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
91                 return min_i;
92         }
93
94         return -1;
95 }
96
97 struct dn_list {
98         unsigned int count;
99         char **dn;
100 };
101
102 /*
103   return the dn key to be used for an index
104   caller frees
105 */
106 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
107                                      const char *attr, const struct ldb_val *value)
108 {
109         struct ldb_dn *ret;
110         struct ldb_val v;
111         const struct ldb_schema_attribute *a;
112         char *attr_folded;
113         int r;
114
115         attr_folded = ldb_attr_casefold(ldb, attr);
116         if (!attr_folded) {
117                 return NULL;
118         }
119
120         a = ldb_schema_attribute_by_name(ldb, attr);
121         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
122         if (r != LDB_SUCCESS) {
123                 const char *errstr = ldb_errstring(ldb);
124                 /* canonicalisation can be refused. For example, 
125                    a attribute that takes wildcards will refuse to canonicalise
126                    if the value contains a wildcard */
127                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
128                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
129                 talloc_free(attr_folded);
130                 return NULL;
131         }
132         if (ldb_should_b64_encode(&v)) {
133                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
134                 if (!vstr) return NULL;
135                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
136                 talloc_free(vstr);
137         } else {
138                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
139         }
140
141         if (v.data != value->data) {
142                 talloc_free(v.data);
143         }
144         talloc_free(attr_folded);
145
146         return ret;
147 }
148
149 /*
150   see if a attribute value is in the list of indexed attributes
151 */
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153                             unsigned int *v_idx, const char *key)
154 {
155         unsigned int i, j;
156         for (i=0;i<msg->num_elements;i++) {
157                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158                         const struct ldb_message_element *el = &msg->elements[i];
159
160                         if (attr == NULL) {
161                                 /* in this case we are just looking to see if key is present,
162                                    we are not spearching for a specific index */
163                                 return 0;
164                         }
165
166                         for (j=0;j<el->num_values;j++) {
167                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
168                                         if (v_idx) {
169                                                 *v_idx = j;
170                                         }
171                                         return i;
172                                 }
173                         }
174                 }
175         }
176         return -1;
177 }
178
179 /* used in sorting dn lists */
180 static int list_cmp(const char **s1, const char **s2)
181 {
182         return strcmp(*s1, *s2);
183 }
184
185 /*
186   return a list of dn's that might match a simple indexed search or
187  */
188 static int ltdb_index_dn_simple(struct ldb_module *module, 
189                                 const struct ldb_parse_tree *tree,
190                                 const struct ldb_message *index_list,
191                                 struct dn_list *list)
192 {
193         struct ldb_context *ldb = module->ldb;
194         struct ldb_dn *dn;
195         int ret;
196         unsigned int i, j;
197         struct ldb_message *msg;
198
199         list->count = 0;
200         list->dn = NULL;
201
202         /* if the attribute isn't in the list of indexed attributes then
203            this node needs a full search */
204         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
205                 return -1;
206         }
207
208         /* the attribute is indexed. Pull the list of DNs that match the 
209            search criterion */
210         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
211         if (!dn) return -1;
212
213         msg = talloc(list, struct ldb_message);
214         if (msg == NULL) {
215                 return -1;
216         }
217
218         ret = ltdb_search_dn1(module, dn, msg);
219         talloc_free(dn);
220         if (ret == 0 || ret == -1) {
221                 return ret;
222         }
223
224         for (i=0;i<msg->num_elements;i++) {
225                 struct ldb_message_element *el;
226
227                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
228                         continue;
229                 }
230
231                 el = &msg->elements[i];
232
233                 list->dn = talloc_array(list, char *, el->num_values);
234                 if (!list->dn) {
235                         talloc_free(msg);
236                         return -1;
237                 }
238
239                 for (j=0;j<el->num_values;j++) {
240                         list->dn[list->count] = 
241                                 talloc_strdup(list->dn, (char *)el->values[j].data);
242                         if (!list->dn[list->count]) {
243                                 talloc_free(msg);
244                                 return -1;
245                         }
246                         list->count++;
247                 }
248         }
249
250         talloc_free(msg);
251
252         if (list->count > 1) {
253                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
254         }
255
256         return 1;
257 }
258
259
260 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
261
262 /*
263   return a list of dn's that might match a simple indexed search on
264   the special objectclass attribute
265  */
266 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
267                                      const struct ldb_parse_tree *tree,
268                                      const struct ldb_message *index_list,
269                                      struct dn_list *list)
270 {
271         struct ldb_context *ldb = module->ldb;
272         unsigned int i;
273         int ret;
274         const char *target = (const char *)tree->u.equality.value.data;
275         const char **subclasses;
276
277         list->count = 0;
278         list->dn = NULL;
279
280         ret = ltdb_index_dn_simple(module, tree, index_list, list);
281
282         subclasses = ldb_subclass_list(module->ldb, target);
283
284         if (subclasses == NULL) {
285                 return ret;
286         }
287
288         for (i=0;subclasses[i];i++) {
289                 struct ldb_parse_tree tree2;
290                 struct dn_list *list2;
291                 tree2.operation = LDB_OP_EQUALITY;
292                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
293                 if (!tree2.u.equality.attr) {
294                         return -1;
295                 }
296                 tree2.u.equality.value.data = 
297                         (uint8_t *)talloc_strdup(list, subclasses[i]);
298                 if (tree2.u.equality.value.data == NULL) {
299                         return -1;                      
300                 }
301                 tree2.u.equality.value.length = strlen(subclasses[i]);
302                 list2 = talloc(list, struct dn_list);
303                 if (list2 == NULL) {
304                         talloc_free(tree2.u.equality.value.data);
305                         return -1;
306                 }
307                 if (ltdb_index_dn_objectclass(module, &tree2, 
308                                               index_list, list2) == 1) {
309                         if (list->count == 0) {
310                                 *list = *list2;
311                                 ret = 1;
312                         } else {
313                                 list_union(ldb, list, list2);
314                                 talloc_free(list2);
315                         }
316                 }
317                 talloc_free(tree2.u.equality.value.data);
318         }
319
320         return ret;
321 }
322
323 /*
324   return a list of dn's that might match a leaf indexed search
325  */
326 static int ltdb_index_dn_leaf(struct ldb_module *module, 
327                               const struct ldb_parse_tree *tree,
328                               const struct ldb_message *index_list,
329                               struct dn_list *list)
330 {
331         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
332                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
333         }
334         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
335                 list->dn = talloc_array(list, char *, 1);
336                 if (list->dn == NULL) {
337                         ldb_oom(module->ldb);
338                         return -1;
339                 }
340                 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
341                 if (list->dn[0] == NULL) {
342                         ldb_oom(module->ldb);
343                         return -1;
344                 }
345                 list->count = 1;
346                 return 1;
347         }
348         return ltdb_index_dn_simple(module, tree, index_list, list);
349 }
350
351
352 /*
353   list intersection
354   list = list & list2
355   relies on the lists being sorted
356 */
357 static int list_intersect(struct ldb_context *ldb,
358                           struct dn_list *list, const struct dn_list *list2)
359 {
360         struct dn_list *list3;
361         unsigned int i;
362
363         if (list->count == 0 || list2->count == 0) {
364                 /* 0 & X == 0 */
365                 return 0;
366         }
367
368         list3 = talloc(ldb, struct dn_list);
369         if (list3 == NULL) {
370                 return -1;
371         }
372
373         list3->dn = talloc_array(list3, char *, list->count);
374         if (!list3->dn) {
375                 talloc_free(list3);
376                 return -1;
377         }
378         list3->count = 0;
379
380         for (i=0;i<list->count;i++) {
381                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
382                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
383                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
384                         list3->count++;
385                 } else {
386                         talloc_free(list->dn[i]);
387                 }               
388         }
389
390         talloc_free(list->dn);
391         list->dn = talloc_move(list, &list3->dn);
392         list->count = list3->count;
393         talloc_free(list3);
394
395         return 0;
396 }
397
398
399 /*
400   list union
401   list = list | list2
402   relies on the lists being sorted
403 */
404 static int list_union(struct ldb_context *ldb, 
405                       struct dn_list *list, const struct dn_list *list2)
406 {
407         unsigned int i;
408         char **d;
409         unsigned int count = list->count;
410
411         if (list->count == 0 && list2->count == 0) {
412                 /* 0 | 0 == 0 */
413                 return 0;
414         }
415
416         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
417         if (!d) {
418                 return -1;
419         }
420         list->dn = d;
421
422         for (i=0;i<list2->count;i++) {
423                 if (ldb_list_find(list2->dn[i], list->dn, count, 
424                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
425                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
426                         if (!list->dn[list->count]) {
427                                 return -1;
428                         }
429                         list->count++;
430                 }               
431         }
432
433         if (list->count != count) {
434                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
435         }
436
437         return 0;
438 }
439
440 static int ltdb_index_dn(struct ldb_module *module, 
441                          const struct ldb_parse_tree *tree,
442                          const struct ldb_message *index_list,
443                          struct dn_list *list);
444
445
446 /*
447   OR two index results
448  */
449 static int ltdb_index_dn_or(struct ldb_module *module, 
450                             const struct ldb_parse_tree *tree,
451                             const struct ldb_message *index_list,
452                             struct dn_list *list)
453 {
454         struct ldb_context *ldb = module->ldb;
455         unsigned int i;
456         int ret;
457         
458         ret = -1;
459         list->dn = NULL;
460         list->count = 0;
461
462         for (i=0;i<tree->u.list.num_elements;i++) {
463                 struct dn_list *list2;
464                 int v;
465
466                 list2 = talloc(module, struct dn_list);
467                 if (list2 == NULL) {
468                         return -1;
469                 }
470
471                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
472
473                 if (v == 0) {
474                         /* 0 || X == X */
475                         if (ret == -1) {
476                                 ret = 0;
477                         }
478                         talloc_free(list2);
479                         continue;
480                 }
481
482                 if (v == -1) {
483                         /* 1 || X == 1 */
484                         talloc_free(list->dn);
485                         talloc_free(list2);
486                         return -1;
487                 }
488
489                 if (ret == -1) {
490                         ret = 1;
491                         list->dn = talloc_move(list, &list2->dn);
492                         list->count = list2->count;
493                 } else {
494                         if (list_union(ldb, list, list2) == -1) {
495                                 talloc_free(list2);
496                                 return -1;
497                         }
498                         ret = 1;
499                 }
500                 talloc_free(list2);
501         }
502
503         if (list->count == 0) {
504                 return 0;
505         }
506
507         return ret;
508 }
509
510
511 /*
512   NOT an index results
513  */
514 static int ltdb_index_dn_not(struct ldb_module *module, 
515                              const struct ldb_parse_tree *tree,
516                              const struct ldb_message *index_list,
517                              struct dn_list *list)
518 {
519         /* the only way to do an indexed not would be if we could
520            negate the not via another not or if we knew the total
521            number of database elements so we could know that the
522            existing expression covered the whole database. 
523            
524            instead, we just give up, and rely on a full index scan
525            (unless an outer & manages to reduce the list)
526         */
527         return -1;
528 }
529
530 /*
531   AND two index results
532  */
533 static int ltdb_index_dn_and(struct ldb_module *module, 
534                              const struct ldb_parse_tree *tree,
535                              const struct ldb_message *index_list,
536                              struct dn_list *list)
537 {
538         struct ldb_context *ldb = module->ldb;
539         unsigned int i;
540         int ret;
541         
542         ret = -1;
543         list->dn = NULL;
544         list->count = 0;
545
546         for (i=0;i<tree->u.list.num_elements;i++) {
547                 struct dn_list *list2;
548                 int v;
549
550                 list2 = talloc(module, struct dn_list);
551                 if (list2 == NULL) {
552                         return -1;
553                 }
554
555                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
556
557                 if (v == 0) {
558                         /* 0 && X == 0 */
559                         talloc_free(list->dn);
560                         talloc_free(list2);
561                         return 0;
562                 }
563
564                 if (v == -1) {
565                         talloc_free(list2);
566                         continue;
567                 }
568
569                 if (ret == -1) {
570                         ret = 1;
571                         talloc_free(list->dn);
572                         list->dn = talloc_move(list, &list2->dn);
573                         list->count = list2->count;
574                 } else {
575                         if (list_intersect(ldb, list, list2) == -1) {
576                                 talloc_free(list2);
577                                 return -1;
578                         }
579                 }
580
581                 talloc_free(list2);
582
583                 if (list->count == 0) {
584                         talloc_free(list->dn);
585                         return 0;
586                 }
587         }
588
589         return ret;
590 }
591
592 /*
593   AND index results and ONE level special index
594  */
595 static int ltdb_index_dn_one(struct ldb_module *module, 
596                              struct ldb_dn *parent_dn,
597                              struct dn_list *list)
598 {
599         struct ldb_context *ldb = module->ldb;
600         struct dn_list *list2;
601         struct ldb_message *msg;
602         struct ldb_dn *key;
603         struct ldb_val val;
604         unsigned int i, j;
605         int ret;
606         
607         list2 = talloc_zero(module, struct dn_list);
608         if (list2 == NULL) {
609                 return -1;
610         }
611
612         /* the attribute is indexed. Pull the list of DNs that match the 
613            search criterion */
614         val.data = (uint8_t *)((intptr_t)ldb_dn_get_casefold(parent_dn));
615         val.length = strlen((char *)val.data);
616         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
617         if (!key) {
618                 talloc_free(list2);
619                 return -1;
620         }
621
622         msg = talloc(list2, struct ldb_message);
623         if (msg == NULL) {
624                 talloc_free(list2);
625                 return -1;
626         }
627
628         ret = ltdb_search_dn1(module, key, msg);
629         talloc_free(key);
630         if (ret == 0 || ret == -1) {
631                 return ret;
632         }
633
634         for (i = 0; i < msg->num_elements; i++) {
635                 struct ldb_message_element *el;
636
637                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
638                         continue;
639                 }
640
641                 el = &msg->elements[i];
642
643                 list2->dn = talloc_array(list2, char *, el->num_values);
644                 if (!list2->dn) {
645                         talloc_free(list2);
646                         return -1;
647                 }
648
649                 for (j = 0; j < el->num_values; j++) {
650                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
651                         if (!list2->dn[list2->count]) {
652                                 talloc_free(list2);
653                                 return -1;
654                         }
655                         list2->count++;
656                 }
657         }
658
659         if (list2->count == 0) {
660                 talloc_free(list2);
661                 return 0;
662         }
663
664         if (list2->count > 1) {
665                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
666         }
667
668         if (list->count > 0) {
669                 if (list_intersect(ldb, list, list2) == -1) {
670                         talloc_free(list2);
671                         return -1;
672                 }
673
674                 if (list->count == 0) {
675                         talloc_free(list->dn);
676                         talloc_free(list2);
677                         return 0;
678                 }
679         } else {
680                 list->dn = talloc_move(list, &list2->dn);
681                 list->count = list2->count;
682         }
683
684         talloc_free(list2);
685
686         return 1;
687 }
688
689 /*
690   return a list of dn's that might match a indexed search or
691   -1 if an error. return 0 for no matches, or 1 for matches
692  */
693 static int ltdb_index_dn(struct ldb_module *module, 
694                          const struct ldb_parse_tree *tree,
695                          const struct ldb_message *index_list,
696                          struct dn_list *list)
697 {
698         int ret = -1;
699
700         switch (tree->operation) {
701         case LDB_OP_AND:
702                 ret = ltdb_index_dn_and(module, tree, index_list, list);
703                 break;
704
705         case LDB_OP_OR:
706                 ret = ltdb_index_dn_or(module, tree, index_list, list);
707                 break;
708
709         case LDB_OP_NOT:
710                 ret = ltdb_index_dn_not(module, tree, index_list, list);
711                 break;
712
713         case LDB_OP_EQUALITY:
714                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
715                 break;
716
717         case LDB_OP_SUBSTRING:
718         case LDB_OP_GREATER:
719         case LDB_OP_LESS:
720         case LDB_OP_PRESENT:
721         case LDB_OP_APPROX:
722         case LDB_OP_EXTENDED:
723                 /* we can't index with fancy bitops yet */
724                 ret = -1;
725                 break;
726         }
727
728         return ret;
729 }
730
731 /*
732   filter a candidate dn_list from an indexed search into a set of results
733   extracting just the given attributes
734 */
735 static int ltdb_index_filter(const struct dn_list *dn_list, 
736                              struct ldb_handle *handle)
737 {
738         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
739         struct ldb_reply *ares = NULL;
740         unsigned int i;
741
742         for (i = 0; i < dn_list->count; i++) {
743                 struct ldb_dn *dn;
744                 int ret;
745
746                 ares = talloc_zero(ac, struct ldb_reply);
747                 if (!ares) {
748                         handle->status = LDB_ERR_OPERATIONS_ERROR;
749                         handle->state = LDB_ASYNC_DONE;
750                         return LDB_ERR_OPERATIONS_ERROR;
751                 }
752
753                 ares->message = ldb_msg_new(ares);
754                 if (!ares->message) {
755                         handle->status = LDB_ERR_OPERATIONS_ERROR;
756                         handle->state = LDB_ASYNC_DONE;
757                         talloc_free(ares);
758                         return LDB_ERR_OPERATIONS_ERROR;
759                 }
760
761
762                 dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
763                 if (dn == NULL) {
764                         talloc_free(ares);
765                         return LDB_ERR_OPERATIONS_ERROR;
766                 }
767
768                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
769                 talloc_free(dn);
770                 if (ret == 0) {
771                         /* the record has disappeared? yes, this can happen */
772                         talloc_free(ares);
773                         continue;
774                 }
775
776                 if (ret == -1) {
777                         /* an internal error */
778                         talloc_free(ares);
779                         return LDB_ERR_OPERATIONS_ERROR;
780                 }
781
782                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
783                         talloc_free(ares);
784                         continue;
785                 }
786
787                 /* filter the attributes that the user wants */
788                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
789
790                 if (ret == -1) {
791                         handle->status = LDB_ERR_OPERATIONS_ERROR;
792                         handle->state = LDB_ASYNC_DONE;
793                         talloc_free(ares);
794                         return LDB_ERR_OPERATIONS_ERROR;
795                 }
796
797                 ares->type = LDB_REPLY_ENTRY;
798                 handle->state = LDB_ASYNC_PENDING;
799                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
800
801                 if (handle->status != LDB_SUCCESS) {
802                         handle->state = LDB_ASYNC_DONE;
803                         return handle->status;
804                 }
805         }
806
807         return LDB_SUCCESS;
808 }
809
810 /*
811   search the database with a LDAP-like expression using indexes
812   returns -1 if an indexed search is not possible, in which
813   case the caller should call ltdb_search_full() 
814 */
815 int ltdb_search_indexed(struct ldb_handle *handle)
816 {
817         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
818         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
819         struct dn_list *dn_list;
820         int ret, idxattr, idxone;
821
822         idxattr = idxone = 0;
823         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
824         if (ret == 0 ) {
825                 idxattr = 1;
826         }
827
828         /* We do one level indexing only if requested */
829         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
830         if (ret == 0 ) {
831                 idxone = 1;
832         }
833
834         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
835             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
836                 /* no indexs? must do full search */
837                 return -1;
838         }
839
840         ret = -1;
841
842         dn_list = talloc_zero(handle, struct dn_list);
843         if (dn_list == NULL) {
844                 return -1;
845         }
846
847         if (ac->scope == LDB_SCOPE_BASE) {
848                 /* with BASE searches only one DN can match */
849                 dn_list->dn = talloc_array(dn_list, char *, 1);
850                 if (dn_list->dn == NULL) {
851                         ldb_oom(ac->module->ldb);
852                         return -1;
853                 }
854                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
855                 if (dn_list->dn[0] == NULL) {
856                         ldb_oom(ac->module->ldb);
857                         return -1;
858                 }
859                 dn_list->count = 1;
860                 ret = 1;
861         }
862
863         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
864                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
865
866                 if (ret < 0) {
867                         talloc_free(dn_list);
868                         return ret;
869                 }
870         }
871
872         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
873                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
874         }
875
876         if (ret == 1) {
877                 /* we've got a candidate list - now filter by the full tree
878                    and extract the needed attributes */
879                 ret = ltdb_index_filter(dn_list, handle);
880                 handle->status = ret;
881                 handle->state = LDB_ASYNC_DONE;
882         }
883
884         talloc_free(dn_list);
885
886         return ret;
887 }
888
889 /*
890   add a index element where this is the first indexed DN for this value
891 */
892 static int ltdb_index_add1_new(struct ldb_context *ldb, 
893                                struct ldb_message *msg,
894                                const char *dn)
895 {
896         struct ldb_message_element *el;
897
898         /* add another entry */
899         el = talloc_realloc(msg, msg->elements, 
900                                struct ldb_message_element, msg->num_elements+1);
901         if (!el) {
902                 return -1;
903         }
904
905         msg->elements = el;
906         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
907         if (!msg->elements[msg->num_elements].name) {
908                 return -1;
909         }
910         msg->elements[msg->num_elements].num_values = 0;
911         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
912         if (!msg->elements[msg->num_elements].values) {
913                 return -1;
914         }
915         msg->elements[msg->num_elements].values[0].length = strlen(dn);
916         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
917         msg->elements[msg->num_elements].num_values = 1;
918         msg->num_elements++;
919
920         return 0;
921 }
922
923
924 /*
925   add a index element where this is not the first indexed DN for this
926   value
927 */
928 static int ltdb_index_add1_add(struct ldb_context *ldb, 
929                                struct ldb_message *msg,
930                                int idx,
931                                const char *dn)
932 {
933         struct ldb_val *v2;
934         unsigned int i;
935
936         /* for multi-valued attributes we can end up with repeats */
937         for (i=0;i<msg->elements[idx].num_values;i++) {
938                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
939                         return 0;
940                 }
941         }
942
943         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
944                               struct ldb_val, 
945                               msg->elements[idx].num_values+1);
946         if (!v2) {
947                 return -1;
948         }
949         msg->elements[idx].values = v2;
950
951         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
952         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
953         msg->elements[idx].num_values++;
954
955         return 0;
956 }
957
958 /*
959   add an index entry for one message element
960 */
961 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
962                            struct ldb_message_element *el, int v_idx)
963 {
964         struct ldb_context *ldb = module->ldb;
965         struct ldb_message *msg;
966         struct ldb_dn *dn_key;
967         int ret;
968         unsigned int i;
969
970         msg = talloc(module, struct ldb_message);
971         if (msg == NULL) {
972                 errno = ENOMEM;
973                 return -1;
974         }
975
976         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
977         if (!dn_key) {
978                 talloc_free(msg);
979                 return -1;
980         }
981         talloc_steal(msg, dn_key);
982
983         ret = ltdb_search_dn1(module, dn_key, msg);
984         if (ret == -1) {
985                 talloc_free(msg);
986                 return -1;
987         }
988
989         if (ret == 0) {
990                 msg->dn = dn_key;
991                 msg->num_elements = 0;
992                 msg->elements = NULL;
993         }
994
995         for (i=0;i<msg->num_elements;i++) {
996                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
997                         break;
998                 }
999         }
1000
1001         if (i == msg->num_elements) {
1002                 ret = ltdb_index_add1_new(ldb, msg, dn);
1003         } else {
1004                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
1005         }
1006
1007         if (ret == 0) {
1008                 ret = ltdb_store(module, msg, TDB_REPLACE);
1009         }
1010
1011         talloc_free(msg);
1012
1013         return ret;
1014 }
1015
1016 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1017                            struct ldb_message_element *elements, int num_el)
1018 {
1019         struct ltdb_private *ltdb = module->private_data;
1020         int ret;
1021         unsigned int i, j;
1022
1023         if (dn[0] == '@') {
1024                 return 0;
1025         }
1026
1027         if (ltdb->cache->indexlist->num_elements == 0) {
1028                 /* no indexed fields */
1029                 return 0;
1030         }
1031
1032         for (i = 0; i < num_el; i++) {
1033                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
1034                                        NULL, LTDB_IDXATTR);
1035                 if (ret == -1) {
1036                         continue;
1037                 }
1038                 for (j = 0; j < elements[i].num_values; j++) {
1039                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1040                         if (ret == -1) {
1041                                 return -1;
1042                         }
1043                 }
1044         }
1045
1046         return 0;
1047 }
1048
1049 /*
1050   add the index entries for a new record
1051   return -1 on failure
1052 */
1053 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1054 {
1055         const char *dn;
1056         int ret;
1057
1058         dn = ldb_dn_get_linearized(msg->dn);
1059         if (dn == NULL) {
1060                 return -1;
1061         }
1062
1063         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1064
1065         return ret;
1066 }
1067
1068
1069 /*
1070   delete an index entry for one message element
1071 */
1072 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
1073                          struct ldb_message_element *el, int v_idx)
1074 {
1075         struct ldb_context *ldb = module->ldb;
1076         struct ldb_message *msg;
1077         struct ldb_dn *dn_key;
1078         int ret, i;
1079         unsigned int j;
1080
1081         if (dn[0] == '@') {
1082                 return 0;
1083         }
1084
1085         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1086         if (!dn_key) {
1087                 return -1;
1088         }
1089
1090         msg = talloc(dn_key, struct ldb_message);
1091         if (msg == NULL) {
1092                 talloc_free(dn_key);
1093                 return -1;
1094         }
1095
1096         ret = ltdb_search_dn1(module, dn_key, msg);
1097         if (ret == -1) {
1098                 talloc_free(dn_key);
1099                 return -1;
1100         }
1101
1102         if (ret == 0) {
1103                 /* it wasn't indexed. Did we have an earlier error? If we did then
1104                    its gone now */
1105                 talloc_free(dn_key);
1106                 return 0;
1107         }
1108
1109         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1110         if (i == -1) {
1111                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1112                                 "ERROR: dn %s not found in %s\n", dn,
1113                                 ldb_dn_get_linearized(dn_key));
1114                 /* it ain't there. hmmm */
1115                 talloc_free(dn_key);
1116                 return 0;
1117         }
1118
1119         if (j != msg->elements[i].num_values - 1) {
1120                 memmove(&msg->elements[i].values[j], 
1121                         &msg->elements[i].values[j+1], 
1122                         (msg->elements[i].num_values-(j+1)) * 
1123                         sizeof(msg->elements[i].values[0]));
1124         }
1125         msg->elements[i].num_values--;
1126
1127         if (msg->elements[i].num_values == 0) {
1128                 ret = ltdb_delete_noindex(module, dn_key);
1129         } else {
1130                 ret = ltdb_store(module, msg, TDB_REPLACE);
1131         }
1132
1133         talloc_free(dn_key);
1134
1135         return ret;
1136 }
1137
1138 /*
1139   delete the index entries for a record
1140   return -1 on failure
1141 */
1142 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1143 {
1144         struct ltdb_private *ltdb = module->private_data;
1145         int ret;
1146         const char *dn;
1147         unsigned int i, j;
1148
1149         /* find the list of indexed fields */   
1150         if (ltdb->cache->indexlist->num_elements == 0) {
1151                 /* no indexed fields */
1152                 return 0;
1153         }
1154
1155         if (ldb_dn_is_special(msg->dn)) {
1156                 return 0;
1157         }
1158
1159         dn = ldb_dn_get_linearized(msg->dn);
1160         if (dn == NULL) {
1161                 return -1;
1162         }
1163
1164         for (i = 0; i < msg->num_elements; i++) {
1165                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1166                                        NULL, LTDB_IDXATTR);
1167                 if (ret == -1) {
1168                         continue;
1169                 }
1170                 for (j = 0; j < msg->elements[i].num_values; j++) {
1171                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1172                         if (ret == -1) {
1173                                 return -1;
1174                         }
1175                 }
1176         }
1177
1178         return 0;
1179 }
1180
1181 /* 
1182   handle special index for one level searches
1183 */
1184 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1185 {
1186         struct ltdb_private *ltdb = module->private_data;
1187         struct ldb_message_element el;
1188         struct ldb_val val;
1189         struct ldb_dn *pdn;
1190         const char *dn;
1191         int ret;
1192
1193         /* We index for ONE Level only if requested */
1194         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1195         if (ret != 0) {
1196                 return 0;
1197         }
1198
1199         pdn = ldb_dn_get_parent(module, msg->dn);
1200         if (pdn == NULL) {
1201                 return -1;
1202         }
1203
1204         dn = ldb_dn_get_linearized(msg->dn);
1205         if (dn == NULL) {
1206                 talloc_free(pdn);
1207                 return -1;
1208         }
1209
1210         val.data = (uint8_t *)((intptr_t)ldb_dn_get_casefold(pdn));
1211         if (val.data == NULL) {
1212                 talloc_free(pdn);
1213                 return -1;
1214         }
1215
1216         val.length = strlen((char *)val.data);
1217         el.name = LTDB_IDXONE;
1218         el.values = &val;
1219         el.num_values = 1;
1220
1221         if (add) {
1222                 ret = ltdb_index_add1(module, dn, &el, 0);
1223         } else { /* delete */
1224                 ret = ltdb_index_del_value(module, dn, &el, 0);
1225         }
1226
1227         talloc_free(pdn);
1228
1229         return ret;
1230 }
1231
1232
1233 /*
1234   traversal function that deletes all @INDEX records
1235 */
1236 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1237 {
1238         const char *dn = "DN=" LTDB_INDEX ":";
1239         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1240                 return tdb_delete(tdb, key);
1241         }
1242         return 0;
1243 }
1244
1245 /*
1246   traversal function that adds @INDEX records during a re index
1247 */
1248 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1249 {
1250         struct ldb_module *module = state;
1251         struct ldb_message *msg;
1252         const char *dn = NULL;
1253         int ret;
1254         TDB_DATA key2;
1255
1256         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1257             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1258                 return 0;
1259         }
1260
1261         msg = talloc(module, struct ldb_message);
1262         if (msg == NULL) {
1263                 return -1;
1264         }
1265
1266         ret = ltdb_unpack_data(module, &data, msg);
1267         if (ret != 0) {
1268                 talloc_free(msg);
1269                 return -1;
1270         }
1271
1272         /* check if the DN key has changed, perhaps due to the 
1273            case insensitivity of an element changing */
1274         key2 = ltdb_key(module, msg->dn);
1275         if (key2.dptr == NULL) {
1276                 /* probably a corrupt record ... darn */
1277                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1278                                                         ldb_dn_get_linearized(msg->dn));
1279                 talloc_free(msg);
1280                 return 0;
1281         }
1282         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1283                 tdb_delete(tdb, key);
1284                 tdb_store(tdb, key2, data, 0);
1285         }
1286         talloc_free(key2.dptr);
1287
1288         if (msg->dn == NULL) {
1289                 dn = (char *)key.dptr + 3;
1290         } else {
1291                 dn = ldb_dn_get_linearized(msg->dn);
1292         }
1293
1294         ret = ltdb_index_one(module, msg, 1);
1295         if (ret == 0) {
1296                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1297         } else {
1298                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1299                         "Adding special ONE LEVEL index failed (%s)!\n",
1300                         ldb_dn_get_linearized(msg->dn));
1301         }
1302
1303         talloc_free(msg);
1304
1305         return ret;
1306 }
1307
1308 /*
1309   force a complete reindex of the database
1310 */
1311 int ltdb_reindex(struct ldb_module *module)
1312 {
1313         struct ltdb_private *ltdb = module->private_data;
1314         int ret;
1315
1316         if (ltdb_cache_reload(module) != 0) {
1317                 return -1;
1318         }
1319
1320         /* first traverse the database deleting any @INDEX records */
1321         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1322         if (ret == -1) {
1323                 return -1;
1324         }
1325
1326         /* if we don't have indexes we have nothing todo */
1327         if (ltdb->cache->indexlist->num_elements == 0) {
1328                 return 0;
1329         }
1330
1331         /* now traverse adding any indexes for normal LDB records */
1332         ret = tdb_traverse(ltdb->tdb, re_index, module);
1333         if (ret == -1) {
1334                 return -1;
1335         }
1336
1337         return 0;
1338 }