r13258: Fix the talloc heirachy for ldb_tdb.
[gd/samba/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
65                 if (r == 0) {
66                         /* scan back for first element */
67                         while (test_i > 0 &&
68                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
69                                 test_i--;
70                         }
71                         return test_i;
72                 }
73                 if (r < 0) {
74                         if (test_i == 0) {
75                                 return -1;
76                         }
77                         max_i = test_i - 1;
78                 }
79                 if (r > 0) {
80                         min_i = test_i + 1;
81                 }
82         }
83
84         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
85                 return min_i;
86         }
87
88         return -1;
89 }
90
91 struct dn_list {
92         unsigned int count;
93         char **dn;
94 };
95
96 /*
97   return the dn key to be used for an index
98   caller frees
99 */
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101                         const char *attr, const struct ldb_val *value)
102 {
103         struct ldb_dn *ret;
104         char *dn;
105         struct ldb_val v;
106         const struct ldb_attrib_handler *h;
107         char *attr_folded;
108
109         attr_folded = ldb_casefold(ldb, attr);
110         if (!attr_folded) {
111                 return NULL;
112         }
113
114         h = ldb_attrib_handler(ldb, attr);
115         if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116                 /* canonicalisation can be refused. For example, 
117                    a attribute that takes wildcards will refuse to canonicalise
118                    if the value contains a wildcard */
119                 talloc_free(attr_folded);
120                 return NULL;
121         }
122         if (ldb_should_b64_encode(&v)) {
123                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
124                 if (!vstr) return NULL;
125                 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
126                 talloc_free(vstr);
127                 if (v.data != value->data) {
128                         talloc_free(v.data);
129                 }
130                 talloc_free(attr_folded);
131                 if (dn == NULL) return NULL;
132                 goto done;
133         }
134
135         dn = talloc_asprintf(ldb, "%s:%s:%.*s", 
136                               LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
137
138         if (v.data != value->data) {
139                 talloc_free(v.data);
140         }
141         talloc_free(attr_folded);
142
143 done:
144         ret = ldb_dn_explode(ldb, dn);
145         talloc_free(dn);
146         return ret;
147 }
148
149 /*
150   see if a attribute value is in the list of indexed attributes
151 */
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153                             unsigned int *v_idx, const char *key)
154 {
155         unsigned int i, j;
156         for (i=0;i<msg->num_elements;i++) {
157                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158                         const struct ldb_message_element *el = 
159                                 &msg->elements[i];
160                         for (j=0;j<el->num_values;j++) {
161                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
162                                         if (v_idx) {
163                                                 *v_idx = j;
164                                         }
165                                         return i;
166                                 }
167                         }
168                 }
169         }
170         return -1;
171 }
172
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
175 {
176         return strcmp(*s1, *s2);
177 }
178
179 /*
180   return a list of dn's that might match a simple indexed search or
181  */
182 static int ltdb_index_dn_simple(struct ldb_module *module, 
183                                 struct ldb_parse_tree *tree,
184                                 const struct ldb_message *index_list,
185                                 struct dn_list *list)
186 {
187         struct ldb_context *ldb = module->ldb;
188         struct ldb_dn *dn;
189         int ret;
190         unsigned int i, j;
191         struct ldb_message *msg;
192
193         list->count = 0;
194         list->dn = NULL;
195
196         /* if the attribute isn't in the list of indexed attributes then
197            this node needs a full search */
198         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
199                 return -1;
200         }
201
202         /* the attribute is indexed. Pull the list of DNs that match the 
203            search criterion */
204         dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
205         if (!dn) return -1;
206
207         msg = talloc(list, struct ldb_message);
208         if (msg == NULL) {
209                 return -1;
210         }
211
212         ret = ltdb_search_dn1(module, dn, msg);
213         talloc_free(dn);
214         if (ret == 0 || ret == -1) {
215                 return ret;
216         }
217
218         for (i=0;i<msg->num_elements;i++) {
219                 struct ldb_message_element *el;
220
221                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
222                         continue;
223                 }
224
225                 el = &msg->elements[i];
226
227                 list->dn = talloc_array(list, char *, el->num_values);
228                 if (!list->dn) {
229                         break;          
230                 }
231
232                 for (j=0;j<el->num_values;j++) {
233                         list->dn[list->count] = 
234                                 talloc_strdup(list->dn, (char *)el->values[j].data);
235                         if (!list->dn[list->count]) {
236                                 return -1;
237                         }
238                         list->count++;
239                 }
240         }
241
242         talloc_free(msg);
243
244         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
245
246         return 1;
247 }
248
249
250 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
251
252 /*
253   return a list of dn's that might match a simple indexed search on
254   the special objectclass attribute
255  */
256 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
257                                      struct ldb_parse_tree *tree,
258                                      const struct ldb_message *index_list,
259                                      struct dn_list *list)
260 {
261         struct ldb_context *ldb = module->ldb;
262         unsigned int i;
263         int ret;
264         const char *target = (const char *)tree->u.equality.value.data;
265         const char **subclasses;
266
267         list->count = 0;
268         list->dn = NULL;
269
270         ret = ltdb_index_dn_simple(module, tree, index_list, list);
271
272         subclasses = ldb_subclass_list(module->ldb, target);
273
274         if (subclasses == NULL) {
275                 return ret;
276         }
277
278         for (i=0;subclasses[i];i++) {
279                 struct ldb_parse_tree tree2;
280                 struct dn_list *list2;
281                 tree2.operation = LDB_OP_EQUALITY;
282                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
283                 if (!tree2.u.equality.attr) {
284                         return -1;
285                 }
286                 tree2.u.equality.value.data = 
287                         (uint8_t *)talloc_strdup(list, subclasses[i]);
288                 if (tree2.u.equality.value.data == NULL) {
289                         return -1;                      
290                 }
291                 tree2.u.equality.value.length = strlen(subclasses[i]);
292                 list2 = talloc(list, struct dn_list);
293                 if (list2 == NULL) {
294                         talloc_free(tree2.u.equality.value.data);
295                         return -1;
296                 }
297                 if (ltdb_index_dn_objectclass(module, &tree2, 
298                                               index_list, list2) == 1) {
299                         if (list->count == 0) {
300                                 *list = *list2;
301                                 ret = 1;
302                         } else {
303                                 list_union(ldb, list, list2);
304                                 talloc_free(list2);
305                         }
306                 }
307                 talloc_free(tree2.u.equality.value.data);
308         }
309
310         return ret;
311 }
312
313 /*
314   return a list of dn's that might match a leaf indexed search
315  */
316 static int ltdb_index_dn_leaf(struct ldb_module *module, 
317                               struct ldb_parse_tree *tree,
318                               const struct ldb_message *index_list,
319                               struct dn_list *list)
320 {
321         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
322                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
323         }
324         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
325                 list->dn = talloc_array(list, char *, 1);
326                 if (list->dn == NULL) {
327                         ldb_oom(module->ldb);
328                         return -1;
329                 }
330                 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
331                 if (list->dn[0] == NULL) {
332                         ldb_oom(module->ldb);
333                         return -1;
334                 }
335                 list->count = 1;
336                 return 1;
337         }
338         return ltdb_index_dn_simple(module, tree, index_list, list);
339 }
340
341
342 /*
343   list intersection
344   list = list & list2
345   relies on the lists being sorted
346 */
347 static int list_intersect(struct ldb_context *ldb,
348                           struct dn_list *list, const struct dn_list *list2)
349 {
350         struct dn_list *list3;
351         unsigned int i;
352
353         if (list->count == 0 || list2->count == 0) {
354                 /* 0 & X == 0 */
355                 return 0;
356         }
357
358         list3 = talloc(ldb, struct dn_list);
359         if (list3 == NULL) {
360                 return -1;
361         }
362
363         list3->dn = talloc_array(list3, char *, list->count);
364         if (!list3->dn) {
365                 talloc_free(list3);
366                 return -1;
367         }
368         list3->count = 0;
369
370         for (i=0;i<list->count;i++) {
371                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
372                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
373                         list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
374                         list3->count++;
375                 } else {
376                         talloc_free(list->dn[i]);
377                 }               
378         }
379
380         talloc_free(list->dn);
381         list->dn = talloc_steal(list, list3->dn);
382         list->count = list3->count;
383         talloc_free(list3);
384
385         return 0;
386 }
387
388
389 /*
390   list union
391   list = list | list2
392   relies on the lists being sorted
393 */
394 static int list_union(struct ldb_context *ldb, 
395                       struct dn_list *list, const struct dn_list *list2)
396 {
397         unsigned int i;
398         char **d;
399         unsigned int count = list->count;
400
401         if (list->count == 0 && list2->count == 0) {
402                 /* 0 | 0 == 0 */
403                 return 0;
404         }
405
406         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
407         if (!d) {
408                 return -1;
409         }
410         list->dn = d;
411
412         for (i=0;i<list2->count;i++) {
413                 if (ldb_list_find(list2->dn[i], list->dn, count, 
414                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
415                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
416                         if (!list->dn[list->count]) {
417                                 return -1;
418                         }
419                         list->count++;
420                 }               
421         }
422
423         if (list->count != count) {
424                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
425         }
426
427         return 0;
428 }
429
430 static int ltdb_index_dn(struct ldb_module *module, 
431                          struct ldb_parse_tree *tree,
432                          const struct ldb_message *index_list,
433                          struct dn_list *list);
434
435
436 /*
437   OR two index results
438  */
439 static int ltdb_index_dn_or(struct ldb_module *module, 
440                             struct ldb_parse_tree *tree,
441                             const struct ldb_message *index_list,
442                             struct dn_list *list)
443 {
444         struct ldb_context *ldb = module->ldb;
445         unsigned int i;
446         int ret;
447         
448         ret = -1;
449         list->dn = NULL;
450         list->count = 0;
451
452         for (i=0;i<tree->u.list.num_elements;i++) {
453                 struct dn_list *list2;
454                 int v;
455
456                 list2 = talloc(module, struct dn_list);
457                 if (list2 == NULL) {
458                         return -1;
459                 }
460
461                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
462
463                 if (v == 0) {
464                         /* 0 || X == X */
465                         if (ret == -1) {
466                                 ret = 0;
467                         }
468                         talloc_free(list2);
469                         continue;
470                 }
471
472                 if (v == -1) {
473                         /* 1 || X == 1 */
474                         talloc_free(list->dn);
475                         talloc_free(list2);
476                         return -1;
477                 }
478
479                 if (ret == -1) {
480                         ret = 1;
481                         list->dn = talloc_steal(list, list2->dn);
482                         list->count = list2->count;
483                 } else {
484                         if (list_union(ldb, list, list2) == -1) {
485                                 talloc_free(list2);
486                                 return -1;
487                         }
488                         ret = 1;
489                 }
490                 talloc_free(list2);
491         }
492
493         if (list->count == 0) {
494                 return 0;
495         }
496
497         return ret;
498 }
499
500
501 /*
502   NOT an index results
503  */
504 static int ltdb_index_dn_not(struct ldb_module *module, 
505                              struct ldb_parse_tree *tree,
506                              const struct ldb_message *index_list,
507                              struct dn_list *list)
508 {
509         /* the only way to do an indexed not would be if we could
510            negate the not via another not or if we knew the total
511            number of database elements so we could know that the
512            existing expression covered the whole database. 
513            
514            instead, we just give up, and rely on a full index scan
515            (unless an outer & manages to reduce the list)
516         */
517         return -1;
518 }
519
520 /*
521   AND two index results
522  */
523 static int ltdb_index_dn_and(struct ldb_module *module, 
524                              struct ldb_parse_tree *tree,
525                              const struct ldb_message *index_list,
526                              struct dn_list *list)
527 {
528         struct ldb_context *ldb = module->ldb;
529         unsigned int i;
530         int ret;
531         
532         ret = -1;
533         list->dn = NULL;
534         list->count = 0;
535
536         for (i=0;i<tree->u.list.num_elements;i++) {
537                 struct dn_list *list2;
538                 int v;
539
540                 list2 = talloc(module, struct dn_list);
541                 if (list2 == NULL) {
542                         return -1;
543                 }
544
545                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
546
547                 if (v == 0) {
548                         /* 0 && X == 0 */
549                         talloc_free(list->dn);
550                         talloc_free(list2);
551                         return 0;
552                 }
553
554                 if (v == -1) {
555                         talloc_free(list2);
556                         continue;
557                 }
558
559                 if (ret == -1) {
560                         ret = 1;
561                         talloc_free(list->dn);
562                         list->dn = talloc_steal(list, list2->dn);
563                         list->count = list2->count;
564                 } else {
565                         if (list_intersect(ldb, list, list2) == -1) {
566                                 talloc_free(list2);
567                                 return -1;
568                         }
569                 }
570
571                 talloc_free(list2);
572
573                 if (list->count == 0) {
574                         talloc_free(list->dn);
575                         return 0;
576                 }
577         }
578
579         return ret;
580 }
581
582 /*
583   return a list of dn's that might match a indexed search or
584   -1 if an error. return 0 for no matches, or 1 for matches
585  */
586 static int ltdb_index_dn(struct ldb_module *module, 
587                          struct ldb_parse_tree *tree,
588                          const struct ldb_message *index_list,
589                          struct dn_list *list)
590 {
591         int ret = -1;
592
593         switch (tree->operation) {
594         case LDB_OP_AND:
595                 ret = ltdb_index_dn_and(module, tree, index_list, list);
596                 break;
597
598         case LDB_OP_OR:
599                 ret = ltdb_index_dn_or(module, tree, index_list, list);
600                 break;
601
602         case LDB_OP_NOT:
603                 ret = ltdb_index_dn_not(module, tree, index_list, list);
604                 break;
605
606         case LDB_OP_EQUALITY:
607                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
608                 break;
609
610         case LDB_OP_SUBSTRING:
611         case LDB_OP_GREATER:
612         case LDB_OP_LESS:
613         case LDB_OP_PRESENT:
614         case LDB_OP_APPROX:
615         case LDB_OP_EXTENDED:
616                 /* we can't index with fancy bitops yet */
617                 ret = -1;
618                 break;
619         }
620
621         return ret;
622 }
623
624 /*
625   filter a candidate dn_list from an indexed search into a set of results
626   extracting just the given attributes
627 */
628 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
629                             const struct ldb_dn *base,
630                             enum ldb_scope scope,
631                             const struct dn_list *dn_list, 
632                             const char * const attrs[], struct ldb_result *res)
633 {
634         unsigned int i;
635
636         for (i = 0; i < dn_list->count; i++) {
637                 struct ldb_message *msg;
638                 struct ldb_dn *dn;
639                 int ret;
640
641                 msg = talloc(module, struct ldb_message);
642                 if (msg == NULL) {
643                         return LDB_ERR_OTHER;
644                 }
645
646                 dn = ldb_dn_explode(msg, dn_list->dn[i]);
647                 if (dn == NULL) {
648                         talloc_free(msg);
649                         return LDB_ERR_OTHER;
650                 }
651
652                 ret = ltdb_search_dn1(module, dn, msg);
653                 talloc_free(dn);
654                 if (ret == 0) {
655                         /* the record has disappeared? yes, this can happen */
656                         talloc_free(msg);
657                         continue;
658                 }
659
660                 if (ret == -1) {
661                         /* an internal error */
662                         talloc_free(msg);
663                         return LDB_ERR_OTHER;
664                 }
665
666                 ret = 0;
667                 if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
668                         ret = ltdb_add_attr_results(module, res, msg, 
669                                                     attrs, &(res->count), &(res->msgs));
670                 }
671                 talloc_free(msg);
672                 if (ret != 0) {
673                         return LDB_ERR_OTHER;
674                 }
675         }
676
677         return LDB_SUCCESS;
678 }
679
680 /*
681   search the database with a LDAP-like expression using indexes
682   returns -1 if an indexed search is not possible, in which
683   case the caller should call ltdb_search_full() 
684 */
685 int ltdb_search_indexed(struct ldb_module *module, 
686                         const struct ldb_dn *base,
687                         enum ldb_scope scope,
688                         struct ldb_parse_tree *tree,
689                         const char * const attrs[], struct ldb_result **res)
690 {
691         struct ltdb_private *ltdb = module->private_data;
692         struct dn_list *dn_list;
693         int ret;
694
695         if (ltdb->cache->indexlist->num_elements == 0 && 
696             scope != LDB_SCOPE_BASE) {
697                 /* no index list? must do full search */
698                 return -1;
699         }
700
701         dn_list = talloc(module, struct dn_list);
702         if (dn_list == NULL) {
703                 return -1;
704         }
705
706         *res = talloc(module, struct ldb_result);
707         if (*res == NULL) {
708                 return LDB_ERR_OTHER;
709         }
710         (*res)->count = 0;
711         (*res)->msgs = NULL;
712         (*res)->controls = NULL;
713
714         if (scope == LDB_SCOPE_BASE) {
715                 /* with BASE searches only one DN can match */
716                 dn_list->dn = talloc_array(dn_list, char *, 1);
717                 if (dn_list->dn == NULL) {
718                         ldb_oom(module->ldb);
719                         return -1;
720                 }
721                 dn_list->dn[0] = ldb_dn_linearize(dn_list, base);
722                 if (dn_list->dn[0] == NULL) {
723                         ldb_oom(module->ldb);
724                         return -1;
725                 }
726                 dn_list->count = 1;
727                 ret = 1;
728         } else {
729                 ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
730         }
731
732         if (ret == 1) {
733                 /* we've got a candidate list - now filter by the full tree
734                    and extract the needed attributes */
735                 ret = ldb_index_filter(module, tree, base, scope, dn_list, 
736                                        attrs, *res);
737         }
738
739         talloc_free(dn_list);
740
741         return ret;
742 }
743
744 /*
745   add a index element where this is the first indexed DN for this value
746 */
747 static int ltdb_index_add1_new(struct ldb_context *ldb, 
748                                struct ldb_message *msg,
749                                struct ldb_message_element *el,
750                                char *dn)
751 {
752         struct ldb_message_element *el2;
753
754         /* add another entry */
755         el2 = talloc_realloc(msg, msg->elements, 
756                                struct ldb_message_element, msg->num_elements+1);
757         if (!el2) {
758                 return -1;
759         }
760
761         msg->elements = el2;
762         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
763         if (!msg->elements[msg->num_elements].name) {
764                 return -1;
765         }
766         msg->elements[msg->num_elements].num_values = 0;
767         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
768         if (!msg->elements[msg->num_elements].values) {
769                 return -1;
770         }
771         msg->elements[msg->num_elements].values[0].length = strlen(dn);
772         msg->elements[msg->num_elements].values[0].data = (uint8_t *)dn;
773         msg->elements[msg->num_elements].num_values = 1;
774         msg->num_elements++;
775
776         return 0;
777 }
778
779
780 /*
781   add a index element where this is not the first indexed DN for this
782   value
783 */
784 static int ltdb_index_add1_add(struct ldb_context *ldb, 
785                                struct ldb_message *msg,
786                                struct ldb_message_element *el,
787                                int idx,
788                                char *dn)
789 {
790         struct ldb_val *v2;
791         unsigned int i;
792
793         /* for multi-valued attributes we can end up with repeats */
794         for (i=0;i<msg->elements[idx].num_values;i++) {
795                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
796                         return 0;
797                 }
798         }
799
800         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
801                               struct ldb_val, 
802                               msg->elements[idx].num_values+1);
803         if (!v2) {
804                 return -1;
805         }
806         msg->elements[idx].values = v2;
807
808         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
809         msg->elements[idx].values[msg->elements[idx].num_values].data = (uint8_t *)dn;
810         msg->elements[idx].num_values++;
811
812         return 0;
813 }
814
815 /*
816   add an index entry for one message element
817 */
818 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
819                            struct ldb_message_element *el, int v_idx)
820 {
821         struct ldb_context *ldb = module->ldb;
822         struct ldb_message *msg;
823         struct ldb_dn *dn_key;
824         int ret;
825         unsigned int i;
826
827         msg = talloc(module, struct ldb_message);
828         if (msg == NULL) {
829                 errno = ENOMEM;
830                 return -1;
831         }
832
833         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
834         if (!dn_key) {
835                 talloc_free(msg);
836                 errno = ENOMEM;
837                 return -1;
838         }
839         talloc_steal(msg, dn_key);
840
841         ret = ltdb_search_dn1(module, dn_key, msg);
842         if (ret == -1) {
843                 talloc_free(msg);
844                 return -1;
845         }
846
847         if (ret == 0) {
848                 msg->dn = dn_key;
849                 msg->num_elements = 0;
850                 msg->elements = NULL;
851         }
852
853         for (i=0;i<msg->num_elements;i++) {
854                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
855                         break;
856                 }
857         }
858
859         if (i == msg->num_elements) {
860                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
861         } else {
862                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
863         }
864
865         if (ret == 0) {
866                 ret = ltdb_store(module, msg, TDB_REPLACE);
867         }
868
869         talloc_free(msg);
870
871         return ret;
872 }
873
874 static int ltdb_index_add0(struct ldb_module *module, char *dn,
875                            struct ldb_message_element *elements, int num_el)
876 {
877         struct ltdb_private *ltdb = module->private_data;
878         int ret;
879         unsigned int i, j;
880
881         if (dn[0] == '@') {
882                 return 0;
883         }
884
885         if (ltdb->cache->indexlist->num_elements == 0) {
886                 /* no indexed fields */
887                 return 0;
888         }
889
890         for (i = 0; i < num_el; i++) {
891                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
892                                        NULL, LTDB_IDXATTR);
893                 if (ret == -1) {
894                         continue;
895                 }
896                 for (j = 0; j < elements[i].num_values; j++) {
897                         ret = ltdb_index_add1(module, dn, &elements[i], j);
898                         if (ret == -1) {
899                                 talloc_free(dn);
900                                 return -1;
901                         }
902                 }
903         }
904
905         return 0;
906 }
907
908 /*
909   add the index entries for a new record
910   return -1 on failure
911 */
912 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
913 {
914         struct ltdb_private *ltdb = module->private_data;
915         char *dn;
916         int ret;
917
918         dn = ldb_dn_linearize(ltdb, msg->dn);
919         if (dn == NULL) {
920                 return -1;
921         }
922
923         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
924
925         talloc_free(dn);
926
927         return ret;
928 }
929
930
931 /*
932   delete an index entry for one message element
933 */
934 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
935                          struct ldb_message_element *el, int v_idx)
936 {
937         struct ldb_context *ldb = module->ldb;
938         struct ldb_message *msg;
939         struct ldb_dn *dn_key;
940         int ret, i;
941         unsigned int j;
942
943         if (dn[0] == '@') {
944                 return 0;
945         }
946
947         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
948         if (!dn_key) {
949                 return -1;
950         }
951
952         msg = talloc(dn_key, struct ldb_message);
953         if (msg == NULL) {
954                 talloc_free(dn_key);
955                 return -1;
956         }
957
958         ret = ltdb_search_dn1(module, dn_key, msg);
959         if (ret == -1) {
960                 talloc_free(dn_key);
961                 return -1;
962         }
963
964         if (ret == 0) {
965                 /* it wasn't indexed. Did we have an earlier error? If we did then
966                    its gone now */
967                 talloc_free(dn_key);
968                 return 0;
969         }
970
971         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
972         if (i == -1) {
973                 ldb_debug(ldb, LDB_DEBUG_ERROR,
974                                 "ERROR: dn %s not found in %s\n", dn,
975                                 ldb_dn_linearize(dn_key, dn_key));
976                 /* it ain't there. hmmm */
977                 talloc_free(dn_key);
978                 return 0;
979         }
980
981         if (j != msg->elements[i].num_values - 1) {
982                 memmove(&msg->elements[i].values[j], 
983                         &msg->elements[i].values[j+1], 
984                         (msg->elements[i].num_values-(j+1)) * 
985                         sizeof(msg->elements[i].values[0]));
986         }
987         msg->elements[i].num_values--;
988
989         if (msg->elements[i].num_values == 0) {
990                 ret = ltdb_delete_noindex(module, dn_key);
991         } else {
992                 ret = ltdb_store(module, msg, TDB_REPLACE);
993         }
994
995         talloc_free(dn_key);
996
997         return ret;
998 }
999
1000 /*
1001   delete the index entries for a record
1002   return -1 on failure
1003 */
1004 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1005 {
1006         struct ltdb_private *ltdb = module->private_data;
1007         int ret;
1008         char *dn;
1009         unsigned int i, j;
1010
1011         if (ldb_dn_is_special(msg->dn)) {
1012                 return 0;
1013         }
1014
1015         dn = ldb_dn_linearize(ltdb, msg->dn);
1016         if (dn == NULL) {
1017                 return -1;
1018         }
1019
1020         /* find the list of indexed fields */   
1021         if (ltdb->cache->indexlist->num_elements == 0) {
1022                 /* no indexed fields */
1023                 return 0;
1024         }
1025
1026         for (i = 0; i < msg->num_elements; i++) {
1027                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1028                                        NULL, LTDB_IDXATTR);
1029                 if (ret == -1) {
1030                         continue;
1031                 }
1032                 for (j = 0; j < msg->elements[i].num_values; j++) {
1033                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1034                         if (ret == -1) {
1035                                 talloc_free(dn);
1036                                 return -1;
1037                         }
1038                 }
1039         }
1040
1041         talloc_free(dn);
1042         return 0;
1043 }
1044
1045
1046 /*
1047   traversal function that deletes all @INDEX records
1048 */
1049 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1050 {
1051         const char *dn = "DN=" LTDB_INDEX ":";
1052         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1053                 return tdb_delete(tdb, key);
1054         }
1055         return 0;
1056 }
1057
1058 /*
1059   traversal function that adds @INDEX records during a re index
1060 */
1061 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1062 {
1063         struct ldb_module *module = state;
1064         struct ldb_message *msg;
1065         char *dn = NULL;
1066         int ret;
1067         TDB_DATA key2;
1068
1069         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1070             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1071                 return 0;
1072         }
1073
1074         msg = talloc(module, struct ldb_message);
1075         if (msg == NULL) {
1076                 return -1;
1077         }
1078
1079         ret = ltdb_unpack_data(module, &data, msg);
1080         if (ret != 0) {
1081                 talloc_free(msg);
1082                 return -1;
1083         }
1084
1085         /* check if the DN key has changed, perhaps due to the 
1086            case insensitivity of an element changing */
1087         key2 = ltdb_key(module, msg->dn);
1088         if (key2.dptr == NULL) {
1089                 /* probably a corrupt record ... darn */
1090                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1091                                                         ldb_dn_linearize(msg, msg->dn));
1092                 talloc_free(msg);
1093                 return 0;
1094         }
1095         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1096                 tdb_delete(tdb, key);
1097                 tdb_store(tdb, key2, data, 0);
1098         }
1099         talloc_free(key2.dptr);
1100
1101         if (msg->dn == NULL) {
1102                 dn = (char *)key.dptr + 3;
1103         } else {
1104                 dn = ldb_dn_linearize(msg->dn, msg->dn);
1105         }
1106
1107         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1108
1109         talloc_free(msg);
1110
1111         return ret;
1112 }
1113
1114 /*
1115   force a complete reindex of the database
1116 */
1117 int ltdb_reindex(struct ldb_module *module)
1118 {
1119         struct ltdb_private *ltdb = module->private_data;
1120         int ret;
1121
1122         if (ltdb_cache_reload(module) != 0) {
1123                 return -1;
1124         }
1125
1126         /* first traverse the database deleting any @INDEX records */
1127         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1128         if (ret == -1) {
1129                 return -1;
1130         }
1131
1132         /* now traverse adding any indexes for normal LDB records */
1133         ret = tdb_traverse(ltdb->tdb, re_index, module);
1134         if (ret == -1) {
1135                 return -1;
1136         }
1137
1138         return 0;
1139 }