r7527: - added a ldb_search_bytree() interface, which takes a ldb_parse_tree
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
65                 if (r == 0) {
66                         /* scan back for first element */
67                         while (test_i > 0 &&
68                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
69                                 test_i--;
70                         }
71                         return test_i;
72                 }
73                 if (r < 0) {
74                         if (test_i == 0) {
75                                 return -1;
76                         }
77                         max_i = test_i - 1;
78                 }
79                 if (r > 0) {
80                         min_i = test_i + 1;
81                 }
82         }
83
84         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
85                 return min_i;
86         }
87
88         return -1;
89 }
90
91 struct dn_list {
92         unsigned int count;
93         char **dn;
94 };
95
96 /*
97   return the dn key to be used for an index
98   caller frees
99 */
100 static char *ldb_dn_key(struct ldb_context *ldb,
101                         const char *attr, const struct ldb_val *value)
102 {
103         char *ret = NULL;
104
105         if (ldb_should_b64_encode(value)) {
106                 char *vstr = ldb_base64_encode(ldb, value->data, value->length);
107                 if (!vstr) return NULL;
108                 ret = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr, vstr);
109                 talloc_free(vstr);
110                 return ret;
111         }
112
113         return talloc_asprintf(ldb, "%s:%s:%.*s", 
114                                LTDB_INDEX, attr, value->length, (char *)value->data);
115 }
116
117 /*
118   see if a attribute value is in the list of indexed attributes
119 */
120 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
121                             unsigned int *v_idx, const char *key)
122 {
123         unsigned int i, j;
124         for (i=0;i<msg->num_elements;i++) {
125                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
126                         const struct ldb_message_element *el = 
127                                 &msg->elements[i];
128                         for (j=0;j<el->num_values;j++) {
129                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
130                                         if (v_idx) {
131                                                 *v_idx = j;
132                                         }
133                                         return i;
134                                 }
135                         }
136                 }
137         }
138         return -1;
139 }
140
141 /* used in sorting dn lists */
142 static int list_cmp(const char **s1, const char **s2)
143 {
144         return strcmp(*s1, *s2);
145 }
146
147 /*
148   return a list of dn's that might match a simple indexed search or
149  */
150 static int ltdb_index_dn_simple(struct ldb_module *module, 
151                                 struct ldb_parse_tree *tree,
152                                 const struct ldb_message *index_list,
153                                 struct dn_list *list)
154 {
155         struct ldb_context *ldb = module->ldb;
156         char *dn = NULL;
157         int ret;
158         unsigned int i, j;
159         struct ldb_message *msg;
160
161         list->count = 0;
162         list->dn = NULL;
163
164         /*
165           if the value is a wildcard then we can't do a match via indexing
166         */
167         if (ltdb_has_wildcard(module, tree->u.simple.attr, &tree->u.simple.value)) {
168                 return -1;
169         }
170
171         /* if the attribute isn't in the list of indexed attributes then
172            this node needs a full search */
173         if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL, LTDB_IDXATTR) == -1) {
174                 return -1;
175         }
176
177         /* the attribute is indexed. Pull the list of DNs that match the 
178            search criterion */
179         dn = ldb_dn_key(ldb, tree->u.simple.attr, &tree->u.simple.value);
180         if (!dn) return -1;
181
182         msg = talloc(list, struct ldb_message);
183         if (msg == NULL) {
184                 return -1;
185         }
186
187         ret = ltdb_search_dn1(module, dn, msg);
188         talloc_free(dn);
189         if (ret == 0 || ret == -1) {
190                 return ret;
191         }
192
193         for (i=0;i<msg->num_elements;i++) {
194                 struct ldb_message_element *el;
195
196                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
197                         continue;
198                 }
199
200                 el = &msg->elements[i];
201
202                 list->dn = talloc_array(list, char *, el->num_values);
203                 if (!list->dn) {
204                         break;          
205                 }
206
207                 for (j=0;j<el->num_values;j++) {
208                         list->dn[list->count] = 
209                                 talloc_strdup(list->dn, (char *)el->values[j].data);
210                         if (!list->dn[list->count]) {
211                                 return -1;
212                         }
213                         list->count++;
214                 }
215         }
216
217         talloc_free(msg);
218
219         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
220
221         return 1;
222 }
223
224
225 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
226
227 /*
228   return a list of dn's that might match a simple indexed search on
229   the special objectclass attribute
230  */
231 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
232                                      struct ldb_parse_tree *tree,
233                                      const struct ldb_message *index_list,
234                                      struct dn_list *list)
235 {
236         struct ldb_context *ldb = module->ldb;
237         struct ltdb_private *ltdb = module->private_data;
238         unsigned int i;
239         int ret;
240         const char *target = tree->u.simple.value.data;
241
242         list->count = 0;
243         list->dn = NULL;
244
245         ret = ltdb_index_dn_simple(module, tree, index_list, list);
246
247         for (i=0;i<ltdb->cache->subclasses->num_elements;i++) {
248                 struct ldb_message_element *el = &ltdb->cache->subclasses->elements[i];
249                 if (ldb_attr_cmp(el->name, target) == 0) {
250                         unsigned int j;
251                         for (j=0;j<el->num_values;j++) {
252                                 struct ldb_parse_tree tree2;
253                                 struct dn_list *list2;
254                                 tree2.operation = LDB_OP_SIMPLE;
255                                 tree2.u.simple.attr = talloc_strdup(list, LTDB_OBJECTCLASS);
256                                 if (!tree2.u.simple.attr) {
257                                         return -1;
258                                 }
259                                 tree2.u.simple.value = el->values[j];
260                                 list2 = talloc(list, struct dn_list);
261                                 if (list2 == NULL) {
262                                         return -1;
263                                 }
264                                 if (ltdb_index_dn_objectclass(module, &tree2, 
265                                                               index_list, list2) == 1) {
266                                         if (list->count == 0) {
267                                                 *list = *list2;
268                                                 ret = 1;
269                                         } else {
270                                                 list_union(ldb, list, list2);
271                                                 talloc_free(list2);
272                                         }
273                                 }
274                                 talloc_free(tree2.u.simple.attr);
275                         }
276                 }
277         }
278
279         return ret;
280 }
281
282 /*
283   return a list of dn's that might match a leaf indexed search
284  */
285 static int ltdb_index_dn_leaf(struct ldb_module *module, 
286                               struct ldb_parse_tree *tree,
287                               const struct ldb_message *index_list,
288                               struct dn_list *list)
289 {
290         if (ldb_attr_cmp(tree->u.simple.attr, LTDB_OBJECTCLASS) == 0) {
291                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
292         }
293         return ltdb_index_dn_simple(module, tree, index_list, list);
294 }
295
296
297 /*
298   list intersection
299   list = list & list2
300   relies on the lists being sorted
301 */
302 static int list_intersect(struct ldb_context *ldb,
303                           struct dn_list *list, const struct dn_list *list2)
304 {
305         struct dn_list *list3;
306         unsigned int i;
307
308         if (list->count == 0 || list2->count == 0) {
309                 /* 0 & X == 0 */
310                 return 0;
311         }
312
313         list3 = talloc(ldb, struct dn_list);
314         if (list3 == NULL) {
315                 return -1;
316         }
317
318         list3->dn = talloc_array(list3, char *, list->count);
319         if (!list3->dn) {
320                 talloc_free(list3);
321                 return -1;
322         }
323         list3->count = 0;
324
325         for (i=0;i<list->count;i++) {
326                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
327                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
328                         list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
329                         list3->count++;
330                 } else {
331                         talloc_free(list->dn[i]);
332                 }               
333         }
334
335         talloc_free(list->dn);
336         list->dn = talloc_steal(list, list3->dn);
337         list->count = list3->count;
338         talloc_free(list3);
339
340         return 0;
341 }
342
343
344 /*
345   list union
346   list = list | list2
347   relies on the lists being sorted
348 */
349 static int list_union(struct ldb_context *ldb, 
350                       struct dn_list *list, const struct dn_list *list2)
351 {
352         unsigned int i;
353         char **d;
354         unsigned int count = list->count;
355
356         if (list->count == 0 && list2->count == 0) {
357                 /* 0 | 0 == 0 */
358                 return 0;
359         }
360
361         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
362         if (!d) {
363                 return -1;
364         }
365         list->dn = d;
366
367         for (i=0;i<list2->count;i++) {
368                 if (ldb_list_find(list2->dn[i], list->dn, count, 
369                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
370                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
371                         if (!list->dn[list->count]) {
372                                 return -1;
373                         }
374                         list->count++;
375                 }               
376         }
377
378         if (list->count != count) {
379                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
380         }
381
382         return 0;
383 }
384
385 static int ltdb_index_dn(struct ldb_module *module, 
386                          struct ldb_parse_tree *tree,
387                          const struct ldb_message *index_list,
388                          struct dn_list *list);
389
390
391 /*
392   OR two index results
393  */
394 static int ltdb_index_dn_or(struct ldb_module *module, 
395                             struct ldb_parse_tree *tree,
396                             const struct ldb_message *index_list,
397                             struct dn_list *list)
398 {
399         struct ldb_context *ldb = module->ldb;
400         unsigned int i;
401         int ret;
402         
403         ret = -1;
404         list->dn = NULL;
405         list->count = 0;
406
407         for (i=0;i<tree->u.list.num_elements;i++) {
408                 struct dn_list *list2;
409                 int v;
410
411                 list2 = talloc(module, struct dn_list);
412                 if (list2 == NULL) {
413                         return -1;
414                 }
415
416                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
417
418                 if (v == 0) {
419                         /* 0 || X == X */
420                         if (ret == -1) {
421                                 ret = 0;
422                         }
423                         talloc_free(list2);
424                         continue;
425                 }
426
427                 if (v == -1) {
428                         /* 1 || X == 1 */
429                         talloc_free(list->dn);
430                         talloc_free(list2);
431                         return -1;
432                 }
433
434                 if (ret == -1) {
435                         ret = 1;
436                         list->dn = talloc_steal(list, list2->dn);
437                         list->count = list2->count;
438                 } else {
439                         if (list_union(ldb, list, list2) == -1) {
440                                 talloc_free(list2);
441                                 return -1;
442                         }
443                         ret = 1;
444                 }
445                 talloc_free(list2);
446         }
447
448         if (list->count == 0) {
449                 return 0;
450         }
451
452         return ret;
453 }
454
455
456 /*
457   NOT an index results
458  */
459 static int ltdb_index_dn_not(struct ldb_module *module, 
460                              struct ldb_parse_tree *tree,
461                              const struct ldb_message *index_list,
462                              struct dn_list *list)
463 {
464         /* the only way to do an indexed not would be if we could
465            negate the not via another not or if we knew the total
466            number of database elements so we could know that the
467            existing expression covered the whole database. 
468            
469            instead, we just give up, and rely on a full index scan
470            (unless an outer & manages to reduce the list)
471         */
472         return -1;
473 }
474
475 /*
476   AND two index results
477  */
478 static int ltdb_index_dn_and(struct ldb_module *module, 
479                              struct ldb_parse_tree *tree,
480                              const struct ldb_message *index_list,
481                              struct dn_list *list)
482 {
483         struct ldb_context *ldb = module->ldb;
484         unsigned int i;
485         int ret;
486         
487         ret = -1;
488         list->dn = NULL;
489         list->count = 0;
490
491         for (i=0;i<tree->u.list.num_elements;i++) {
492                 struct dn_list *list2;
493                 int v;
494
495                 list2 = talloc(module, struct dn_list);
496                 if (list2 == NULL) {
497                         return -1;
498                 }
499
500                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
501
502                 if (v == 0) {
503                         /* 0 && X == 0 */
504                         talloc_free(list->dn);
505                         talloc_free(list2);
506                         return 0;
507                 }
508
509                 if (v == -1) {
510                         talloc_free(list2);
511                         continue;
512                 }
513
514                 if (ret == -1) {
515                         ret = 1;
516                         talloc_free(list->dn);
517                         list->dn = talloc_steal(list, list2->dn);
518                         list->count = list2->count;
519                 } else {
520                         if (list_intersect(ldb, list, list2) == -1) {
521                                 talloc_free(list2);
522                                 return -1;
523                         }
524                 }
525
526                 talloc_free(list2);
527
528                 if (list->count == 0) {
529                         talloc_free(list->dn);
530                         return 0;
531                 }
532         }
533
534         return ret;
535 }
536
537 /*
538   return a list of dn's that might match a indexed search or
539   -1 if an error. return 0 for no matches, or 1 for matches
540  */
541 static int ltdb_index_dn(struct ldb_module *module, 
542                          struct ldb_parse_tree *tree,
543                          const struct ldb_message *index_list,
544                          struct dn_list *list)
545 {
546         int ret = -1;
547
548         switch (tree->operation) {
549         case LDB_OP_SIMPLE:
550                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
551                 break;
552
553         case LDB_OP_AND:
554                 ret = ltdb_index_dn_and(module, tree, index_list, list);
555                 break;
556
557         case LDB_OP_OR:
558                 ret = ltdb_index_dn_or(module, tree, index_list, list);
559                 break;
560
561         case LDB_OP_NOT:
562                 ret = ltdb_index_dn_not(module, tree, index_list, list);
563                 break;
564         }
565
566         return ret;
567 }
568
569 /*
570   filter a candidate dn_list from an indexed search into a set of results
571   extracting just the given attributes
572 */
573 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
574                             const char *base,
575                             enum ldb_scope scope,
576                             const struct dn_list *dn_list, 
577                             const char * const attrs[], struct ldb_message ***res)
578 {
579         unsigned int i;
580         int count = 0;
581
582         for (i=0;i<dn_list->count;i++) {
583                 struct ldb_message *msg;
584                 int ret;
585
586                 msg = talloc(module, struct ldb_message);
587                 if (msg == NULL) {
588                         return -1;
589                 }
590
591                 ret = ltdb_search_dn1(module, dn_list->dn[i], msg);
592                 if (ret == 0) {
593                         /* the record has disappeared? yes, this can happen */
594                         talloc_free(msg);
595                         continue;
596                 }
597
598                 if (ret == -1) {
599                         /* an internal error */
600                         talloc_free(msg);
601                         return -1;
602                 }
603
604                 ret = 0;
605                 if (ltdb_message_match(module, msg, tree, base, scope) == 1) {
606                         ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
607                 }
608                 talloc_free(msg);
609                 if (ret != 0) {
610                         return -1;
611                 }
612         }
613
614         return count;
615 }
616
617 /*
618   search the database with a LDAP-like expression using indexes
619   returns -1 if an indexed search is not possible, in which
620   case the caller should call ltdb_search_full() 
621 */
622 int ltdb_search_indexed(struct ldb_module *module, 
623                         const char *base,
624                         enum ldb_scope scope,
625                         struct ldb_parse_tree *tree,
626                         const char * const attrs[], struct ldb_message ***res)
627 {
628         struct ltdb_private *ltdb = module->private_data;
629         struct dn_list *dn_list;
630         int ret;
631
632         if (ltdb->cache->indexlist->num_elements == 0) {
633                 /* no index list? must do full search */
634                 return -1;
635         }
636
637         dn_list = talloc(module, struct dn_list);
638         if (dn_list == NULL) {
639                 return -1;
640         }
641
642         ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
643
644         if (ret == 1) {
645                 /* we've got a candidate list - now filter by the full tree
646                    and extract the needed attributes */
647                 ret = ldb_index_filter(module, tree, base, scope, dn_list, 
648                                        attrs, res);
649         }
650
651         talloc_free(dn_list);
652
653         return ret;
654 }
655
656 /*
657   add a index element where this is the first indexed DN for this value
658 */
659 static int ltdb_index_add1_new(struct ldb_context *ldb, 
660                                struct ldb_message *msg,
661                                struct ldb_message_element *el,
662                                char *dn)
663 {
664         struct ldb_message_element *el2;
665
666         /* add another entry */
667         el2 = talloc_realloc(msg, msg->elements, 
668                                struct ldb_message_element, msg->num_elements+1);
669         if (!el2) {
670                 return -1;
671         }
672
673         msg->elements = el2;
674         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
675         if (!msg->elements[msg->num_elements].name) {
676                 return -1;
677         }
678         msg->elements[msg->num_elements].num_values = 0;
679         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
680         if (!msg->elements[msg->num_elements].values) {
681                 return -1;
682         }
683         msg->elements[msg->num_elements].values[0].length = strlen(dn);
684         msg->elements[msg->num_elements].values[0].data = dn;
685         msg->elements[msg->num_elements].num_values = 1;
686         msg->num_elements++;
687
688         return 0;
689 }
690
691
692 /*
693   add a index element where this is not the first indexed DN for this
694   value
695 */
696 static int ltdb_index_add1_add(struct ldb_context *ldb, 
697                                struct ldb_message *msg,
698                                struct ldb_message_element *el,
699                                int idx,
700                                char *dn)
701 {
702         struct ldb_val *v2;
703         unsigned int i;
704
705         /* for multi-valued attributes we can end up with repeats */
706         for (i=0;i<msg->elements[idx].num_values;i++) {
707                 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
708                         return 0;
709                 }
710         }
711
712         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
713                               struct ldb_val, 
714                               msg->elements[idx].num_values+1);
715         if (!v2) {
716                 return -1;
717         }
718         msg->elements[idx].values = v2;
719
720         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
721         msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
722         msg->elements[idx].num_values++;
723
724         return 0;
725 }
726
727 /*
728   add an index entry for one message element
729 */
730 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
731                            struct ldb_message_element *el, int v_idx)
732 {
733         struct ldb_context *ldb = module->ldb;
734         struct ldb_message *msg;
735         char *dn_key;
736         int ret;
737         unsigned int i;
738
739         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
740         if (!dn_key) {
741                 return -1;
742         }
743
744         msg = talloc(dn_key, struct ldb_message);
745         if (msg == NULL) {
746                 return -1;
747         }
748
749         ret = ltdb_search_dn1(module, dn_key, msg);
750         if (ret == -1) {
751                 talloc_free(dn_key);
752                 return -1;
753         }
754
755         if (ret == 0) {
756                 msg->dn = talloc_strdup(msg, dn_key);
757                 if (!msg->dn) {
758                         talloc_free(dn_key);
759                         errno = ENOMEM;
760                         return -1;
761                 }
762                 msg->num_elements = 0;
763                 msg->elements = NULL;
764         }
765
766         for (i=0;i<msg->num_elements;i++) {
767                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
768                         break;
769                 }
770         }
771
772         if (i == msg->num_elements) {
773                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
774         } else {
775                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
776         }
777
778         if (ret == 0) {
779                 ret = ltdb_store(module, msg, TDB_REPLACE);
780         }
781
782         talloc_free(dn_key);
783
784         return ret;
785 }
786
787 /*
788   add the index entries for a new record
789   return -1 on failure
790 */
791 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
792 {
793         struct ltdb_private *ltdb = module->private_data;
794         int ret;
795         unsigned int i, j;
796
797         if (ltdb->cache->indexlist->num_elements == 0) {
798                 /* no indexed fields */
799                 return 0;
800         }
801
802         for (i=0;i<msg->num_elements;i++) {
803                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
804                                        NULL, LTDB_IDXATTR);
805                 if (ret == -1) {
806                         continue;
807                 }
808                 for (j=0;j<msg->elements[i].num_values;j++) {
809                         ret = ltdb_index_add1(module, msg->dn, &msg->elements[i], j);
810                         if (ret == -1) {
811                                 return -1;
812                         }
813                 }
814         }
815
816         return 0;
817 }
818
819
820 /*
821   delete an index entry for one message element
822 */
823 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
824                          struct ldb_message_element *el, int v_idx)
825 {
826         struct ldb_context *ldb = module->ldb;
827         struct ldb_message *msg;
828         char *dn_key;
829         int ret, i;
830         unsigned int j;
831
832         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
833         if (!dn_key) {
834                 return -1;
835         }
836
837         msg = talloc(dn_key, struct ldb_message);
838         if (msg == NULL) {
839                 talloc_free(dn_key);
840                 return -1;
841         }
842
843         ret = ltdb_search_dn1(module, dn_key, msg);
844         if (ret == -1) {
845                 talloc_free(dn_key);
846                 return -1;
847         }
848
849         if (ret == 0) {
850                 /* it wasn't indexed. Did we have an earlier error? If we did then
851                    its gone now */
852                 talloc_free(dn_key);
853                 return 0;
854         }
855
856         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
857         if (i == -1) {
858                 ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn %s not found in %s\n", dn, dn_key);
859                 /* it ain't there. hmmm */
860                 talloc_free(dn_key);
861                 return 0;
862         }
863
864         if (j != msg->elements[i].num_values - 1) {
865                 memmove(&msg->elements[i].values[j], 
866                         &msg->elements[i].values[j+1], 
867                         (msg->elements[i].num_values-(j+1)) * 
868                         sizeof(msg->elements[i].values[0]));
869         }
870         msg->elements[i].num_values--;
871
872         if (msg->elements[i].num_values == 0) {
873                 ret = ltdb_delete_noindex(module, dn_key);
874         } else {
875                 ret = ltdb_store(module, msg, TDB_REPLACE);
876         }
877
878         talloc_free(dn_key);
879
880         return ret;
881 }
882
883 /*
884   delete the index entries for a record
885   return -1 on failure
886 */
887 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
888 {
889         struct ltdb_private *ltdb = module->private_data;
890         int ret;
891         unsigned int i, j;
892
893         /* find the list of indexed fields */   
894         if (ltdb->cache->indexlist->num_elements == 0) {
895                 /* no indexed fields */
896                 return 0;
897         }
898
899         for (i=0;i<msg->num_elements;i++) {
900                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
901                                        NULL, LTDB_IDXATTR);
902                 if (ret == -1) {
903                         continue;
904                 }
905                 for (j=0;j<msg->elements[i].num_values;j++) {
906                         ret = ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
907                         if (ret == -1) {
908                                 return -1;
909                         }
910                 }
911         }
912
913         return 0;
914 }
915
916
917 /*
918   traversal function that deletes all @INDEX records
919 */
920 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
921 {
922         const char *dn = "DN=" LTDB_INDEX ":";
923         if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
924                 return tdb_delete(tdb, key);
925         }
926         return 0;
927 }
928
929 /*
930   traversal function that adds @INDEX records during a re index
931 */
932 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
933 {
934         struct ldb_module *module = state;
935         struct ldb_message *msg;
936         int ret;
937         TDB_DATA key2;
938
939         if (strncmp(key.dptr, "DN=@", 4) == 0 ||
940             strncmp(key.dptr, "DN=", 3) != 0) {
941                 return 0;
942         }
943
944         msg = talloc(module, struct ldb_message);
945         if (msg == NULL) {
946                 return -1;
947         }
948
949         ret = ltdb_unpack_data(module, &data, msg);
950         if (ret != 0) {
951                 talloc_free(msg);
952                 return -1;
953         }
954
955         /* check if the DN key has changed, perhaps due to the 
956            case insensitivity of an element changing */
957         key2 = ltdb_key(module, msg->dn);
958         if (strcmp(key2.dptr, key.dptr) != 0) {
959                 tdb_delete(tdb, key);
960                 tdb_store(tdb, key2, data, 0);
961         }
962         talloc_free(key2.dptr);
963
964         if (!msg->dn) {
965                 msg->dn = key.dptr+3;
966         }
967
968         ret = ltdb_index_add(module, msg);
969
970         talloc_free(msg);
971
972         return ret;
973 }
974
975 /*
976   force a complete reindex of the database
977 */
978 int ltdb_reindex(struct ldb_module *module)
979 {
980         struct ltdb_private *ltdb = module->private_data;
981         int ret;
982
983         if (ltdb_cache_reload(module) != 0) {
984                 return -1;
985         }
986
987         /* first traverse the database deleting any @INDEX records */
988         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
989         if (ret == -1) {
990                 errno = EIO;
991                 return -1;
992         }
993
994         /* now traverse adding any indexes for normal LDB records */
995         ret = tdb_traverse(ltdb->tdb, re_index, module);
996         if (ret == -1) {
997                 errno = EIO;
998                 return -1;
999         }
1000
1001         return 0;
1002 }