r4477: expanded the test suite to increase code coverage a lot
[ira/wip.git] / source / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39 #include "ldb/include/ldb_parse.h"
40
41 /*
42   find an element in a list, using the given comparison function and
43   assuming that the list is already sorted using comp_fn
44
45   return -1 if not found, or the index of the first occurance of needle if found
46 */
47 static int ldb_list_find(const void *needle, 
48                          const void *base, size_t nmemb, size_t size, 
49                          comparison_fn_t comp_fn)
50 {
51         const char *base_p = base;
52         size_t min_i, max_i, test_i;
53
54         if (nmemb == 0) {
55                 return -1;
56         }
57
58         min_i = 0;
59         max_i = nmemb-1;
60
61         while (min_i < max_i) {
62                 int r;
63
64                 test_i = (min_i + max_i) / 2;
65                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
66                 if (r == 0) {
67                         /* scan back for first element */
68                         while (test_i > 0 &&
69                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
70                                 test_i--;
71                         }
72                         return test_i;
73                 }
74                 if (r < 0) {
75                         if (test_i == 0) {
76                                 return -1;
77                         }
78                         max_i = test_i - 1;
79                 }
80                 if (r > 0) {
81                         min_i = test_i + 1;
82                 }
83         }
84
85         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
86                 return min_i;
87         }
88
89         return -1;
90 }
91
92 struct dn_list {
93         unsigned int count;
94         char **dn;
95 };
96
97 /*
98   return the dn key to be used for an index
99   caller frees
100 */
101 static char *ldb_dn_key(struct ldb_context *ldb,
102                         const char *attr, const struct ldb_val *value)
103 {
104         char *ret = NULL;
105
106         if (ldb_should_b64_encode(value)) {
107                 char *vstr = ldb_base64_encode(ldb, value->data, value->length);
108                 if (!vstr) return NULL;
109                 ret = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr, vstr);
110                 talloc_free(vstr);
111                 return ret;
112         }
113
114         return talloc_asprintf(ldb, "%s:%s:%.*s", 
115                                LTDB_INDEX, attr, value->length, (char *)value->data);
116 }
117
118 /*
119   see if a attribute value is in the list of indexed attributes
120 */
121 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
122                             unsigned int *v_idx, const char *key)
123 {
124         unsigned int i, j;
125         for (i=0;i<msg->num_elements;i++) {
126                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
127                         const struct ldb_message_element *el = 
128                                 &msg->elements[i];
129                         for (j=0;j<el->num_values;j++) {
130                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
131                                         if (v_idx) {
132                                                 *v_idx = j;
133                                         }
134                                         return i;
135                                 }
136                         }
137                 }
138         }
139         return -1;
140 }
141
142 /* used in sorting dn lists */
143 static int list_cmp(const char **s1, const char **s2)
144 {
145         return strcmp(*s1, *s2);
146 }
147
148 /*
149   return a list of dn's that might match a simple indexed search or
150  */
151 static int ltdb_index_dn_simple(struct ldb_module *module, 
152                                 struct ldb_parse_tree *tree,
153                                 const struct ldb_message *index_list,
154                                 struct dn_list *list)
155 {
156         struct ldb_context *ldb = module->ldb;
157         char *dn = NULL;
158         int ret;
159         unsigned int i, j;
160         struct ldb_message *msg;
161
162         list->count = 0;
163         list->dn = NULL;
164
165         /*
166           if the value is a wildcard then we can't do a match via indexing
167         */
168         if (ltdb_has_wildcard(module, tree->u.simple.attr, &tree->u.simple.value)) {
169                 return -1;
170         }
171
172         /* if the attribute isn't in the list of indexed attributes then
173            this node needs a full search */
174         if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL, LTDB_IDXATTR) == -1) {
175                 return -1;
176         }
177
178         /* the attribute is indexed. Pull the list of DNs that match the 
179            search criterion */
180         dn = ldb_dn_key(ldb, tree->u.simple.attr, &tree->u.simple.value);
181         if (!dn) return -1;
182
183         msg = talloc_p(list, struct ldb_message);
184         if (msg == NULL) {
185                 return -1;
186         }
187
188         ret = ltdb_search_dn1(module, dn, msg);
189         talloc_free(dn);
190         if (ret == 0 || ret == -1) {
191                 return ret;
192         }
193
194         for (i=0;i<msg->num_elements;i++) {
195                 struct ldb_message_element *el;
196
197                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
198                         continue;
199                 }
200
201                 el = &msg->elements[i];
202
203                 list->dn = talloc_array_p(list, char *, el->num_values);
204                 if (!list->dn) {
205                         break;          
206                 }
207
208                 for (j=0;j<el->num_values;j++) {
209                         list->dn[list->count] = 
210                                 talloc_strdup(list->dn, (char *)el->values[j].data);
211                         if (!list->dn[list->count]) {
212                                 talloc_free(list);
213                                 return -1;
214                         }
215                         list->count++;
216                 }
217         }
218
219         talloc_free(msg);
220
221         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
222
223         return 1;
224 }
225
226
227 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
228
229 /*
230   return a list of dn's that might match a simple indexed search on
231   the special objectclass attribute
232  */
233 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
234                                      struct ldb_parse_tree *tree,
235                                      const struct ldb_message *index_list,
236                                      struct dn_list *list)
237 {
238         struct ldb_context *ldb = module->ldb;
239         struct ltdb_private *ltdb = module->private_data;
240         unsigned int i;
241         int ret;
242         const char *target = tree->u.simple.value.data;
243
244         list->count = 0;
245         list->dn = NULL;
246
247         ret = ltdb_index_dn_simple(module, tree, index_list, list);
248
249         for (i=0;i<ltdb->cache->subclasses->num_elements;i++) {
250                 struct ldb_message_element *el = &ltdb->cache->subclasses->elements[i];
251                 if (ldb_attr_cmp(el->name, target) == 0) {
252                         unsigned int j;
253                         for (j=0;j<el->num_values;j++) {
254                                 struct ldb_parse_tree tree2;
255                                 struct dn_list *list2;
256                                 tree2.operation = LDB_OP_SIMPLE;
257                                 tree2.u.simple.attr = talloc_strdup(list, LTDB_OBJECTCLASS);
258                                 if (!tree2.u.simple.attr) {
259                                         return -1;
260                                 }
261                                 tree2.u.simple.value = el->values[j];
262                                 list2 = talloc_p(list, struct dn_list);
263                                 if (list2 == NULL) {
264                                         return -1;
265                                 }
266                                 if (ltdb_index_dn_objectclass(module, &tree2, 
267                                                               index_list, list2) == 1) {
268                                         if (list->count == 0) {
269                                                 *list = *list2;
270                                                 ret = 1;
271                                         } else {
272                                                 list_union(ldb, list, list2);
273                                                 talloc_free(list2);
274                                         }
275                                 }
276                                 talloc_free(tree2.u.simple.attr);
277                         }
278                 }
279         }
280
281         return ret;
282 }
283
284 /*
285   return a list of dn's that might match a leaf indexed search
286  */
287 static int ltdb_index_dn_leaf(struct ldb_module *module, 
288                               struct ldb_parse_tree *tree,
289                               const struct ldb_message *index_list,
290                               struct dn_list *list)
291 {
292         if (ldb_attr_cmp(tree->u.simple.attr, LTDB_OBJECTCLASS) == 0) {
293                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
294         }
295         return ltdb_index_dn_simple(module, tree, index_list, list);
296 }
297
298
299 /*
300   list intersection
301   list = list & list2
302   relies on the lists being sorted
303 */
304 static int list_intersect(struct ldb_context *ldb,
305                           struct dn_list *list, const struct dn_list *list2)
306 {
307         struct dn_list *list3;
308         unsigned int i;
309
310         if (list->count == 0 || list2->count == 0) {
311                 /* 0 & X == 0 */
312                 talloc_free(list);
313                 return 0;
314         }
315
316         list3 = talloc_p(ldb, struct dn_list);
317         if (list3 == NULL) {
318                 return -1;
319         }
320
321         list3->dn = talloc_array_p(list3, char *, list->count);
322         if (!list3->dn) {
323                 talloc_free(list);
324                 talloc_free(list3);
325                 return -1;
326         }
327         list3->count = 0;
328
329         for (i=0;i<list->count;i++) {
330                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
331                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
332                         list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
333                         list3->count++;
334                 } else {
335                         talloc_free(list->dn[i]);
336                 }               
337         }
338
339         talloc_free(list->dn);
340         list->dn = talloc_steal(list, list3->dn);
341         list->count = list3->count;
342         talloc_free(list3);
343
344         return 0;
345 }
346
347
348 /*
349   list union
350   list = list | list2
351   relies on the lists being sorted
352 */
353 static int list_union(struct ldb_context *ldb, 
354                       struct dn_list *list, const struct dn_list *list2)
355 {
356         unsigned int i;
357         char **d;
358         unsigned int count = list->count;
359
360         if (list->count == 0 && list2->count == 0) {
361                 /* 0 | 0 == 0 */
362                 talloc_free(list);
363                 return 0;
364         }
365
366         d = talloc_realloc_p(list, list->dn, char *, list->count + list2->count);
367         if (!d) {
368                 talloc_free(list);
369                 return -1;
370         }
371         list->dn = d;
372
373         for (i=0;i<list2->count;i++) {
374                 if (ldb_list_find(list2->dn[i], list->dn, count, 
375                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
376                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
377                         if (!list->dn[list->count]) {
378                                 talloc_free(list);
379                                 return -1;
380                         }
381                         list->count++;
382                 }               
383         }
384
385         if (list->count != count) {
386                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
387         }
388
389         return 0;
390 }
391
392 static int ltdb_index_dn(struct ldb_module *module, 
393                          struct ldb_parse_tree *tree,
394                          const struct ldb_message *index_list,
395                          struct dn_list *list);
396
397
398 /*
399   OR two index results
400  */
401 static int ltdb_index_dn_or(struct ldb_module *module, 
402                             struct ldb_parse_tree *tree,
403                             const struct ldb_message *index_list,
404                             struct dn_list *list)
405 {
406         struct ldb_context *ldb = module->ldb;
407         unsigned int i;
408         int ret;
409         
410         ret = -1;
411         list->dn = NULL;
412         list->count = 0;
413
414         for (i=0;i<tree->u.list.num_elements;i++) {
415                 struct dn_list *list2;
416                 int v;
417
418                 list2 = talloc_p(module, struct dn_list);
419                 if (list2 == NULL) {
420                         return -1;
421                 }
422
423                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
424
425                 if (v == 0) {
426                         /* 0 || X == X */
427                         if (ret == -1) {
428                                 ret = 0;
429                         }
430                         talloc_free(list2);
431                         continue;
432                 }
433
434                 if (v == -1) {
435                         /* 1 || X == 1 */
436                         talloc_free(list->dn);
437                         talloc_free(list2);
438                         return -1;
439                 }
440
441                 if (ret == -1) {
442                         ret = 1;
443                         list->dn = talloc_steal(list, list2->dn);
444                         list->count = list2->count;
445                 } else {
446                         if (list_union(ldb, list, list2) == -1) {
447                                 talloc_free(list2);
448                                 return -1;
449                         }
450                         ret = 1;
451                 }
452                 talloc_free(list2);
453         }
454
455         if (list->count == 0) {
456                 talloc_free(list);
457                 return 0;
458         }
459
460         return ret;
461 }
462
463
464 /*
465   NOT an index results
466  */
467 static int ltdb_index_dn_not(struct ldb_module *module, 
468                              struct ldb_parse_tree *tree,
469                              const struct ldb_message *index_list,
470                              struct dn_list *list)
471 {
472         /* the only way to do an indexed not would be if we could
473            negate the not via another not or if we knew the total
474            number of database elements so we could know that the
475            existing expression covered the whole database. 
476            
477            instead, we just give up, and rely on a full index scan
478            (unless an outer & manages to reduce the list)
479         */
480         return -1;
481 }
482
483 /*
484   AND two index results
485  */
486 static int ltdb_index_dn_and(struct ldb_module *module, 
487                              struct ldb_parse_tree *tree,
488                              const struct ldb_message *index_list,
489                              struct dn_list *list)
490 {
491         struct ldb_context *ldb = module->ldb;
492         unsigned int i;
493         int ret;
494         
495         ret = -1;
496         list->dn = NULL;
497         list->count = 0;
498
499         for (i=0;i<tree->u.list.num_elements;i++) {
500                 struct dn_list *list2;
501                 int v;
502
503                 list2 = talloc_p(module, struct dn_list);
504                 if (list2 == NULL) {
505                         return -1;
506                 }
507
508                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
509
510                 if (v == 0) {
511                         /* 0 && X == 0 */
512                         talloc_free(list->dn);
513                         talloc_free(list2);
514                         return 0;
515                 }
516
517                 if (v == -1) {
518                         talloc_free(list2);
519                         continue;
520                 }
521
522                 if (ret == -1) {
523                         ret = 1;
524                         talloc_free(list->dn);
525                         list->dn = talloc_steal(list, list2->dn);
526                         list->count = list2->count;
527                 } else {
528                         if (list_intersect(ldb, list, list2) == -1) {
529                                 talloc_free(list2);
530                                 return -1;
531                         }
532                 }
533
534                 talloc_free(list2);
535
536                 if (list->count == 0) {
537                         talloc_free(list->dn);
538                         return 0;
539                 }
540         }
541
542         return ret;
543 }
544
545 /*
546   return a list of dn's that might match a indexed search or
547   -1 if an error. return 0 for no matches, or 1 for matches
548  */
549 static int ltdb_index_dn(struct ldb_module *module, 
550                          struct ldb_parse_tree *tree,
551                          const struct ldb_message *index_list,
552                          struct dn_list *list)
553 {
554         int ret = -1;
555
556         switch (tree->operation) {
557         case LDB_OP_SIMPLE:
558                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
559                 break;
560
561         case LDB_OP_AND:
562                 ret = ltdb_index_dn_and(module, tree, index_list, list);
563                 break;
564
565         case LDB_OP_OR:
566                 ret = ltdb_index_dn_or(module, tree, index_list, list);
567                 break;
568
569         case LDB_OP_NOT:
570                 ret = ltdb_index_dn_not(module, tree, index_list, list);
571                 break;
572         }
573
574         return ret;
575 }
576
577 /*
578   filter a candidate dn_list from an indexed search into a set of results
579   extracting just the given attributes
580 */
581 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
582                             const char *base,
583                             enum ldb_scope scope,
584                             const struct dn_list *dn_list, 
585                             const char * const attrs[], struct ldb_message ***res)
586 {
587         unsigned int i;
588         int count = 0;
589
590         for (i=0;i<dn_list->count;i++) {
591                 struct ldb_message *msg;
592                 int ret;
593
594                 msg = talloc_p(module, struct ldb_message);
595                 if (msg == NULL) {
596                         return -1;
597                 }
598
599                 ret = ltdb_search_dn1(module, dn_list->dn[i], msg);
600                 if (ret == 0) {
601                         /* the record has disappeared? yes, this can happen */
602                         talloc_free(msg);
603                         continue;
604                 }
605
606                 if (ret == -1) {
607                         /* an internal error */
608                         talloc_free(msg);
609                         return -1;
610                 }
611
612                 ret = 0;
613                 if (ltdb_message_match(module, msg, tree, base, scope) == 1) {
614                         ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
615                 }
616                 talloc_free(msg);
617                 if (ret != 0) {
618                         return -1;
619                 }
620         }
621
622         return count;
623 }
624
625 /*
626   search the database with a LDAP-like expression using indexes
627   returns -1 if an indexed search is not possible, in which
628   case the caller should call ltdb_search_full() 
629 */
630 int ltdb_search_indexed(struct ldb_module *module, 
631                         const char *base,
632                         enum ldb_scope scope,
633                         struct ldb_parse_tree *tree,
634                         const char * const attrs[], struct ldb_message ***res)
635 {
636         struct ltdb_private *ltdb = module->private_data;
637         struct dn_list *dn_list;
638         int ret;
639
640         if (ltdb->cache->indexlist->num_elements == 0) {
641                 /* no index list? must do full search */
642                 return -1;
643         }
644
645         dn_list = talloc_p(module, struct dn_list);
646         if (dn_list == NULL) {
647                 return -1;
648         }
649
650         ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
651
652         if (ret == 1) {
653                 /* we've got a candidate list - now filter by the full tree
654                    and extract the needed attributes */
655                 ret = ldb_index_filter(module, tree, base, scope, dn_list, 
656                                        attrs, res);
657         }
658
659         talloc_free(dn_list);
660
661         return ret;
662 }
663
664 /*
665   add a index element where this is the first indexed DN for this value
666 */
667 static int ltdb_index_add1_new(struct ldb_context *ldb, 
668                                struct ldb_message *msg,
669                                struct ldb_message_element *el,
670                                char *dn)
671 {
672         struct ldb_message_element *el2;
673
674         /* add another entry */
675         el2 = talloc_realloc_p(msg, msg->elements, 
676                                struct ldb_message_element, msg->num_elements+1);
677         if (!el2) {
678                 return -1;
679         }
680
681         msg->elements = el2;
682         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
683         if (!msg->elements[msg->num_elements].name) {
684                 return -1;
685         }
686         msg->elements[msg->num_elements].num_values = 0;
687         msg->elements[msg->num_elements].values = talloc_p(msg->elements, struct ldb_val);
688         if (!msg->elements[msg->num_elements].values) {
689                 return -1;
690         }
691         msg->elements[msg->num_elements].values[0].length = strlen(dn);
692         msg->elements[msg->num_elements].values[0].data = dn;
693         msg->elements[msg->num_elements].num_values = 1;
694         msg->num_elements++;
695
696         return 0;
697 }
698
699
700 /*
701   add a index element where this is not the first indexed DN for this
702   value
703 */
704 static int ltdb_index_add1_add(struct ldb_context *ldb, 
705                                struct ldb_message *msg,
706                                struct ldb_message_element *el,
707                                int idx,
708                                char *dn)
709 {
710         struct ldb_val *v2;
711         unsigned int i;
712
713         /* for multi-valued attributes we can end up with repeats */
714         for (i=0;i<msg->elements[idx].num_values;i++) {
715                 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
716                         return 0;
717                 }
718         }
719
720         v2 = talloc_realloc_p(msg->elements, msg->elements[idx].values,
721                               struct ldb_val, 
722                               msg->elements[idx].num_values+1);
723         if (!v2) {
724                 return -1;
725         }
726         msg->elements[idx].values = v2;
727
728         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
729         msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
730         msg->elements[idx].num_values++;
731
732         return 0;
733 }
734
735 /*
736   add an index entry for one message element
737 */
738 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
739                            struct ldb_message_element *el, int v_idx)
740 {
741         struct ldb_context *ldb = module->ldb;
742         struct ldb_message *msg;
743         char *dn_key;
744         int ret;
745         unsigned int i;
746
747         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
748         if (!dn_key) {
749                 return -1;
750         }
751
752         msg = talloc_p(dn_key, struct ldb_message);
753         if (msg == NULL) {
754                 return -1;
755         }
756
757         ret = ltdb_search_dn1(module, dn_key, msg);
758         if (ret == -1) {
759                 talloc_free(dn_key);
760                 return -1;
761         }
762
763         if (ret == 0) {
764                 msg->dn = talloc_strdup(msg, dn_key);
765                 if (!msg->dn) {
766                         talloc_free(dn_key);
767                         errno = ENOMEM;
768                         return -1;
769                 }
770                 msg->num_elements = 0;
771                 msg->elements = NULL;
772         }
773
774         for (i=0;i<msg->num_elements;i++) {
775                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
776                         break;
777                 }
778         }
779
780         if (i == msg->num_elements) {
781                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
782         } else {
783                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
784         }
785
786         if (ret == 0) {
787                 ret = ltdb_store(module, msg, TDB_REPLACE);
788         }
789
790         talloc_free(dn_key);
791
792         return ret;
793 }
794
795 /*
796   add the index entries for a new record
797   return -1 on failure
798 */
799 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
800 {
801         struct ltdb_private *ltdb = module->private_data;
802         int ret;
803         unsigned int i, j;
804
805         if (ltdb->cache->indexlist->num_elements == 0) {
806                 /* no indexed fields */
807                 return 0;
808         }
809
810         for (i=0;i<msg->num_elements;i++) {
811                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
812                                        NULL, LTDB_IDXATTR);
813                 if (ret == -1) {
814                         continue;
815                 }
816                 for (j=0;j<msg->elements[i].num_values;j++) {
817                         ret = ltdb_index_add1(module, msg->dn, &msg->elements[i], j);
818                         if (ret == -1) {
819                                 return -1;
820                         }
821                 }
822         }
823
824         return 0;
825 }
826
827
828 /*
829   delete an index entry for one message element
830 */
831 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
832                          struct ldb_message_element *el, int v_idx)
833 {
834         struct ldb_context *ldb = module->ldb;
835         struct ldb_message *msg;
836         char *dn_key;
837         int ret, i;
838         unsigned int j;
839
840         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
841         if (!dn_key) {
842                 return -1;
843         }
844
845         msg = talloc_p(dn_key, struct ldb_message);
846         if (msg == NULL) {
847                 talloc_free(dn_key);
848                 return -1;
849         }
850
851         ret = ltdb_search_dn1(module, dn_key, msg);
852         if (ret == -1) {
853                 talloc_free(dn_key);
854                 return -1;
855         }
856
857         if (ret == 0) {
858                 /* it wasn't indexed. Did we have an earlier error? If we did then
859                    its gone now */
860                 talloc_free(dn_key);
861                 return 0;
862         }
863
864         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
865         if (i == -1) {
866                 ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn %s not found in %s\n", dn, dn_key);
867                 /* it ain't there. hmmm */
868                 talloc_free(dn_key);
869                 return 0;
870         }
871
872         if (j != msg->elements[i].num_values - 1) {
873                 memmove(&msg->elements[i].values[j], 
874                         &msg->elements[i].values[j+1], 
875                         (msg->elements[i].num_values-(j+1)) * 
876                         sizeof(msg->elements[i].values[0]));
877         }
878         msg->elements[i].num_values--;
879
880         if (msg->elements[i].num_values == 0) {
881                 ret = ltdb_delete_noindex(module, dn_key);
882         } else {
883                 ret = ltdb_store(module, msg, TDB_REPLACE);
884         }
885
886         talloc_free(dn_key);
887
888         return ret;
889 }
890
891 /*
892   delete the index entries for a record
893   return -1 on failure
894 */
895 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
896 {
897         struct ltdb_private *ltdb = module->private_data;
898         int ret;
899         unsigned int i, j;
900
901         /* find the list of indexed fields */   
902         if (ltdb->cache->indexlist->num_elements == 0) {
903                 /* no indexed fields */
904                 return 0;
905         }
906
907         for (i=0;i<msg->num_elements;i++) {
908                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
909                                        NULL, LTDB_IDXATTR);
910                 if (ret == -1) {
911                         continue;
912                 }
913                 for (j=0;j<msg->elements[i].num_values;j++) {
914                         ret = ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
915                         if (ret == -1) {
916                                 return -1;
917                         }
918                 }
919         }
920
921         return 0;
922 }
923
924
925 /*
926   traversal function that deletes all @INDEX records
927 */
928 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
929 {
930         const char *dn = "DN=" LTDB_INDEX ":";
931         if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
932                 return tdb_delete(tdb, key);
933         }
934         return 0;
935 }
936
937 /*
938   traversal function that adds @INDEX records during a re index
939 */
940 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
941 {
942         struct ldb_module *module = state;
943         struct ldb_message *msg;
944         int ret;
945
946         if (strncmp(key.dptr, "DN=@", 4) == 0 ||
947             strncmp(key.dptr, "DN=", 3) != 0) {
948                 return 0;
949         }
950
951         msg = talloc_p(module, struct ldb_message);
952         if (msg == NULL) {
953                 return -1;
954         }
955
956         ret = ltdb_unpack_data(module, &data, msg);
957         if (ret != 0) {
958                 talloc_free(msg);
959                 return -1;
960         }
961
962         if (!msg->dn) {
963                 msg->dn = key.dptr+3;
964         }
965
966         ret = ltdb_index_add(module, msg);
967
968         talloc_free(msg);
969
970         return ret;
971 }
972
973 /*
974   force a complete reindex of the database
975 */
976 int ltdb_reindex(struct ldb_module *module)
977 {
978         struct ltdb_private *ltdb = module->private_data;
979         int ret;
980
981         if (ltdb_cache_reload(module) != 0) {
982                 return -1;
983         }
984
985         /* first traverse the database deleting any @INDEX records */
986         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
987         if (ret == -1) {
988                 errno = EIO;
989                 return -1;
990         }
991
992         /* now traverse adding any indexes for normal LDB records */
993         ret = tdb_traverse(ltdb->tdb, re_index, module);
994         if (ret == -1) {
995                 errno = EIO;
996                 return -1;
997         }
998
999         return 0;
1000 }