r8520: fixed a pile of warnings from the build farm gcc -Wall output on
[mat/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
65                 if (r == 0) {
66                         /* scan back for first element */
67                         while (test_i > 0 &&
68                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
69                                 test_i--;
70                         }
71                         return test_i;
72                 }
73                 if (r < 0) {
74                         if (test_i == 0) {
75                                 return -1;
76                         }
77                         max_i = test_i - 1;
78                 }
79                 if (r > 0) {
80                         min_i = test_i + 1;
81                 }
82         }
83
84         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
85                 return min_i;
86         }
87
88         return -1;
89 }
90
91 struct dn_list {
92         unsigned int count;
93         char **dn;
94 };
95
96 /*
97   return the dn key to be used for an index
98   caller frees
99 */
100 static char *ldb_dn_key(struct ldb_context *ldb,
101                         const char *attr, const struct ldb_val *value)
102 {
103         char *ret = NULL;
104         struct ldb_val v;
105         const struct ldb_attrib_handler *h;
106         char *attr_folded;
107
108         attr_folded = ldb_casefold(ldb, attr);
109         if (!attr_folded) {
110                 return NULL;
111         }
112
113         h = ldb_attrib_handler(ldb, attr);
114         if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
115                 /* canonicalisation can be refused. For example, 
116                    a attribute that takes wildcards will refuse to canonicalise
117                    if the value contains a wildcard */
118                 talloc_free(attr_folded);
119                 return NULL;
120         }
121         if (ldb_should_b64_encode(&v)) {
122                 char *vstr = ldb_base64_encode(ldb, v.data, v.length);
123                 if (!vstr) return NULL;
124                 ret = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
125                 talloc_free(vstr);
126                 if (v.data != value->data) {
127                         talloc_free(v.data);
128                 }
129                 talloc_free(attr_folded);
130                 return ret;
131         }
132
133         ret = talloc_asprintf(ldb, "%s:%s:%.*s", 
134                               LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
135
136         if (v.data != value->data) {
137                 talloc_free(v.data);
138         }
139         talloc_free(attr_folded);
140
141         return ret;
142 }
143
144 /*
145   see if a attribute value is in the list of indexed attributes
146 */
147 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
148                             unsigned int *v_idx, const char *key)
149 {
150         unsigned int i, j;
151         for (i=0;i<msg->num_elements;i++) {
152                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
153                         const struct ldb_message_element *el = 
154                                 &msg->elements[i];
155                         for (j=0;j<el->num_values;j++) {
156                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
157                                         if (v_idx) {
158                                                 *v_idx = j;
159                                         }
160                                         return i;
161                                 }
162                         }
163                 }
164         }
165         return -1;
166 }
167
168 /* used in sorting dn lists */
169 static int list_cmp(const char **s1, const char **s2)
170 {
171         return strcmp(*s1, *s2);
172 }
173
174 /*
175   return a list of dn's that might match a simple indexed search or
176  */
177 static int ltdb_index_dn_simple(struct ldb_module *module, 
178                                 struct ldb_parse_tree *tree,
179                                 const struct ldb_message *index_list,
180                                 struct dn_list *list)
181 {
182         struct ldb_context *ldb = module->ldb;
183         char *dn = NULL;
184         int ret;
185         unsigned int i, j;
186         struct ldb_message *msg;
187
188         list->count = 0;
189         list->dn = NULL;
190
191         /* if the attribute isn't in the list of indexed attributes then
192            this node needs a full search */
193         if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL, LTDB_IDXATTR) == -1) {
194                 return -1;
195         }
196
197         /* the attribute is indexed. Pull the list of DNs that match the 
198            search criterion */
199         dn = ldb_dn_key(ldb, tree->u.simple.attr, &tree->u.simple.value);
200         if (!dn) return -1;
201
202         msg = talloc(list, struct ldb_message);
203         if (msg == NULL) {
204                 return -1;
205         }
206
207         ret = ltdb_search_dn1(module, dn, msg);
208         talloc_free(dn);
209         if (ret == 0 || ret == -1) {
210                 return ret;
211         }
212
213         for (i=0;i<msg->num_elements;i++) {
214                 struct ldb_message_element *el;
215
216                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
217                         continue;
218                 }
219
220                 el = &msg->elements[i];
221
222                 list->dn = talloc_array(list, char *, el->num_values);
223                 if (!list->dn) {
224                         break;          
225                 }
226
227                 for (j=0;j<el->num_values;j++) {
228                         list->dn[list->count] = 
229                                 talloc_strdup(list->dn, (char *)el->values[j].data);
230                         if (!list->dn[list->count]) {
231                                 return -1;
232                         }
233                         list->count++;
234                 }
235         }
236
237         talloc_free(msg);
238
239         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
240
241         return 1;
242 }
243
244
245 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
246
247 /*
248   return a list of dn's that might match a simple indexed search on
249   the special objectclass attribute
250  */
251 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
252                                      struct ldb_parse_tree *tree,
253                                      const struct ldb_message *index_list,
254                                      struct dn_list *list)
255 {
256         struct ldb_context *ldb = module->ldb;
257         unsigned int i;
258         int ret;
259         const char *target = tree->u.simple.value.data;
260         const char **subclasses;
261
262         list->count = 0;
263         list->dn = NULL;
264
265         ret = ltdb_index_dn_simple(module, tree, index_list, list);
266
267         subclasses = ldb_subclass_list(module->ldb, target);
268
269         if (subclasses == NULL) {
270                 return ret;
271         }
272
273         for (i=0;subclasses[i];i++) {
274                 struct ldb_parse_tree tree2;
275                 struct dn_list *list2;
276                 tree2.operation = LDB_OP_SIMPLE;
277                 tree2.u.simple.attr = talloc_strdup(list, LTDB_OBJECTCLASS);
278                 if (!tree2.u.simple.attr) {
279                         return -1;
280                 }
281                 tree2.u.simple.value.data = talloc_strdup(tree2.u.simple.attr, subclasses[i]);
282                 if (tree2.u.simple.value.data == NULL) {
283                         return -1;                      
284                 }
285                 tree2.u.simple.value.length = strlen(subclasses[i]);
286                 list2 = talloc(list, struct dn_list);
287                 if (list2 == NULL) {
288                         return -1;
289                 }
290                 if (ltdb_index_dn_objectclass(module, &tree2, 
291                                               index_list, list2) == 1) {
292                         if (list->count == 0) {
293                                 *list = *list2;
294                                 ret = 1;
295                         } else {
296                                 list_union(ldb, list, list2);
297                                 talloc_free(list2);
298                         }
299                 }
300                 talloc_free(tree2.u.simple.attr);
301         }
302
303         return ret;
304 }
305
306 /*
307   return a list of dn's that might match a leaf indexed search
308  */
309 static int ltdb_index_dn_leaf(struct ldb_module *module, 
310                               struct ldb_parse_tree *tree,
311                               const struct ldb_message *index_list,
312                               struct dn_list *list)
313 {
314         if (ldb_attr_cmp(tree->u.simple.attr, LTDB_OBJECTCLASS) == 0) {
315                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
316         }
317         return ltdb_index_dn_simple(module, tree, index_list, list);
318 }
319
320
321 /*
322   list intersection
323   list = list & list2
324   relies on the lists being sorted
325 */
326 static int list_intersect(struct ldb_context *ldb,
327                           struct dn_list *list, const struct dn_list *list2)
328 {
329         struct dn_list *list3;
330         unsigned int i;
331
332         if (list->count == 0 || list2->count == 0) {
333                 /* 0 & X == 0 */
334                 return 0;
335         }
336
337         list3 = talloc(ldb, struct dn_list);
338         if (list3 == NULL) {
339                 return -1;
340         }
341
342         list3->dn = talloc_array(list3, char *, list->count);
343         if (!list3->dn) {
344                 talloc_free(list3);
345                 return -1;
346         }
347         list3->count = 0;
348
349         for (i=0;i<list->count;i++) {
350                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
351                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
352                         list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
353                         list3->count++;
354                 } else {
355                         talloc_free(list->dn[i]);
356                 }               
357         }
358
359         talloc_free(list->dn);
360         list->dn = talloc_steal(list, list3->dn);
361         list->count = list3->count;
362         talloc_free(list3);
363
364         return 0;
365 }
366
367
368 /*
369   list union
370   list = list | list2
371   relies on the lists being sorted
372 */
373 static int list_union(struct ldb_context *ldb, 
374                       struct dn_list *list, const struct dn_list *list2)
375 {
376         unsigned int i;
377         char **d;
378         unsigned int count = list->count;
379
380         if (list->count == 0 && list2->count == 0) {
381                 /* 0 | 0 == 0 */
382                 return 0;
383         }
384
385         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
386         if (!d) {
387                 return -1;
388         }
389         list->dn = d;
390
391         for (i=0;i<list2->count;i++) {
392                 if (ldb_list_find(list2->dn[i], list->dn, count, 
393                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
394                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
395                         if (!list->dn[list->count]) {
396                                 return -1;
397                         }
398                         list->count++;
399                 }               
400         }
401
402         if (list->count != count) {
403                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
404         }
405
406         return 0;
407 }
408
409 static int ltdb_index_dn(struct ldb_module *module, 
410                          struct ldb_parse_tree *tree,
411                          const struct ldb_message *index_list,
412                          struct dn_list *list);
413
414
415 /*
416   OR two index results
417  */
418 static int ltdb_index_dn_or(struct ldb_module *module, 
419                             struct ldb_parse_tree *tree,
420                             const struct ldb_message *index_list,
421                             struct dn_list *list)
422 {
423         struct ldb_context *ldb = module->ldb;
424         unsigned int i;
425         int ret;
426         
427         ret = -1;
428         list->dn = NULL;
429         list->count = 0;
430
431         for (i=0;i<tree->u.list.num_elements;i++) {
432                 struct dn_list *list2;
433                 int v;
434
435                 list2 = talloc(module, struct dn_list);
436                 if (list2 == NULL) {
437                         return -1;
438                 }
439
440                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
441
442                 if (v == 0) {
443                         /* 0 || X == X */
444                         if (ret == -1) {
445                                 ret = 0;
446                         }
447                         talloc_free(list2);
448                         continue;
449                 }
450
451                 if (v == -1) {
452                         /* 1 || X == 1 */
453                         talloc_free(list->dn);
454                         talloc_free(list2);
455                         return -1;
456                 }
457
458                 if (ret == -1) {
459                         ret = 1;
460                         list->dn = talloc_steal(list, list2->dn);
461                         list->count = list2->count;
462                 } else {
463                         if (list_union(ldb, list, list2) == -1) {
464                                 talloc_free(list2);
465                                 return -1;
466                         }
467                         ret = 1;
468                 }
469                 talloc_free(list2);
470         }
471
472         if (list->count == 0) {
473                 return 0;
474         }
475
476         return ret;
477 }
478
479
480 /*
481   NOT an index results
482  */
483 static int ltdb_index_dn_not(struct ldb_module *module, 
484                              struct ldb_parse_tree *tree,
485                              const struct ldb_message *index_list,
486                              struct dn_list *list)
487 {
488         /* the only way to do an indexed not would be if we could
489            negate the not via another not or if we knew the total
490            number of database elements so we could know that the
491            existing expression covered the whole database. 
492            
493            instead, we just give up, and rely on a full index scan
494            (unless an outer & manages to reduce the list)
495         */
496         return -1;
497 }
498
499 /*
500   AND two index results
501  */
502 static int ltdb_index_dn_and(struct ldb_module *module, 
503                              struct ldb_parse_tree *tree,
504                              const struct ldb_message *index_list,
505                              struct dn_list *list)
506 {
507         struct ldb_context *ldb = module->ldb;
508         unsigned int i;
509         int ret;
510         
511         ret = -1;
512         list->dn = NULL;
513         list->count = 0;
514
515         for (i=0;i<tree->u.list.num_elements;i++) {
516                 struct dn_list *list2;
517                 int v;
518
519                 list2 = talloc(module, struct dn_list);
520                 if (list2 == NULL) {
521                         return -1;
522                 }
523
524                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
525
526                 if (v == 0) {
527                         /* 0 && X == 0 */
528                         talloc_free(list->dn);
529                         talloc_free(list2);
530                         return 0;
531                 }
532
533                 if (v == -1) {
534                         talloc_free(list2);
535                         continue;
536                 }
537
538                 if (ret == -1) {
539                         ret = 1;
540                         talloc_free(list->dn);
541                         list->dn = talloc_steal(list, list2->dn);
542                         list->count = list2->count;
543                 } else {
544                         if (list_intersect(ldb, list, list2) == -1) {
545                                 talloc_free(list2);
546                                 return -1;
547                         }
548                 }
549
550                 talloc_free(list2);
551
552                 if (list->count == 0) {
553                         talloc_free(list->dn);
554                         return 0;
555                 }
556         }
557
558         return ret;
559 }
560
561 /*
562   return a list of dn's that might match a indexed search or
563   -1 if an error. return 0 for no matches, or 1 for matches
564  */
565 static int ltdb_index_dn(struct ldb_module *module, 
566                          struct ldb_parse_tree *tree,
567                          const struct ldb_message *index_list,
568                          struct dn_list *list)
569 {
570         int ret = -1;
571
572         switch (tree->operation) {
573         case LDB_OP_SIMPLE:
574                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
575                 break;
576
577         case LDB_OP_PRESENT:
578         case LDB_OP_SUBSTRING:
579         case LDB_OP_EXTENDED:
580                 /* we can't index with fancy bitops yet */
581                 ret = -1;
582                 break;
583
584         case LDB_OP_AND:
585                 ret = ltdb_index_dn_and(module, tree, index_list, list);
586                 break;
587
588         case LDB_OP_OR:
589                 ret = ltdb_index_dn_or(module, tree, index_list, list);
590                 break;
591
592         case LDB_OP_NOT:
593                 ret = ltdb_index_dn_not(module, tree, index_list, list);
594                 break;
595         }
596
597         return ret;
598 }
599
600 /*
601   filter a candidate dn_list from an indexed search into a set of results
602   extracting just the given attributes
603 */
604 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
605                             const char *base,
606                             enum ldb_scope scope,
607                             const struct dn_list *dn_list, 
608                             const char * const attrs[], struct ldb_message ***res)
609 {
610         unsigned int i;
611         int count = 0;
612
613         for (i=0;i<dn_list->count;i++) {
614                 struct ldb_message *msg;
615                 int ret;
616
617                 msg = talloc(module, struct ldb_message);
618                 if (msg == NULL) {
619                         return -1;
620                 }
621
622                 ret = ltdb_search_dn1(module, dn_list->dn[i], msg);
623                 if (ret == 0) {
624                         /* the record has disappeared? yes, this can happen */
625                         talloc_free(msg);
626                         continue;
627                 }
628
629                 if (ret == -1) {
630                         /* an internal error */
631                         talloc_free(msg);
632                         return -1;
633                 }
634
635                 ret = 0;
636                 if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
637                         ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
638                 }
639                 talloc_free(msg);
640                 if (ret != 0) {
641                         return -1;
642                 }
643         }
644
645         return count;
646 }
647
648 /*
649   search the database with a LDAP-like expression using indexes
650   returns -1 if an indexed search is not possible, in which
651   case the caller should call ltdb_search_full() 
652 */
653 int ltdb_search_indexed(struct ldb_module *module, 
654                         const char *base,
655                         enum ldb_scope scope,
656                         struct ldb_parse_tree *tree,
657                         const char * const attrs[], struct ldb_message ***res)
658 {
659         struct ltdb_private *ltdb = module->private_data;
660         struct dn_list *dn_list;
661         int ret;
662
663         if (ltdb->cache->indexlist->num_elements == 0) {
664                 /* no index list? must do full search */
665                 return -1;
666         }
667
668         dn_list = talloc(module, struct dn_list);
669         if (dn_list == NULL) {
670                 return -1;
671         }
672
673         ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
674
675         if (ret == 1) {
676                 /* we've got a candidate list - now filter by the full tree
677                    and extract the needed attributes */
678                 ret = ldb_index_filter(module, tree, base, scope, dn_list, 
679                                        attrs, res);
680         }
681
682         talloc_free(dn_list);
683
684         return ret;
685 }
686
687 /*
688   add a index element where this is the first indexed DN for this value
689 */
690 static int ltdb_index_add1_new(struct ldb_context *ldb, 
691                                struct ldb_message *msg,
692                                struct ldb_message_element *el,
693                                char *dn)
694 {
695         struct ldb_message_element *el2;
696
697         /* add another entry */
698         el2 = talloc_realloc(msg, msg->elements, 
699                                struct ldb_message_element, msg->num_elements+1);
700         if (!el2) {
701                 return -1;
702         }
703
704         msg->elements = el2;
705         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
706         if (!msg->elements[msg->num_elements].name) {
707                 return -1;
708         }
709         msg->elements[msg->num_elements].num_values = 0;
710         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
711         if (!msg->elements[msg->num_elements].values) {
712                 return -1;
713         }
714         msg->elements[msg->num_elements].values[0].length = strlen(dn);
715         msg->elements[msg->num_elements].values[0].data = dn;
716         msg->elements[msg->num_elements].num_values = 1;
717         msg->num_elements++;
718
719         return 0;
720 }
721
722
723 /*
724   add a index element where this is not the first indexed DN for this
725   value
726 */
727 static int ltdb_index_add1_add(struct ldb_context *ldb, 
728                                struct ldb_message *msg,
729                                struct ldb_message_element *el,
730                                int idx,
731                                char *dn)
732 {
733         struct ldb_val *v2;
734         unsigned int i;
735
736         /* for multi-valued attributes we can end up with repeats */
737         for (i=0;i<msg->elements[idx].num_values;i++) {
738                 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
739                         return 0;
740                 }
741         }
742
743         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
744                               struct ldb_val, 
745                               msg->elements[idx].num_values+1);
746         if (!v2) {
747                 return -1;
748         }
749         msg->elements[idx].values = v2;
750
751         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
752         msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
753         msg->elements[idx].num_values++;
754
755         return 0;
756 }
757
758 /*
759   add an index entry for one message element
760 */
761 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
762                            struct ldb_message_element *el, int v_idx)
763 {
764         struct ldb_context *ldb = module->ldb;
765         struct ldb_message *msg;
766         char *dn_key;
767         int ret;
768         unsigned int i;
769
770         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
771         if (!dn_key) {
772                 return -1;
773         }
774
775         msg = talloc(dn_key, struct ldb_message);
776         if (msg == NULL) {
777                 return -1;
778         }
779
780         ret = ltdb_search_dn1(module, dn_key, msg);
781         if (ret == -1) {
782                 talloc_free(dn_key);
783                 return -1;
784         }
785
786         if (ret == 0) {
787                 msg->dn = talloc_strdup(msg, dn_key);
788                 if (!msg->dn) {
789                         talloc_free(dn_key);
790                         errno = ENOMEM;
791                         return -1;
792                 }
793                 msg->num_elements = 0;
794                 msg->elements = NULL;
795         }
796
797         for (i=0;i<msg->num_elements;i++) {
798                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
799                         break;
800                 }
801         }
802
803         if (i == msg->num_elements) {
804                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
805         } else {
806                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
807         }
808
809         if (ret == 0) {
810                 ret = ltdb_store(module, msg, TDB_REPLACE);
811         }
812
813         talloc_free(dn_key);
814
815         return ret;
816 }
817
818 /*
819   add the index entries for a new record
820   return -1 on failure
821 */
822 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
823 {
824         struct ltdb_private *ltdb = module->private_data;
825         int ret;
826         unsigned int i, j;
827
828         if (msg->dn[0] == '@') {
829                 return 0;
830         }
831
832         if (ltdb->cache->indexlist->num_elements == 0) {
833                 /* no indexed fields */
834                 return 0;
835         }
836
837         for (i=0;i<msg->num_elements;i++) {
838                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
839                                        NULL, LTDB_IDXATTR);
840                 if (ret == -1) {
841                         continue;
842                 }
843                 for (j=0;j<msg->elements[i].num_values;j++) {
844                         ret = ltdb_index_add1(module, msg->dn, &msg->elements[i], j);
845                         if (ret == -1) {
846                                 return -1;
847                         }
848                 }
849         }
850
851         return 0;
852 }
853
854
855 /*
856   delete an index entry for one message element
857 */
858 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
859                          struct ldb_message_element *el, int v_idx)
860 {
861         struct ldb_context *ldb = module->ldb;
862         struct ldb_message *msg;
863         char *dn_key;
864         int ret, i;
865         unsigned int j;
866
867         if (dn[0] == '@') {
868                 return 0;
869         }
870
871         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
872         if (!dn_key) {
873                 return -1;
874         }
875
876         msg = talloc(dn_key, struct ldb_message);
877         if (msg == NULL) {
878                 talloc_free(dn_key);
879                 return -1;
880         }
881
882         ret = ltdb_search_dn1(module, dn_key, msg);
883         if (ret == -1) {
884                 talloc_free(dn_key);
885                 return -1;
886         }
887
888         if (ret == 0) {
889                 /* it wasn't indexed. Did we have an earlier error? If we did then
890                    its gone now */
891                 talloc_free(dn_key);
892                 return 0;
893         }
894
895         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
896         if (i == -1) {
897                 ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn %s not found in %s\n", dn, dn_key);
898                 /* it ain't there. hmmm */
899                 talloc_free(dn_key);
900                 return 0;
901         }
902
903         if (j != msg->elements[i].num_values - 1) {
904                 memmove(&msg->elements[i].values[j], 
905                         &msg->elements[i].values[j+1], 
906                         (msg->elements[i].num_values-(j+1)) * 
907                         sizeof(msg->elements[i].values[0]));
908         }
909         msg->elements[i].num_values--;
910
911         if (msg->elements[i].num_values == 0) {
912                 ret = ltdb_delete_noindex(module, dn_key);
913         } else {
914                 ret = ltdb_store(module, msg, TDB_REPLACE);
915         }
916
917         talloc_free(dn_key);
918
919         return ret;
920 }
921
922 /*
923   delete the index entries for a record
924   return -1 on failure
925 */
926 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
927 {
928         struct ltdb_private *ltdb = module->private_data;
929         int ret;
930         unsigned int i, j;
931
932         if (msg->dn[0] == '@') {
933                 return 0;
934         }
935
936         /* find the list of indexed fields */   
937         if (ltdb->cache->indexlist->num_elements == 0) {
938                 /* no indexed fields */
939                 return 0;
940         }
941
942         for (i=0;i<msg->num_elements;i++) {
943                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
944                                        NULL, LTDB_IDXATTR);
945                 if (ret == -1) {
946                         continue;
947                 }
948                 for (j=0;j<msg->elements[i].num_values;j++) {
949                         ret = ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
950                         if (ret == -1) {
951                                 return -1;
952                         }
953                 }
954         }
955
956         return 0;
957 }
958
959
960 /*
961   traversal function that deletes all @INDEX records
962 */
963 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
964 {
965         const char *dn = "DN=" LTDB_INDEX ":";
966         if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
967                 return tdb_delete(tdb, key);
968         }
969         return 0;
970 }
971
972 /*
973   traversal function that adds @INDEX records during a re index
974 */
975 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
976 {
977         struct ldb_module *module = state;
978         struct ldb_message *msg;
979         int ret;
980         TDB_DATA key2;
981
982         if (strncmp(key.dptr, "DN=@", 4) == 0 ||
983             strncmp(key.dptr, "DN=", 3) != 0) {
984                 return 0;
985         }
986
987         msg = talloc(module, struct ldb_message);
988         if (msg == NULL) {
989                 return -1;
990         }
991
992         ret = ltdb_unpack_data(module, &data, msg);
993         if (ret != 0) {
994                 talloc_free(msg);
995                 return -1;
996         }
997
998         /* check if the DN key has changed, perhaps due to the 
999            case insensitivity of an element changing */
1000         key2 = ltdb_key(module, msg->dn);
1001         if (strcmp(key2.dptr, key.dptr) != 0) {
1002                 tdb_delete(tdb, key);
1003                 tdb_store(tdb, key2, data, 0);
1004         }
1005         talloc_free(key2.dptr);
1006
1007         if (!msg->dn) {
1008                 msg->dn = key.dptr+3;
1009         }
1010
1011         ret = ltdb_index_add(module, msg);
1012
1013         talloc_free(msg);
1014
1015         return ret;
1016 }
1017
1018 /*
1019   force a complete reindex of the database
1020 */
1021 int ltdb_reindex(struct ldb_module *module)
1022 {
1023         struct ltdb_private *ltdb = module->private_data;
1024         int ret;
1025
1026         if (ltdb_cache_reload(module) != 0) {
1027                 return -1;
1028         }
1029
1030         /* first traverse the database deleting any @INDEX records */
1031         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1032         if (ret == -1) {
1033                 errno = EIO;
1034                 return -1;
1035         }
1036
1037         /* now traverse adding any indexes for normal LDB records */
1038         ret = tdb_traverse(ltdb->tdb, re_index, module);
1039         if (ret == -1) {
1040                 errno = EIO;
1041                 return -1;
1042         }
1043
1044         return 0;
1045 }