r3783: - don't use make proto for ldb anymore
[abartlet/samba.git/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39 #include "ldb/include/ldb_parse.h"
40
41 struct dn_list {
42         unsigned int count;
43         char **dn;
44 };
45
46 /*
47   free a struct dn_list
48 */
49 static void dn_list_free(struct ldb_context *ldb, struct dn_list *list)
50 {
51         unsigned int i;
52         for (i=0;i<list->count;i++) {
53                 ldb_free(ldb, list->dn[i]);
54         }
55         ldb_free(ldb, list->dn);
56 }
57
58 /*
59   return the dn key to be used for an index
60   caller frees
61 */
62 static char *ldb_dn_key(struct ldb_context *ldb,
63                         const char *attr, const struct ldb_val *value)
64 {
65         char *ret = NULL;
66
67         if (ldb_should_b64_encode(value)) {
68                 char *vstr = ldb_base64_encode(ldb, value->data, value->length);
69                 if (!vstr) return NULL;
70                 ldb_asprintf(ldb, &ret, "%s:%s::%s", LTDB_INDEX, attr, vstr);
71                 ldb_free(ldb, vstr);
72                 return ret;
73         }
74
75         ldb_asprintf(ldb, &ret, "%s:%s:%.*s", LTDB_INDEX, attr, value->length, (char *)value->data);
76         return ret;
77 }
78
79 /*
80   see if a attribute value is in the list of indexed attributes
81 */
82 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
83                             int *v_idx, const char *key)
84 {
85         unsigned int i, j;
86         for (i=0;i<msg->num_elements;i++) {
87                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
88                         const struct ldb_message_element *el = 
89                                 &msg->elements[i];
90                         for (j=0;j<el->num_values;j++) {
91                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
92                                         if (v_idx) {
93                                                 *v_idx = j;
94                                         }
95                                         return i;
96                                 }
97                         }
98                 }
99         }
100         return -1;
101 }
102
103 /* used in sorting dn lists */
104 static int list_cmp(const char **s1, const char **s2)
105 {
106         return strcmp(*s1, *s2);
107 }
108
109 /*
110   return a list of dn's that might match a simple indexed search or
111  */
112 static int ltdb_index_dn_simple(struct ldb_module *module, 
113                                 struct ldb_parse_tree *tree,
114                                 const struct ldb_message *index_list,
115                                 struct dn_list *list)
116 {
117         struct ldb_context *ldb = module->ldb;
118         char *dn = NULL;
119         int ret;
120         unsigned int i, j;
121         struct ldb_message msg;
122
123         list->count = 0;
124         list->dn = NULL;
125
126         /*
127           if the value is a wildcard then we can't do a match via indexing
128         */
129         if (ltdb_has_wildcard(module, tree->u.simple.attr, &tree->u.simple.value)) {
130                 return -1;
131         }
132
133         /* if the attribute isn't in the list of indexed attributes then
134            this node needs a full search */
135         if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL, LTDB_IDXATTR) == -1) {
136                 return -1;
137         }
138
139         /* the attribute is indexed. Pull the list of DNs that match the 
140            search criterion */
141         dn = ldb_dn_key(ldb, tree->u.simple.attr, &tree->u.simple.value);
142         if (!dn) return -1;
143
144         ret = ltdb_search_dn1(module, dn, &msg);
145         ldb_free(ldb, dn);
146         if (ret == 0 || ret == -1) {
147                 return ret;
148         }
149
150         for (i=0;i<msg.num_elements;i++) {
151                 struct ldb_message_element *el;
152
153                 if (strcmp(msg.elements[i].name, LTDB_IDX) != 0) {
154                         continue;
155                 }
156
157                 el = &msg.elements[i];
158
159                 list->dn = ldb_malloc_array_p(ldb, char *, el->num_values);
160                 if (!list->dn) {
161                         break;          
162                 }
163
164                 for (j=0;j<el->num_values;j++) {
165                         list->dn[list->count] = 
166                                 ldb_strdup(ldb, (char *)el->values[j].data);
167                         if (!list->dn[list->count]) {
168                                 dn_list_free(ldb, list);
169                                 ltdb_search_dn1_free(module, &msg);
170                                 return -1;
171                         }
172                         list->count++;
173                 }
174         }
175
176         ltdb_search_dn1_free(module, &msg);
177
178         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
179
180         return 1;
181 }
182
183
184 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
185
186 /*
187   return a list of dn's that might match a simple indexed search on
188   the special objectclass attribute
189  */
190 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
191                                      struct ldb_parse_tree *tree,
192                                      const struct ldb_message *index_list,
193                                      struct dn_list *list)
194 {
195         struct ldb_context *ldb = module->ldb;
196         struct ltdb_private *ltdb = module->private_data;
197         unsigned int i;
198         int ret;
199         const char *target = tree->u.simple.value.data;
200
201         list->count = 0;
202         list->dn = NULL;
203
204         ret = ltdb_index_dn_simple(module, tree, index_list, list);
205
206         for (i=0;i<ltdb->cache.subclasses.num_elements;i++) {
207                 struct ldb_message_element *el = &ltdb->cache.subclasses.elements[i];
208                 if (ldb_attr_cmp(el->name, target) == 0) {
209                         unsigned int j;
210                         for (j=0;j<el->num_values;j++) {
211                                 struct ldb_parse_tree tree2;
212                                 struct dn_list list2;
213                                 tree2.operation = LDB_OP_SIMPLE;
214                                 tree2.u.simple.attr = ldb_strdup(ldb, LTDB_OBJECTCLASS);
215                                 if (!tree2.u.simple.attr) {
216                                         return -1;
217                                 }
218                                 tree2.u.simple.value = el->values[j];
219                                 if (ltdb_index_dn_objectclass(module, &tree2, 
220                                                               index_list, &list2) == 1) {
221                                         if (list->count == 0) {
222                                                 *list = list2;
223                                                 ret = 1;
224                                         } else {
225                                                 list_union(ldb, list, &list2);
226                                                 dn_list_free(ldb, &list2);
227                                         }
228                                 }
229                                 ldb_free(ldb, tree2.u.simple.attr);
230                         }
231                 }
232         }
233
234         return ret;
235 }
236
237 /*
238   return a list of dn's that might match a leaf indexed search
239  */
240 static int ltdb_index_dn_leaf(struct ldb_module *module, 
241                               struct ldb_parse_tree *tree,
242                               const struct ldb_message *index_list,
243                               struct dn_list *list)
244 {
245         if (ldb_attr_cmp(tree->u.simple.attr, LTDB_OBJECTCLASS) == 0) {
246                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
247         }
248         return ltdb_index_dn_simple(module, tree, index_list, list);
249 }
250
251
252 /*
253   list intersection
254   list = list & list2
255   relies on the lists being sorted
256 */
257 static int list_intersect(struct ldb_context *ldb,
258                           struct dn_list *list, const struct dn_list *list2)
259 {
260         struct dn_list list3;
261         unsigned int i;
262
263         if (list->count == 0 || list2->count == 0) {
264                 /* 0 & X == 0 */
265                 dn_list_free(ldb, list);
266                 return 0;
267         }
268
269         list3.dn = ldb_malloc_array_p(ldb, char *, list->count);
270         if (!list3.dn) {
271                 dn_list_free(ldb, list);
272                 return -1;
273         }
274         list3.count = 0;
275
276         for (i=0;i<list->count;i++) {
277                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
278                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
279                         list3.dn[list3.count] = list->dn[i];
280                         list3.count++;
281                 } else {
282                         ldb_free(ldb, list->dn[i]);
283                 }               
284         }
285
286         ldb_free(ldb, list->dn);
287         list->dn = list3.dn;
288         list->count = list3.count;
289
290         return 0;
291 }
292
293
294 /*
295   list union
296   list = list | list2
297   relies on the lists being sorted
298 */
299 static int list_union(struct ldb_context *ldb, 
300                       struct dn_list *list, const struct dn_list *list2)
301 {
302         unsigned int i;
303         char **d;
304         unsigned int count = list->count;
305
306         if (list->count == 0 && list2->count == 0) {
307                 /* 0 | 0 == 0 */
308                 dn_list_free(ldb, list);
309                 return 0;
310         }
311
312         d = ldb_realloc_p(ldb, list->dn, char *, list->count + list2->count);
313         if (!d) {
314                 dn_list_free(ldb, list);
315                 return -1;
316         }
317         list->dn = d;
318
319         for (i=0;i<list2->count;i++) {
320                 if (ldb_list_find(list2->dn[i], list->dn, count, 
321                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
322                         list->dn[list->count] = ldb_strdup(ldb, list2->dn[i]);
323                         if (!list->dn[list->count]) {
324                                 dn_list_free(ldb, list);
325                                 return -1;
326                         }
327                         list->count++;
328                 }               
329         }
330
331         if (list->count != count) {
332                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
333         }
334
335         return 0;
336 }
337
338 static int ltdb_index_dn(struct ldb_module *module, 
339                          struct ldb_parse_tree *tree,
340                          const struct ldb_message *index_list,
341                          struct dn_list *list);
342
343
344 /*
345   OR two index results
346  */
347 static int ltdb_index_dn_or(struct ldb_module *module, 
348                             struct ldb_parse_tree *tree,
349                             const struct ldb_message *index_list,
350                             struct dn_list *list)
351 {
352         struct ldb_context *ldb = module->ldb;
353         unsigned int i;
354         int ret;
355         
356         ret = -1;
357         list->dn = NULL;
358         list->count = 0;
359
360         for (i=0;i<tree->u.list.num_elements;i++) {
361                 struct dn_list list2;
362                 int v;
363                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, &list2);
364
365                 if (v == 0) {
366                         /* 0 || X == X */
367                         if (ret == -1) {
368                                 ret = 0;
369                         }
370                         continue;
371                 }
372
373                 if (v == -1) {
374                         /* 1 || X == 1 */
375                         dn_list_free(ldb, list);
376                         return -1;
377                 }
378
379                 if (ret == -1) {
380                         ret = 1;
381                         *list = list2;
382                 } else {
383                         if (list_union(ldb, list, &list2) == -1) {
384                                 dn_list_free(ldb, &list2);
385                                 return -1;
386                         }
387                         dn_list_free(ldb, &list2);
388                 }
389         }
390
391         if (list->count == 0) {
392                 dn_list_free(ldb, list);
393                 return 0;
394         }
395
396         return ret;
397 }
398
399
400 /*
401   NOT an index results
402  */
403 static int ltdb_index_dn_not(struct ldb_module *module, 
404                              struct ldb_parse_tree *tree,
405                              const struct ldb_message *index_list,
406                              struct dn_list *list)
407 {
408         /* the only way to do an indexed not would be if we could
409            negate the not via another not or if we knew the total
410            number of database elements so we could know that the
411            existing expression covered the whole database. 
412            
413            instead, we just give up, and rely on a full index scan
414            (unless an outer & manages to reduce the list)
415         */
416         return -1;
417 }
418
419 /*
420   AND two index results
421  */
422 static int ltdb_index_dn_and(struct ldb_module *module, 
423                              struct ldb_parse_tree *tree,
424                              const struct ldb_message *index_list,
425                              struct dn_list *list)
426 {
427         struct ldb_context *ldb = module->ldb;
428         unsigned int i;
429         int ret;
430         
431         ret = -1;
432         list->dn = NULL;
433         list->count = 0;
434
435         for (i=0;i<tree->u.list.num_elements;i++) {
436                 struct dn_list list2;
437                 int v;
438                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, &list2);
439
440                 if (v == 0) {
441                         /* 0 && X == 0 */
442                         dn_list_free(ldb, list);
443                         return 0;
444                 }
445
446                 if (v == -1) {
447                         continue;
448                 }
449
450                 if (ret == -1) {
451                         ret = 1;
452                         *list = list2;
453                 } else {
454                         if (list_intersect(ldb, list, &list2) == -1) {
455                                 dn_list_free(ldb, &list2);
456                                 return -1;
457                         }
458                         dn_list_free(ldb, &list2);
459                 }
460
461                 if (list->count == 0) {
462                         if (list->dn) ldb_free(ldb, list->dn);
463                         return 0;
464                 }
465         }
466
467         return ret;
468 }
469
470 /*
471   return a list of dn's that might match a indexed search or
472   -1 if an error. return 0 for no matches, or 1 for matches
473  */
474 static int ltdb_index_dn(struct ldb_module *module, 
475                          struct ldb_parse_tree *tree,
476                          const struct ldb_message *index_list,
477                          struct dn_list *list)
478 {
479         int ret = -1;
480
481         switch (tree->operation) {
482         case LDB_OP_SIMPLE:
483                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
484                 break;
485
486         case LDB_OP_AND:
487                 ret = ltdb_index_dn_and(module, tree, index_list, list);
488                 break;
489
490         case LDB_OP_OR:
491                 ret = ltdb_index_dn_or(module, tree, index_list, list);
492                 break;
493
494         case LDB_OP_NOT:
495                 ret = ltdb_index_dn_not(module, tree, index_list, list);
496                 break;
497         }
498
499         return ret;
500 }
501
502 /*
503   filter a candidate dn_list from an indexed search into a set of results
504   extracting just the given attributes
505 */
506 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
507                             const char *base,
508                             enum ldb_scope scope,
509                             const struct dn_list *dn_list, 
510                             const char * const attrs[], struct ldb_message ***res)
511 {
512         unsigned int count = 0, i;
513
514         for (i=0;i<dn_list->count;i++) {
515                 struct ldb_message msg;
516                 int ret;
517                 ret = ltdb_search_dn1(module, dn_list->dn[i], &msg);
518                 if (ret == 0) {
519                         /* the record has disappeared? yes, this can happen */
520                         continue;
521                 }
522
523                 if (ret == -1) {
524                         /* an internal error */
525                         return -1;
526                 }
527
528                 if (ltdb_message_match(module, &msg, tree, base, scope) == 1) {
529                         ret = ltdb_add_attr_results(module, &msg, attrs, &count, res);
530                 }
531                 ltdb_search_dn1_free(module, &msg);
532                 if (ret != 0) {
533                         return -1;
534                 }
535         }
536
537         return count;
538 }
539
540 /*
541   search the database with a LDAP-like expression using indexes
542   returns -1 if an indexed search is not possible, in which
543   case the caller should call ltdb_search_full() 
544 */
545 int ltdb_search_indexed(struct ldb_module *module, 
546                         const char *base,
547                         enum ldb_scope scope,
548                         struct ldb_parse_tree *tree,
549                         const char * const attrs[], struct ldb_message ***res)
550 {
551         struct ldb_context *ldb = module->ldb;
552         struct ltdb_private *ltdb = module->private_data;
553         struct dn_list dn_list;
554         int ret;
555
556         if (ltdb->cache.indexlist.num_elements == 0) {
557                 /* no index list? must do full search */
558                 return -1;
559         }
560
561         ret = ltdb_index_dn(module, tree, &ltdb->cache.indexlist, &dn_list);
562
563         if (ret == 1) {
564                 /* we've got a candidate list - now filter by the full tree
565                    and extract the needed attributes */
566                 ret = ldb_index_filter(module, tree, base, scope, &dn_list, 
567                                        attrs, res);
568                 dn_list_free(ldb, &dn_list);
569         }
570
571         return ret;
572 }
573
574 /*
575   add a index element where this is the first indexed DN for this value
576 */
577 static int ltdb_index_add1_new(struct ldb_context *ldb, 
578                                struct ldb_message *msg,
579                                struct ldb_message_element *el,
580                                char *dn)
581 {
582         struct ldb_message_element *el2;
583
584         /* add another entry */
585         el2 = ldb_realloc_p(ldb, msg->elements, 
586                             struct ldb_message_element, msg->num_elements+1);
587         if (!el2) {
588                 return -1;
589         }
590
591         msg->elements = el2;
592         msg->elements[msg->num_elements].name = ldb_strdup(ldb, LTDB_IDX);
593         if (!msg->elements[msg->num_elements].name) {
594                 return -1;
595         }
596         msg->elements[msg->num_elements].num_values = 0;
597         msg->elements[msg->num_elements].values = ldb_malloc_p(ldb, struct ldb_val);
598         if (!msg->elements[msg->num_elements].values) {
599                 return -1;
600         }
601         msg->elements[msg->num_elements].values[0].length = strlen(dn);
602         msg->elements[msg->num_elements].values[0].data = dn;
603         msg->elements[msg->num_elements].num_values = 1;
604         msg->num_elements++;
605
606         return 0;
607 }
608
609
610 /*
611   add a index element where this is not the first indexed DN for this
612   value
613 */
614 static int ltdb_index_add1_add(struct ldb_context *ldb, 
615                                struct ldb_message *msg,
616                                struct ldb_message_element *el,
617                                int idx,
618                                char *dn)
619 {
620         struct ldb_val *v2;
621         unsigned int i;
622
623         /* for multi-valued attributes we can end up with repeats */
624         for (i=0;i<msg->elements[idx].num_values;i++) {
625                 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
626                         return 0;
627                 }
628         }
629
630         v2 = ldb_realloc_p(ldb, msg->elements[idx].values,
631                            struct ldb_val, 
632                            msg->elements[idx].num_values+1);
633         if (!v2) {
634                 return -1;
635         }
636         msg->elements[idx].values = v2;
637
638         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
639         msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
640         msg->elements[idx].num_values++;
641
642         return 0;
643 }
644
645 /*
646   add an index entry for one message element
647 */
648 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
649                            struct ldb_message_element *el, int v_idx)
650 {
651         struct ldb_context *ldb = module->ldb;
652         struct ldb_message msg;
653         char *dn_key;
654         int ret, added=0, added_dn=0;
655         unsigned int i;
656
657         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
658         if (!dn_key) {
659                 return -1;
660         }
661
662         ret = ltdb_search_dn1(module, dn_key, &msg);
663         if (ret == -1) {
664                 ldb_free(ldb, dn_key);
665                 return -1;
666         }
667
668         if (ret == 0) {
669                 added_dn = 1;
670                 msg.dn = ldb_strdup(ldb, dn_key);
671                 if (!msg.dn) {
672                         ldb_free(ldb, dn_key);
673                         errno = ENOMEM;
674                         return -1;
675                 }
676                 msg.num_elements = 0;
677                 msg.elements = NULL;
678                 msg.private_data = NULL;
679         }
680
681         ldb_free(ldb, dn_key);
682
683         for (i=0;i<msg.num_elements;i++) {
684                 if (strcmp(LTDB_IDX, msg.elements[i].name) == 0) {
685                         break;
686                 }
687         }
688
689         if (i == msg.num_elements) {
690                 added = 1;
691                 ret = ltdb_index_add1_new(ldb, &msg, el, dn);
692         } else {
693                 ret = ltdb_index_add1_add(ldb, &msg, el, i, dn);
694         }
695
696         if (ret == 0) {
697                 ret = ltdb_store(module, &msg, TDB_REPLACE);
698         }
699
700         if (added) {
701                 ldb_free(ldb, msg.elements[i].name);
702         }
703         if (added_dn) {
704                 ldb_free(ldb, msg.dn);
705         }
706
707         ltdb_search_dn1_free(module, &msg);
708
709         return ret;
710 }
711
712 /*
713   add the index entries for a new record
714   return -1 on failure
715 */
716 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
717 {
718         struct ltdb_private *ltdb = module->private_data;
719         int ret;
720         unsigned int i, j;
721
722         if (ltdb->cache.indexlist.num_elements == 0) {
723                 /* no indexed fields */
724                 return 0;
725         }
726
727         for (i=0;i<msg->num_elements;i++) {
728                 ret = ldb_msg_find_idx(&ltdb->cache.indexlist, msg->elements[i].name, 
729                                        NULL, LTDB_IDXATTR);
730                 if (ret == -1) {
731                         continue;
732                 }
733                 for (j=0;j<msg->elements[i].num_values;j++) {
734                         ret = ltdb_index_add1(module, msg->dn, &msg->elements[i], j);
735                         if (ret == -1) {
736                                 return -1;
737                         }
738                 }
739         }
740
741         return 0;
742 }
743
744
745 /*
746   delete an index entry for one message element
747 */
748 static int ltdb_index_del1(struct ldb_module *module, const char *dn, 
749                            struct ldb_message_element *el, int v_idx)
750 {
751         struct ldb_context *ldb = module->ldb;
752         struct ldb_message msg;
753         char *dn_key;
754         int ret, i;
755         unsigned int j;
756
757         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
758         if (!dn_key) {
759                 return -1;
760         }
761
762         ret = ltdb_search_dn1(module, dn_key, &msg);
763         if (ret == -1) {
764                 ldb_free(ldb, dn_key);
765                 return -1;
766         }
767
768         if (ret == 0) {
769                 /* it wasn't indexed. Did we have an earlier error? If we did then
770                    its gone now */
771                 ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn_key %s was not indexed\n", dn_key);
772                 ldb_free(ldb, dn_key);
773                 return 0;
774         }
775
776         i = ldb_msg_find_idx(&msg, dn, &j, LTDB_IDX);
777         if (i == -1) {
778                 ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn %s not found in %s\n", dn, dn_key);
779                 /* it ain't there. hmmm */
780                 ltdb_search_dn1_free(module, &msg);
781                 ldb_free(ldb, dn_key);
782                 return 0;
783         }
784
785         if (j != msg.elements[i].num_values - 1) {
786                 memmove(&msg.elements[i].values[j], 
787                         &msg.elements[i].values[j+1], 
788                         (msg.elements[i].num_values-(j+1)) * 
789                         sizeof(msg.elements[i].values[0]));
790         }
791         msg.elements[i].num_values--;
792
793         if (msg.elements[i].num_values == 0) {
794                 ret = ltdb_delete_noindex(module, dn_key);
795         } else {
796                 ret = ltdb_store(module, &msg, TDB_REPLACE);
797         }
798
799         ltdb_search_dn1_free(module, &msg);
800         ldb_free(ldb, dn_key);
801
802         return ret;
803 }
804
805 /*
806   delete the index entries for a record
807   return -1 on failure
808 */
809 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
810 {
811         struct ltdb_private *ltdb = module->private_data;
812         int ret;
813         unsigned int i, j;
814
815         /* find the list of indexed fields */   
816         if (ltdb->cache.indexlist.num_elements == 0) {
817                 /* no indexed fields */
818                 return 0;
819         }
820
821         for (i=0;i<msg->num_elements;i++) {
822                 ret = ldb_msg_find_idx(&ltdb->cache.indexlist, msg->elements[i].name, 
823                                        NULL, LTDB_IDXATTR);
824                 if (ret == -1) {
825                         continue;
826                 }
827                 for (j=0;j<msg->elements[i].num_values;j++) {
828                         ret = ltdb_index_del1(module, msg->dn, &msg->elements[i], j);
829                         if (ret == -1) {
830                                 return -1;
831                         }
832                 }
833         }
834
835         return 0;
836 }
837
838
839 /*
840   traversal function that deletes all @INDEX records
841 */
842 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
843 {
844         const char *dn = "DN=" LTDB_INDEX ":";
845         if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
846                 return tdb_delete(tdb, key);
847         }
848         return 0;
849 }
850
851 /*
852   traversal function that adds @INDEX records during a re index
853 */
854 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
855 {
856         struct ldb_module *module = state;
857         struct ldb_message msg;
858         int ret;
859
860         if (strncmp(key.dptr, "DN=@", 4) == 0 ||
861             strncmp(key.dptr, "DN=", 3) != 0) {
862                 return 0;
863         }
864
865         ret = ltdb_unpack_data(module, &data, &msg);
866         if (ret != 0) {
867                 return -1;
868         }
869
870         if (!msg.dn) {
871                 msg.dn = key.dptr+3;
872         }
873
874         ret = ltdb_index_add(module, &msg);
875
876         ltdb_unpack_data_free(module, &msg);
877
878         return ret;
879 }
880
881 /*
882   force a complete reindex of the database
883 */
884 int ltdb_reindex(struct ldb_module *module)
885 {
886         struct ltdb_private *ltdb = module->private_data;
887         int ret;
888
889         ltdb_cache_free(module);
890
891         if (ltdb_cache_load(module) != 0) {
892                 return -1;
893         }
894
895         /* first traverse the database deleting any @INDEX records */
896         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
897         if (ret == -1) {
898                 errno = EIO;
899                 return -1;
900         }
901
902         /* now traverse adding any indexes for normal LDB records */
903         ret = tdb_traverse(ltdb->tdb, re_index, module);
904         if (ret == -1) {
905                 errno = EIO;
906                 return -1;
907         }
908
909         return 0;
910 }