Merge branch 'master' of ssh://git.samba.org/data/git/samba into abartlet-devel
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_includes.h"
35
36 #include "ldb_tdb.h"
37
38 /*
39   find an element in a list, using the given comparison function and
40   assuming that the list is already sorted using comp_fn
41
42   return -1 if not found, or the index of the first occurance of needle if found
43 */
44 static int ldb_list_find(const void *needle, 
45                          const void *base, size_t nmemb, size_t size, 
46                          comparison_fn_t comp_fn)
47 {
48         const char *base_p = (const char *)base;
49         size_t min_i, max_i, test_i;
50
51         if (nmemb == 0) {
52                 return -1;
53         }
54
55         min_i = 0;
56         max_i = nmemb-1;
57
58         while (min_i < max_i) {
59                 int r;
60
61                 test_i = (min_i + max_i) / 2;
62                 /* the following cast looks strange, but is
63                  correct. The key to understanding it is that base_p
64                  is a pointer to an array of pointers, so we have to
65                  dereference it after casting to void **. The strange
66                  const in the middle gives us the right type of pointer
67                  after the dereference  (tridge) */
68                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
69                 if (r == 0) {
70                         /* scan back for first element */
71                         while (test_i > 0 &&
72                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
73                                 test_i--;
74                         }
75                         return test_i;
76                 }
77                 if (r < 0) {
78                         if (test_i == 0) {
79                                 return -1;
80                         }
81                         max_i = test_i - 1;
82                 }
83                 if (r > 0) {
84                         min_i = test_i + 1;
85                 }
86         }
87
88         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
89                 return min_i;
90         }
91
92         return -1;
93 }
94
95 struct dn_list {
96         unsigned int count;
97         char **dn;
98 };
99
100 /*
101   return the dn key to be used for an index
102   caller frees
103 */
104 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
105                                      const char *attr, const struct ldb_val *value)
106 {
107         struct ldb_dn *ret;
108         struct ldb_val v;
109         const struct ldb_schema_attribute *a;
110         char *attr_folded;
111         int r;
112
113         attr_folded = ldb_attr_casefold(ldb, attr);
114         if (!attr_folded) {
115                 return NULL;
116         }
117
118         a = ldb_schema_attribute_by_name(ldb, attr);
119         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
120         if (r != LDB_SUCCESS) {
121                 const char *errstr = ldb_errstring(ldb);
122                 /* canonicalisation can be refused. For example, 
123                    a attribute that takes wildcards will refuse to canonicalise
124                    if the value contains a wildcard */
125                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
126                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
127                 talloc_free(attr_folded);
128                 return NULL;
129         }
130         if (ldb_should_b64_encode(&v)) {
131                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
132                 if (!vstr) return NULL;
133                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
134                 talloc_free(vstr);
135         } else {
136                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
137         }
138
139         if (v.data != value->data) {
140                 talloc_free(v.data);
141         }
142         talloc_free(attr_folded);
143
144         return ret;
145 }
146
147 /*
148   see if a attribute value is in the list of indexed attributes
149 */
150 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
151                             unsigned int *v_idx, const char *key)
152 {
153         unsigned int i, j;
154         for (i=0;i<msg->num_elements;i++) {
155                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
156                         const struct ldb_message_element *el = &msg->elements[i];
157
158                         if (attr == NULL) {
159                                 /* in this case we are just looking to see if key is present,
160                                    we are not spearching for a specific index */
161                                 return 0;
162                         }
163
164                         for (j=0;j<el->num_values;j++) {
165                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
166                                         if (v_idx) {
167                                                 *v_idx = j;
168                                         }
169                                         return i;
170                                 }
171                         }
172                 }
173         }
174         return -1;
175 }
176
177 /* used in sorting dn lists */
178 static int list_cmp(const char **s1, const char **s2)
179 {
180         return strcmp(*s1, *s2);
181 }
182
183 /*
184   return a list of dn's that might match a simple indexed search or
185  */
186 static int ltdb_index_dn_simple(struct ldb_module *module, 
187                                 const struct ldb_parse_tree *tree,
188                                 const struct ldb_message *index_list,
189                                 struct dn_list *list)
190 {
191         struct ldb_context *ldb = module->ldb;
192         struct ldb_dn *dn;
193         int ret;
194         unsigned int i, j;
195         struct ldb_message *msg;
196
197         list->count = 0;
198         list->dn = NULL;
199
200         /* if the attribute isn't in the list of indexed attributes then
201            this node needs a full search */
202         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
203                 return LDB_ERR_OPERATIONS_ERROR;
204         }
205
206         /* the attribute is indexed. Pull the list of DNs that match the 
207            search criterion */
208         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
209         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
210
211         msg = talloc(list, struct ldb_message);
212         if (msg == NULL) {
213                 return LDB_ERR_OPERATIONS_ERROR;
214         }
215
216         ret = ltdb_search_dn1(module, dn, msg);
217         talloc_free(dn);
218         if (ret != LDB_SUCCESS) {
219                 return ret;
220         }
221
222         for (i=0;i<msg->num_elements;i++) {
223                 struct ldb_message_element *el;
224
225                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
226                         continue;
227                 }
228
229                 el = &msg->elements[i];
230
231                 list->dn = talloc_array(list, char *, el->num_values);
232                 if (!list->dn) {
233                         talloc_free(msg);
234                         return LDB_ERR_OPERATIONS_ERROR;
235                 }
236
237                 for (j=0;j<el->num_values;j++) {
238                         list->dn[list->count] = 
239                                 talloc_strdup(list->dn, (char *)el->values[j].data);
240                         if (!list->dn[list->count]) {
241                                 talloc_free(msg);
242                                 return LDB_ERR_OPERATIONS_ERROR;
243                         }
244                         list->count++;
245                 }
246         }
247
248         talloc_free(msg);
249
250         if (list->count > 1) {
251                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
252         }
253
254         return LDB_SUCCESS;
255 }
256
257
258 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
259
260 /*
261   return a list of dn's that might match a leaf indexed search
262  */
263 static int ltdb_index_dn_leaf(struct ldb_module *module, 
264                               const struct ldb_parse_tree *tree,
265                               const struct ldb_message *index_list,
266                               struct dn_list *list)
267 {
268         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
269                 list->dn = talloc_array(list, char *, 1);
270                 if (list->dn == NULL) {
271                         ldb_oom(module->ldb);
272                         return LDB_ERR_OPERATIONS_ERROR;
273                 }
274                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
275                 if (list->dn[0] == NULL) {
276                         ldb_oom(module->ldb);
277                         return LDB_ERR_OPERATIONS_ERROR;
278                 }
279                 list->count = 1;
280                 return LDB_SUCCESS;
281         }
282         return ltdb_index_dn_simple(module, tree, index_list, list);
283 }
284
285
286 /*
287   list intersection
288   list = list & list2
289   relies on the lists being sorted
290 */
291 static int list_intersect(struct ldb_context *ldb,
292                           struct dn_list *list, const struct dn_list *list2)
293 {
294         struct dn_list *list3;
295         unsigned int i;
296
297         if (list->count == 0 || list2->count == 0) {
298                 /* 0 & X == 0 */
299                 return LDB_ERR_NO_SUCH_OBJECT;
300         }
301
302         list3 = talloc(ldb, struct dn_list);
303         if (list3 == NULL) {
304                 return LDB_ERR_OPERATIONS_ERROR;
305         }
306
307         list3->dn = talloc_array(list3, char *, list->count);
308         if (!list3->dn) {
309                 talloc_free(list3);
310                 return LDB_ERR_OPERATIONS_ERROR;
311         }
312         list3->count = 0;
313
314         for (i=0;i<list->count;i++) {
315                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
316                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
317                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
318                         list3->count++;
319                 } else {
320                         talloc_free(list->dn[i]);
321                 }               
322         }
323
324         talloc_free(list->dn);
325         list->dn = talloc_move(list, &list3->dn);
326         list->count = list3->count;
327         talloc_free(list3);
328
329         return LDB_ERR_NO_SUCH_OBJECT;
330 }
331
332
333 /*
334   list union
335   list = list | list2
336   relies on the lists being sorted
337 */
338 static int list_union(struct ldb_context *ldb, 
339                       struct dn_list *list, const struct dn_list *list2)
340 {
341         unsigned int i;
342         char **d;
343         unsigned int count = list->count;
344
345         if (list->count == 0 && list2->count == 0) {
346                 /* 0 | 0 == 0 */
347                 return LDB_ERR_NO_SUCH_OBJECT;
348         }
349
350         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
351         if (!d) {
352                 return LDB_ERR_OPERATIONS_ERROR;
353         }
354         list->dn = d;
355
356         for (i=0;i<list2->count;i++) {
357                 if (ldb_list_find(list2->dn[i], list->dn, count, 
358                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
359                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
360                         if (!list->dn[list->count]) {
361                                 return LDB_ERR_OPERATIONS_ERROR;
362                         }
363                         list->count++;
364                 }               
365         }
366
367         if (list->count != count) {
368                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
369         }
370
371         return LDB_ERR_NO_SUCH_OBJECT;
372 }
373
374 static int ltdb_index_dn(struct ldb_module *module, 
375                          const struct ldb_parse_tree *tree,
376                          const struct ldb_message *index_list,
377                          struct dn_list *list);
378
379
380 /*
381   OR two index results
382  */
383 static int ltdb_index_dn_or(struct ldb_module *module, 
384                             const struct ldb_parse_tree *tree,
385                             const struct ldb_message *index_list,
386                             struct dn_list *list)
387 {
388         struct ldb_context *ldb = module->ldb;
389         unsigned int i;
390         int ret;
391         
392         ret = LDB_ERR_OPERATIONS_ERROR;
393         list->dn = NULL;
394         list->count = 0;
395
396         for (i=0;i<tree->u.list.num_elements;i++) {
397                 struct dn_list *list2;
398                 int v;
399
400                 list2 = talloc(module, struct dn_list);
401                 if (list2 == NULL) {
402                         return LDB_ERR_OPERATIONS_ERROR;
403                 }
404
405                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
406
407                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
408                         /* 0 || X == X */
409                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
410                                 ret = v;
411                         }
412                         talloc_free(list2);
413                         continue;
414                 }
415
416                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
417                         /* 1 || X == 1 */
418                         talloc_free(list->dn);
419                         talloc_free(list2);
420                         return v;
421                 }
422
423                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
424                         ret = LDB_SUCCESS;
425                         list->dn = talloc_move(list, &list2->dn);
426                         list->count = list2->count;
427                 } else {
428                         if (list_union(ldb, list, list2) == -1) {
429                                 talloc_free(list2);
430                                 return LDB_ERR_OPERATIONS_ERROR;
431                         }
432                         ret = LDB_SUCCESS;
433                 }
434                 talloc_free(list2);
435         }
436
437         if (list->count == 0) {
438                 return LDB_ERR_NO_SUCH_OBJECT;
439         }
440
441         return ret;
442 }
443
444
445 /*
446   NOT an index results
447  */
448 static int ltdb_index_dn_not(struct ldb_module *module, 
449                              const struct ldb_parse_tree *tree,
450                              const struct ldb_message *index_list,
451                              struct dn_list *list)
452 {
453         /* the only way to do an indexed not would be if we could
454            negate the not via another not or if we knew the total
455            number of database elements so we could know that the
456            existing expression covered the whole database. 
457            
458            instead, we just give up, and rely on a full index scan
459            (unless an outer & manages to reduce the list)
460         */
461         return LDB_ERR_OPERATIONS_ERROR;
462 }
463
464 /*
465   AND two index results
466  */
467 static int ltdb_index_dn_and(struct ldb_module *module, 
468                              const struct ldb_parse_tree *tree,
469                              const struct ldb_message *index_list,
470                              struct dn_list *list)
471 {
472         struct ldb_context *ldb = module->ldb;
473         unsigned int i;
474         int ret;
475         
476         ret = LDB_ERR_OPERATIONS_ERROR;
477         list->dn = NULL;
478         list->count = 0;
479
480         for (i=0;i<tree->u.list.num_elements;i++) {
481                 struct dn_list *list2;
482                 int v;
483
484                 list2 = talloc(module, struct dn_list);
485                 if (list2 == NULL) {
486                         return LDB_ERR_OPERATIONS_ERROR;
487                 }
488
489                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
490
491                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
492                         /* 0 && X == 0 */
493                         talloc_free(list->dn);
494                         talloc_free(list2);
495                         return LDB_ERR_NO_SUCH_OBJECT;
496                 }
497
498                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
499                         talloc_free(list2);
500                         continue;
501                 }
502
503                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
504                         ret = LDB_SUCCESS;
505                         talloc_free(list->dn);
506                         list->dn = talloc_move(list, &list2->dn);
507                         list->count = list2->count;
508                 } else {
509                         if (list_intersect(ldb, list, list2) == -1) {
510                                 talloc_free(list2);
511                                 return LDB_ERR_OPERATIONS_ERROR;
512                         }
513                 }
514
515                 talloc_free(list2);
516
517                 if (list->count == 0) {
518                         talloc_free(list->dn);
519                         return LDB_ERR_NO_SUCH_OBJECT;
520                 }
521         }
522
523         return ret;
524 }
525
526 /*
527   AND index results and ONE level special index
528  */
529 static int ltdb_index_dn_one(struct ldb_module *module, 
530                              struct ldb_dn *parent_dn,
531                              struct dn_list *list)
532 {
533         struct ldb_context *ldb = module->ldb;
534         struct dn_list *list2;
535         struct ldb_message *msg;
536         struct ldb_dn *key;
537         struct ldb_val val;
538         unsigned int i, j;
539         int ret;
540         
541         list2 = talloc_zero(module, struct dn_list);
542         if (list2 == NULL) {
543                 return LDB_ERR_OPERATIONS_ERROR;
544         }
545
546         /* the attribute is indexed. Pull the list of DNs that match the 
547            search criterion */
548         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
549         val.length = strlen((char *)val.data);
550         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
551         if (!key) {
552                 talloc_free(list2);
553                 return LDB_ERR_OPERATIONS_ERROR;
554         }
555
556         msg = talloc(list2, struct ldb_message);
557         if (msg == NULL) {
558                 talloc_free(list2);
559                 return LDB_ERR_OPERATIONS_ERROR;
560         }
561
562         ret = ltdb_search_dn1(module, key, msg);
563         talloc_free(key);
564         if (ret != LDB_SUCCESS) {
565                 return ret;
566         }
567
568         for (i = 0; i < msg->num_elements; i++) {
569                 struct ldb_message_element *el;
570
571                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
572                         continue;
573                 }
574
575                 el = &msg->elements[i];
576
577                 list2->dn = talloc_array(list2, char *, el->num_values);
578                 if (!list2->dn) {
579                         talloc_free(list2);
580                         return LDB_ERR_OPERATIONS_ERROR;
581                 }
582
583                 for (j = 0; j < el->num_values; j++) {
584                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
585                         if (!list2->dn[list2->count]) {
586                                 talloc_free(list2);
587                                 return LDB_ERR_OPERATIONS_ERROR;
588                         }
589                         list2->count++;
590                 }
591         }
592
593         if (list2->count == 0) {
594                 talloc_free(list2);
595                 return LDB_ERR_NO_SUCH_OBJECT;
596         }
597
598         if (list2->count > 1) {
599                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
600         }
601
602         if (list->count > 0) {
603                 if (list_intersect(ldb, list, list2) == -1) {
604                         talloc_free(list2);
605                         return LDB_ERR_OPERATIONS_ERROR;
606                 }
607
608                 if (list->count == 0) {
609                         talloc_free(list->dn);
610                         talloc_free(list2);
611                         return LDB_ERR_NO_SUCH_OBJECT;
612                 }
613         } else {
614                 list->dn = talloc_move(list, &list2->dn);
615                 list->count = list2->count;
616         }
617
618         talloc_free(list2);
619
620         return LDB_SUCCESS;
621 }
622
623 /*
624   return a list of dn's that might match a indexed search or
625   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
626  */
627 static int ltdb_index_dn(struct ldb_module *module, 
628                          const struct ldb_parse_tree *tree,
629                          const struct ldb_message *index_list,
630                          struct dn_list *list)
631 {
632         int ret = LDB_ERR_OPERATIONS_ERROR;
633
634         switch (tree->operation) {
635         case LDB_OP_AND:
636                 ret = ltdb_index_dn_and(module, tree, index_list, list);
637                 break;
638
639         case LDB_OP_OR:
640                 ret = ltdb_index_dn_or(module, tree, index_list, list);
641                 break;
642
643         case LDB_OP_NOT:
644                 ret = ltdb_index_dn_not(module, tree, index_list, list);
645                 break;
646
647         case LDB_OP_EQUALITY:
648                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
649                 break;
650
651         case LDB_OP_SUBSTRING:
652         case LDB_OP_GREATER:
653         case LDB_OP_LESS:
654         case LDB_OP_PRESENT:
655         case LDB_OP_APPROX:
656         case LDB_OP_EXTENDED:
657                 /* we can't index with fancy bitops yet */
658                 ret = LDB_ERR_OPERATIONS_ERROR;
659                 break;
660         }
661
662         return ret;
663 }
664
665 /*
666   filter a candidate dn_list from an indexed search into a set of results
667   extracting just the given attributes
668 */
669 static int ltdb_index_filter(const struct dn_list *dn_list, 
670                              struct ltdb_context *ac)
671 {
672         struct ldb_message *msg;
673         unsigned int i;
674
675         for (i = 0; i < dn_list->count; i++) {
676                 struct ldb_dn *dn;
677                 int ret;
678
679                 msg = ldb_msg_new(ac);
680                 if (!msg) {
681                         return LDB_ERR_OPERATIONS_ERROR;
682                 }
683
684                 dn = ldb_dn_new(msg, ac->module->ldb, dn_list->dn[i]);
685                 if (dn == NULL) {
686                         talloc_free(msg);
687                         return LDB_ERR_OPERATIONS_ERROR;
688                 }
689
690                 ret = ltdb_search_dn1(ac->module, dn, msg);
691                 talloc_free(dn);
692                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
693                         /* the record has disappeared? yes, this can happen */
694                         talloc_free(msg);
695                         continue;
696                 }
697
698                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
699                         /* an internal error */
700                         talloc_free(msg);
701                         return LDB_ERR_OPERATIONS_ERROR;
702                 }
703
704                 if (!ldb_match_msg(ac->module->ldb, msg,
705                                    ac->tree, ac->base, ac->scope)) {
706                         talloc_free(msg);
707                         continue;
708                 }
709
710                 /* filter the attributes that the user wants */
711                 ret = ltdb_filter_attrs(msg, ac->attrs);
712
713                 if (ret == -1) {
714                         talloc_free(msg);
715                         return LDB_ERR_OPERATIONS_ERROR;
716                 }
717
718                 ret = ldb_module_send_entry(ac->req, msg);
719                 if (ret != LDB_SUCCESS) {
720                         ac->callback_failed = true;
721                         return ret;
722                 }
723         }
724
725         return LDB_SUCCESS;
726 }
727
728 /*
729   search the database with a LDAP-like expression using indexes
730   returns -1 if an indexed search is not possible, in which
731   case the caller should call ltdb_search_full() 
732 */
733 int ltdb_search_indexed(struct ltdb_context *ac)
734 {
735         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
736         struct dn_list *dn_list;
737         int ret, idxattr, idxone;
738
739         idxattr = idxone = 0;
740         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
741         if (ret == 0 ) {
742                 idxattr = 1;
743         }
744
745         /* We do one level indexing only if requested */
746         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
747         if (ret == 0 ) {
748                 idxone = 1;
749         }
750
751         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
752             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
753                 /* no indexes? must do full search */
754                 return LDB_ERR_OPERATIONS_ERROR;
755         }
756
757         ret = LDB_ERR_OPERATIONS_ERROR;
758
759         dn_list = talloc_zero(ac, struct dn_list);
760         if (dn_list == NULL) {
761                 return LDB_ERR_OPERATIONS_ERROR;
762         }
763
764         if (ac->scope == LDB_SCOPE_BASE) {
765                 /* with BASE searches only one DN can match */
766                 dn_list->dn = talloc_array(dn_list, char *, 1);
767                 if (dn_list->dn == NULL) {
768                         ldb_oom(ac->module->ldb);
769                         return LDB_ERR_OPERATIONS_ERROR;
770                 }
771                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
772                 if (dn_list->dn[0] == NULL) {
773                         ldb_oom(ac->module->ldb);
774                         return LDB_ERR_OPERATIONS_ERROR;
775                 }
776                 dn_list->count = 1;
777                 ret = LDB_SUCCESS;
778         }
779
780         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
781                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
782
783                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
784                         talloc_free(dn_list);
785                         return ret;
786                 }
787         }
788
789         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
790                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
791         }
792
793         if (ret == LDB_SUCCESS) {
794                 /* we've got a candidate list - now filter by the full tree
795                    and extract the needed attributes */
796                 ret = ltdb_index_filter(dn_list, ac);
797         }
798
799         talloc_free(dn_list);
800
801         return ret;
802 }
803
804 /*
805   add a index element where this is the first indexed DN for this value
806 */
807 static int ltdb_index_add1_new(struct ldb_context *ldb, 
808                                struct ldb_message *msg,
809                                const char *dn)
810 {
811         struct ldb_message_element *el;
812
813         /* add another entry */
814         el = talloc_realloc(msg, msg->elements, 
815                                struct ldb_message_element, msg->num_elements+1);
816         if (!el) {
817                 return LDB_ERR_OPERATIONS_ERROR;
818         }
819
820         msg->elements = el;
821         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
822         if (!msg->elements[msg->num_elements].name) {
823                 return LDB_ERR_OPERATIONS_ERROR;
824         }
825         msg->elements[msg->num_elements].num_values = 0;
826         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
827         if (!msg->elements[msg->num_elements].values) {
828                 return LDB_ERR_OPERATIONS_ERROR;
829         }
830         msg->elements[msg->num_elements].values[0].length = strlen(dn);
831         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
832         msg->elements[msg->num_elements].num_values = 1;
833         msg->num_elements++;
834
835         return LDB_SUCCESS;
836 }
837
838
839 /*
840   add a index element where this is not the first indexed DN for this
841   value
842 */
843 static int ltdb_index_add1_add(struct ldb_context *ldb, 
844                                struct ldb_message *msg,
845                                int idx,
846                                const char *dn)
847 {
848         struct ldb_val *v2;
849         unsigned int i;
850
851         /* for multi-valued attributes we can end up with repeats */
852         for (i=0;i<msg->elements[idx].num_values;i++) {
853                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
854                         return LDB_SUCCESS;
855                 }
856         }
857
858         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
859                               struct ldb_val, 
860                               msg->elements[idx].num_values+1);
861         if (!v2) {
862                 return LDB_ERR_OPERATIONS_ERROR;
863         }
864         msg->elements[idx].values = v2;
865
866         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
867         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
868         msg->elements[idx].num_values++;
869
870         return LDB_SUCCESS;
871 }
872
873 /*
874   add an index entry for one message element
875 */
876 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
877                            struct ldb_message_element *el, int v_idx)
878 {
879         struct ldb_context *ldb = module->ldb;
880         struct ldb_message *msg;
881         struct ldb_dn *dn_key;
882         int ret;
883         unsigned int i;
884
885         msg = talloc(module, struct ldb_message);
886         if (msg == NULL) {
887                 errno = ENOMEM;
888                 return LDB_ERR_OPERATIONS_ERROR;
889         }
890
891         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
892         if (!dn_key) {
893                 talloc_free(msg);
894                 return LDB_ERR_OPERATIONS_ERROR;
895         }
896         talloc_steal(msg, dn_key);
897
898         ret = ltdb_search_dn1(module, dn_key, msg);
899         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
900                 talloc_free(msg);
901                 return ret;
902         }
903
904         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
905                 msg->dn = dn_key;
906                 msg->num_elements = 0;
907                 msg->elements = NULL;
908         }
909
910         for (i=0;i<msg->num_elements;i++) {
911                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
912                         break;
913                 }
914         }
915
916         if (i == msg->num_elements) {
917                 ret = ltdb_index_add1_new(ldb, msg, dn);
918         } else {
919                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
920         }
921
922         if (ret == LDB_SUCCESS) {
923                 ret = ltdb_store(module, msg, TDB_REPLACE);
924         }
925
926         talloc_free(msg);
927
928         return ret;
929 }
930
931 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
932                            struct ldb_message_element *elements, int num_el)
933 {
934         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
935         int ret;
936         unsigned int i, j;
937
938         if (dn[0] == '@') {
939                 return LDB_SUCCESS;
940         }
941
942         if (ltdb->cache->indexlist->num_elements == 0) {
943                 /* no indexed fields */
944                 return LDB_SUCCESS;
945         }
946
947         for (i = 0; i < num_el; i++) {
948                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
949                                        NULL, LTDB_IDXATTR);
950                 if (ret == -1) {
951                         continue;
952                 }
953                 for (j = 0; j < elements[i].num_values; j++) {
954                         ret = ltdb_index_add1(module, dn, &elements[i], j);
955                         if (ret != LDB_SUCCESS) {
956                                 return ret;
957                         }
958                 }
959         }
960
961         return LDB_SUCCESS;
962 }
963
964 /*
965   add the index entries for a new record
966 */
967 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
968 {
969         const char *dn;
970         int ret;
971
972         dn = ldb_dn_get_linearized(msg->dn);
973         if (dn == NULL) {
974                 return LDB_ERR_OPERATIONS_ERROR;
975         }
976
977         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
978
979         return ret;
980 }
981
982
983 /*
984   delete an index entry for one message element
985 */
986 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
987                          struct ldb_message_element *el, int v_idx)
988 {
989         struct ldb_context *ldb = module->ldb;
990         struct ldb_message *msg;
991         struct ldb_dn *dn_key;
992         int ret, i;
993         unsigned int j;
994
995         if (dn[0] == '@') {
996                 return LDB_SUCCESS;
997         }
998
999         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1000         if (!dn_key) {
1001                 return LDB_ERR_OPERATIONS_ERROR;
1002         }
1003
1004         msg = talloc(dn_key, struct ldb_message);
1005         if (msg == NULL) {
1006                 talloc_free(dn_key);
1007                 return LDB_ERR_OPERATIONS_ERROR;
1008         }
1009
1010         ret = ltdb_search_dn1(module, dn_key, msg);
1011         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1012                 talloc_free(dn_key);
1013                 return ret;
1014         }
1015
1016         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1017                 /* it wasn't indexed. Did we have an earlier error? If we did then
1018                    its gone now */
1019                 talloc_free(dn_key);
1020                 return LDB_SUCCESS;
1021         }
1022
1023         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1024         if (i == -1) {
1025                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1026                                 "ERROR: dn %s not found in %s\n", dn,
1027                                 ldb_dn_get_linearized(dn_key));
1028                 /* it ain't there. hmmm */
1029                 talloc_free(dn_key);
1030                 return LDB_SUCCESS;
1031         }
1032
1033         if (j != msg->elements[i].num_values - 1) {
1034                 memmove(&msg->elements[i].values[j], 
1035                         &msg->elements[i].values[j+1], 
1036                         (msg->elements[i].num_values-(j+1)) * 
1037                         sizeof(msg->elements[i].values[0]));
1038         }
1039         msg->elements[i].num_values--;
1040
1041         if (msg->elements[i].num_values == 0) {
1042                 ret = ltdb_delete_noindex(module, dn_key);
1043         } else {
1044                 ret = ltdb_store(module, msg, TDB_REPLACE);
1045         }
1046
1047         talloc_free(dn_key);
1048
1049         return ret;
1050 }
1051
1052 /*
1053   delete the index entries for a record
1054   return -1 on failure
1055 */
1056 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1057 {
1058         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1059         int ret;
1060         const char *dn;
1061         unsigned int i, j;
1062
1063         /* find the list of indexed fields */   
1064         if (ltdb->cache->indexlist->num_elements == 0) {
1065                 /* no indexed fields */
1066                 return LDB_SUCCESS;
1067         }
1068
1069         if (ldb_dn_is_special(msg->dn)) {
1070                 return LDB_SUCCESS;
1071         }
1072
1073         dn = ldb_dn_get_linearized(msg->dn);
1074         if (dn == NULL) {
1075                 return LDB_ERR_OPERATIONS_ERROR;
1076         }
1077
1078         for (i = 0; i < msg->num_elements; i++) {
1079                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1080                                        NULL, LTDB_IDXATTR);
1081                 if (ret == -1) {
1082                         continue;
1083                 }
1084                 for (j = 0; j < msg->elements[i].num_values; j++) {
1085                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1086                         if (ret != LDB_SUCCESS) {
1087                                 return ret;
1088                         }
1089                 }
1090         }
1091
1092         return LDB_SUCCESS;
1093 }
1094
1095 /* 
1096   handle special index for one level searches
1097 */
1098 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1099 {
1100         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1101         struct ldb_message_element el;
1102         struct ldb_val val;
1103         struct ldb_dn *pdn;
1104         const char *dn;
1105         int ret;
1106
1107         /* We index for ONE Level only if requested */
1108         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1109         if (ret != 0) {
1110                 return LDB_SUCCESS;
1111         }
1112
1113         pdn = ldb_dn_get_parent(module, msg->dn);
1114         if (pdn == NULL) {
1115                 return LDB_ERR_OPERATIONS_ERROR;
1116         }
1117
1118         dn = ldb_dn_get_linearized(msg->dn);
1119         if (dn == NULL) {
1120                 talloc_free(pdn);
1121                 return LDB_ERR_OPERATIONS_ERROR;
1122         }
1123
1124         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1125         if (val.data == NULL) {
1126                 talloc_free(pdn);
1127                 return LDB_ERR_OPERATIONS_ERROR;
1128         }
1129
1130         val.length = strlen((char *)val.data);
1131         el.name = LTDB_IDXONE;
1132         el.values = &val;
1133         el.num_values = 1;
1134
1135         if (add) {
1136                 ret = ltdb_index_add1(module, dn, &el, 0);
1137         } else { /* delete */
1138                 ret = ltdb_index_del_value(module, dn, &el, 0);
1139         }
1140
1141         talloc_free(pdn);
1142
1143         return ret;
1144 }
1145
1146
1147 /*
1148   traversal function that deletes all @INDEX records
1149 */
1150 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1151 {
1152         const char *dn = "DN=" LTDB_INDEX ":";
1153         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1154                 return tdb_delete(tdb, key);
1155         }
1156         return 0;
1157 }
1158
1159 /*
1160   traversal function that adds @INDEX records during a re index
1161 */
1162 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1163 {
1164         struct ldb_module *module = (struct ldb_module *)state;
1165         struct ldb_message *msg;
1166         const char *dn = NULL;
1167         int ret;
1168         TDB_DATA key2;
1169
1170         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1171             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1172                 return 0;
1173         }
1174
1175         msg = talloc(module, struct ldb_message);
1176         if (msg == NULL) {
1177                 return -1;
1178         }
1179
1180         ret = ltdb_unpack_data(module, &data, msg);
1181         if (ret != 0) {
1182                 talloc_free(msg);
1183                 return -1;
1184         }
1185
1186         /* check if the DN key has changed, perhaps due to the 
1187            case insensitivity of an element changing */
1188         key2 = ltdb_key(module, msg->dn);
1189         if (key2.dptr == NULL) {
1190                 /* probably a corrupt record ... darn */
1191                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1192                                                         ldb_dn_get_linearized(msg->dn));
1193                 talloc_free(msg);
1194                 return 0;
1195         }
1196         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1197                 tdb_delete(tdb, key);
1198                 tdb_store(tdb, key2, data, 0);
1199         }
1200         talloc_free(key2.dptr);
1201
1202         if (msg->dn == NULL) {
1203                 dn = (char *)key.dptr + 3;
1204         } else {
1205                 dn = ldb_dn_get_linearized(msg->dn);
1206         }
1207
1208         ret = ltdb_index_one(module, msg, 1);
1209         if (ret == LDB_SUCCESS) {
1210                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1211         } else {
1212                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1213                         "Adding special ONE LEVEL index failed (%s)!\n",
1214                         ldb_dn_get_linearized(msg->dn));
1215         }
1216
1217         talloc_free(msg);
1218
1219         if (ret != LDB_SUCCESS) return -1;
1220
1221         return 0;
1222 }
1223
1224 /*
1225   force a complete reindex of the database
1226 */
1227 int ltdb_reindex(struct ldb_module *module)
1228 {
1229         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1230         int ret;
1231
1232         if (ltdb_cache_reload(module) != 0) {
1233                 return LDB_ERR_OPERATIONS_ERROR;
1234         }
1235
1236         /* first traverse the database deleting any @INDEX records */
1237         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1238         if (ret == -1) {
1239                 return LDB_ERR_OPERATIONS_ERROR;
1240         }
1241
1242         /* if we don't have indexes we have nothing todo */
1243         if (ltdb->cache->indexlist->num_elements == 0) {
1244                 return LDB_SUCCESS;
1245         }
1246
1247         /* now traverse adding any indexes for normal LDB records */
1248         ret = tdb_traverse(ltdb->tdb, re_index, module);
1249         if (ret == -1) {
1250                 return LDB_ERR_OPERATIONS_ERROR;
1251         }
1252
1253         if (tdb_repack(ltdb->tdb) != 0) {
1254                 return LDB_ERR_OPERATIONS_ERROR;                
1255         }
1256
1257         return LDB_SUCCESS;
1258 }