r7558: added support in ldb for extended ldap search requests. These are
[abartlet/samba.git/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
65                 if (r == 0) {
66                         /* scan back for first element */
67                         while (test_i > 0 &&
68                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
69                                 test_i--;
70                         }
71                         return test_i;
72                 }
73                 if (r < 0) {
74                         if (test_i == 0) {
75                                 return -1;
76                         }
77                         max_i = test_i - 1;
78                 }
79                 if (r > 0) {
80                         min_i = test_i + 1;
81                 }
82         }
83
84         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
85                 return min_i;
86         }
87
88         return -1;
89 }
90
91 struct dn_list {
92         unsigned int count;
93         char **dn;
94 };
95
96 /*
97   return the dn key to be used for an index
98   caller frees
99 */
100 static char *ldb_dn_key(struct ldb_context *ldb,
101                         const char *attr, const struct ldb_val *value)
102 {
103         char *ret = NULL;
104
105         if (ldb_should_b64_encode(value)) {
106                 char *vstr = ldb_base64_encode(ldb, value->data, value->length);
107                 if (!vstr) return NULL;
108                 ret = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr, vstr);
109                 talloc_free(vstr);
110                 return ret;
111         }
112
113         return talloc_asprintf(ldb, "%s:%s:%.*s", 
114                                LTDB_INDEX, attr, value->length, (char *)value->data);
115 }
116
117 /*
118   see if a attribute value is in the list of indexed attributes
119 */
120 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
121                             unsigned int *v_idx, const char *key)
122 {
123         unsigned int i, j;
124         for (i=0;i<msg->num_elements;i++) {
125                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
126                         const struct ldb_message_element *el = 
127                                 &msg->elements[i];
128                         for (j=0;j<el->num_values;j++) {
129                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
130                                         if (v_idx) {
131                                                 *v_idx = j;
132                                         }
133                                         return i;
134                                 }
135                         }
136                 }
137         }
138         return -1;
139 }
140
141 /* used in sorting dn lists */
142 static int list_cmp(const char **s1, const char **s2)
143 {
144         return strcmp(*s1, *s2);
145 }
146
147 /*
148   return a list of dn's that might match a simple indexed search or
149  */
150 static int ltdb_index_dn_simple(struct ldb_module *module, 
151                                 struct ldb_parse_tree *tree,
152                                 const struct ldb_message *index_list,
153                                 struct dn_list *list)
154 {
155         struct ldb_context *ldb = module->ldb;
156         char *dn = NULL;
157         int ret;
158         unsigned int i, j;
159         struct ldb_message *msg;
160
161         list->count = 0;
162         list->dn = NULL;
163
164         /*
165           if the value is a wildcard then we can't do a match via indexing
166         */
167         if (ltdb_has_wildcard(module, tree->u.simple.attr, &tree->u.simple.value)) {
168                 return -1;
169         }
170
171         /* if the attribute isn't in the list of indexed attributes then
172            this node needs a full search */
173         if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL, LTDB_IDXATTR) == -1) {
174                 return -1;
175         }
176
177         /* the attribute is indexed. Pull the list of DNs that match the 
178            search criterion */
179         dn = ldb_dn_key(ldb, tree->u.simple.attr, &tree->u.simple.value);
180         if (!dn) return -1;
181
182         msg = talloc(list, struct ldb_message);
183         if (msg == NULL) {
184                 return -1;
185         }
186
187         ret = ltdb_search_dn1(module, dn, msg);
188         talloc_free(dn);
189         if (ret == 0 || ret == -1) {
190                 return ret;
191         }
192
193         for (i=0;i<msg->num_elements;i++) {
194                 struct ldb_message_element *el;
195
196                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
197                         continue;
198                 }
199
200                 el = &msg->elements[i];
201
202                 list->dn = talloc_array(list, char *, el->num_values);
203                 if (!list->dn) {
204                         break;          
205                 }
206
207                 for (j=0;j<el->num_values;j++) {
208                         list->dn[list->count] = 
209                                 talloc_strdup(list->dn, (char *)el->values[j].data);
210                         if (!list->dn[list->count]) {
211                                 return -1;
212                         }
213                         list->count++;
214                 }
215         }
216
217         talloc_free(msg);
218
219         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
220
221         return 1;
222 }
223
224
225 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
226
227 /*
228   return a list of dn's that might match a simple indexed search on
229   the special objectclass attribute
230  */
231 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
232                                      struct ldb_parse_tree *tree,
233                                      const struct ldb_message *index_list,
234                                      struct dn_list *list)
235 {
236         struct ldb_context *ldb = module->ldb;
237         struct ltdb_private *ltdb = module->private_data;
238         unsigned int i;
239         int ret;
240         const char *target = tree->u.simple.value.data;
241
242         list->count = 0;
243         list->dn = NULL;
244
245         ret = ltdb_index_dn_simple(module, tree, index_list, list);
246
247         for (i=0;i<ltdb->cache->subclasses->num_elements;i++) {
248                 struct ldb_message_element *el = &ltdb->cache->subclasses->elements[i];
249                 if (ldb_attr_cmp(el->name, target) == 0) {
250                         unsigned int j;
251                         for (j=0;j<el->num_values;j++) {
252                                 struct ldb_parse_tree tree2;
253                                 struct dn_list *list2;
254                                 tree2.operation = LDB_OP_SIMPLE;
255                                 tree2.u.simple.attr = talloc_strdup(list, LTDB_OBJECTCLASS);
256                                 if (!tree2.u.simple.attr) {
257                                         return -1;
258                                 }
259                                 tree2.u.simple.value = el->values[j];
260                                 list2 = talloc(list, struct dn_list);
261                                 if (list2 == NULL) {
262                                         return -1;
263                                 }
264                                 if (ltdb_index_dn_objectclass(module, &tree2, 
265                                                               index_list, list2) == 1) {
266                                         if (list->count == 0) {
267                                                 *list = *list2;
268                                                 ret = 1;
269                                         } else {
270                                                 list_union(ldb, list, list2);
271                                                 talloc_free(list2);
272                                         }
273                                 }
274                                 talloc_free(tree2.u.simple.attr);
275                         }
276                 }
277         }
278
279         return ret;
280 }
281
282 /*
283   return a list of dn's that might match a leaf indexed search
284  */
285 static int ltdb_index_dn_leaf(struct ldb_module *module, 
286                               struct ldb_parse_tree *tree,
287                               const struct ldb_message *index_list,
288                               struct dn_list *list)
289 {
290         if (ldb_attr_cmp(tree->u.simple.attr, LTDB_OBJECTCLASS) == 0) {
291                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
292         }
293         return ltdb_index_dn_simple(module, tree, index_list, list);
294 }
295
296
297 /*
298   list intersection
299   list = list & list2
300   relies on the lists being sorted
301 */
302 static int list_intersect(struct ldb_context *ldb,
303                           struct dn_list *list, const struct dn_list *list2)
304 {
305         struct dn_list *list3;
306         unsigned int i;
307
308         if (list->count == 0 || list2->count == 0) {
309                 /* 0 & X == 0 */
310                 return 0;
311         }
312
313         list3 = talloc(ldb, struct dn_list);
314         if (list3 == NULL) {
315                 return -1;
316         }
317
318         list3->dn = talloc_array(list3, char *, list->count);
319         if (!list3->dn) {
320                 talloc_free(list3);
321                 return -1;
322         }
323         list3->count = 0;
324
325         for (i=0;i<list->count;i++) {
326                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
327                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
328                         list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
329                         list3->count++;
330                 } else {
331                         talloc_free(list->dn[i]);
332                 }               
333         }
334
335         talloc_free(list->dn);
336         list->dn = talloc_steal(list, list3->dn);
337         list->count = list3->count;
338         talloc_free(list3);
339
340         return 0;
341 }
342
343
344 /*
345   list union
346   list = list | list2
347   relies on the lists being sorted
348 */
349 static int list_union(struct ldb_context *ldb, 
350                       struct dn_list *list, const struct dn_list *list2)
351 {
352         unsigned int i;
353         char **d;
354         unsigned int count = list->count;
355
356         if (list->count == 0 && list2->count == 0) {
357                 /* 0 | 0 == 0 */
358                 return 0;
359         }
360
361         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
362         if (!d) {
363                 return -1;
364         }
365         list->dn = d;
366
367         for (i=0;i<list2->count;i++) {
368                 if (ldb_list_find(list2->dn[i], list->dn, count, 
369                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
370                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
371                         if (!list->dn[list->count]) {
372                                 return -1;
373                         }
374                         list->count++;
375                 }               
376         }
377
378         if (list->count != count) {
379                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
380         }
381
382         return 0;
383 }
384
385 static int ltdb_index_dn(struct ldb_module *module, 
386                          struct ldb_parse_tree *tree,
387                          const struct ldb_message *index_list,
388                          struct dn_list *list);
389
390
391 /*
392   OR two index results
393  */
394 static int ltdb_index_dn_or(struct ldb_module *module, 
395                             struct ldb_parse_tree *tree,
396                             const struct ldb_message *index_list,
397                             struct dn_list *list)
398 {
399         struct ldb_context *ldb = module->ldb;
400         unsigned int i;
401         int ret;
402         
403         ret = -1;
404         list->dn = NULL;
405         list->count = 0;
406
407         for (i=0;i<tree->u.list.num_elements;i++) {
408                 struct dn_list *list2;
409                 int v;
410
411                 list2 = talloc(module, struct dn_list);
412                 if (list2 == NULL) {
413                         return -1;
414                 }
415
416                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
417
418                 if (v == 0) {
419                         /* 0 || X == X */
420                         if (ret == -1) {
421                                 ret = 0;
422                         }
423                         talloc_free(list2);
424                         continue;
425                 }
426
427                 if (v == -1) {
428                         /* 1 || X == 1 */
429                         talloc_free(list->dn);
430                         talloc_free(list2);
431                         return -1;
432                 }
433
434                 if (ret == -1) {
435                         ret = 1;
436                         list->dn = talloc_steal(list, list2->dn);
437                         list->count = list2->count;
438                 } else {
439                         if (list_union(ldb, list, list2) == -1) {
440                                 talloc_free(list2);
441                                 return -1;
442                         }
443                         ret = 1;
444                 }
445                 talloc_free(list2);
446         }
447
448         if (list->count == 0) {
449                 return 0;
450         }
451
452         return ret;
453 }
454
455
456 /*
457   NOT an index results
458  */
459 static int ltdb_index_dn_not(struct ldb_module *module, 
460                              struct ldb_parse_tree *tree,
461                              const struct ldb_message *index_list,
462                              struct dn_list *list)
463 {
464         /* the only way to do an indexed not would be if we could
465            negate the not via another not or if we knew the total
466            number of database elements so we could know that the
467            existing expression covered the whole database. 
468            
469            instead, we just give up, and rely on a full index scan
470            (unless an outer & manages to reduce the list)
471         */
472         return -1;
473 }
474
475 /*
476   AND two index results
477  */
478 static int ltdb_index_dn_and(struct ldb_module *module, 
479                              struct ldb_parse_tree *tree,
480                              const struct ldb_message *index_list,
481                              struct dn_list *list)
482 {
483         struct ldb_context *ldb = module->ldb;
484         unsigned int i;
485         int ret;
486         
487         ret = -1;
488         list->dn = NULL;
489         list->count = 0;
490
491         for (i=0;i<tree->u.list.num_elements;i++) {
492                 struct dn_list *list2;
493                 int v;
494
495                 list2 = talloc(module, struct dn_list);
496                 if (list2 == NULL) {
497                         return -1;
498                 }
499
500                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
501
502                 if (v == 0) {
503                         /* 0 && X == 0 */
504                         talloc_free(list->dn);
505                         talloc_free(list2);
506                         return 0;
507                 }
508
509                 if (v == -1) {
510                         talloc_free(list2);
511                         continue;
512                 }
513
514                 if (ret == -1) {
515                         ret = 1;
516                         talloc_free(list->dn);
517                         list->dn = talloc_steal(list, list2->dn);
518                         list->count = list2->count;
519                 } else {
520                         if (list_intersect(ldb, list, list2) == -1) {
521                                 talloc_free(list2);
522                                 return -1;
523                         }
524                 }
525
526                 talloc_free(list2);
527
528                 if (list->count == 0) {
529                         talloc_free(list->dn);
530                         return 0;
531                 }
532         }
533
534         return ret;
535 }
536
537 /*
538   return a list of dn's that might match a indexed search or
539   -1 if an error. return 0 for no matches, or 1 for matches
540  */
541 static int ltdb_index_dn(struct ldb_module *module, 
542                          struct ldb_parse_tree *tree,
543                          const struct ldb_message *index_list,
544                          struct dn_list *list)
545 {
546         int ret = -1;
547
548         switch (tree->operation) {
549         case LDB_OP_SIMPLE:
550                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
551                 break;
552
553         case LDB_OP_EXTENDED:
554                 /* we can't index with fancy bitops yet */
555                 ret = -1;
556                 break;
557
558         case LDB_OP_AND:
559                 ret = ltdb_index_dn_and(module, tree, index_list, list);
560                 break;
561
562         case LDB_OP_OR:
563                 ret = ltdb_index_dn_or(module, tree, index_list, list);
564                 break;
565
566         case LDB_OP_NOT:
567                 ret = ltdb_index_dn_not(module, tree, index_list, list);
568                 break;
569         }
570
571         return ret;
572 }
573
574 /*
575   filter a candidate dn_list from an indexed search into a set of results
576   extracting just the given attributes
577 */
578 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
579                             const char *base,
580                             enum ldb_scope scope,
581                             const struct dn_list *dn_list, 
582                             const char * const attrs[], struct ldb_message ***res)
583 {
584         unsigned int i;
585         int count = 0;
586
587         for (i=0;i<dn_list->count;i++) {
588                 struct ldb_message *msg;
589                 int ret;
590
591                 msg = talloc(module, struct ldb_message);
592                 if (msg == NULL) {
593                         return -1;
594                 }
595
596                 ret = ltdb_search_dn1(module, dn_list->dn[i], msg);
597                 if (ret == 0) {
598                         /* the record has disappeared? yes, this can happen */
599                         talloc_free(msg);
600                         continue;
601                 }
602
603                 if (ret == -1) {
604                         /* an internal error */
605                         talloc_free(msg);
606                         return -1;
607                 }
608
609                 ret = 0;
610                 if (ltdb_message_match(module, msg, tree, base, scope) == 1) {
611                         ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
612                 }
613                 talloc_free(msg);
614                 if (ret != 0) {
615                         return -1;
616                 }
617         }
618
619         return count;
620 }
621
622 /*
623   search the database with a LDAP-like expression using indexes
624   returns -1 if an indexed search is not possible, in which
625   case the caller should call ltdb_search_full() 
626 */
627 int ltdb_search_indexed(struct ldb_module *module, 
628                         const char *base,
629                         enum ldb_scope scope,
630                         struct ldb_parse_tree *tree,
631                         const char * const attrs[], struct ldb_message ***res)
632 {
633         struct ltdb_private *ltdb = module->private_data;
634         struct dn_list *dn_list;
635         int ret;
636
637         if (ltdb->cache->indexlist->num_elements == 0) {
638                 /* no index list? must do full search */
639                 return -1;
640         }
641
642         dn_list = talloc(module, struct dn_list);
643         if (dn_list == NULL) {
644                 return -1;
645         }
646
647         ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
648
649         if (ret == 1) {
650                 /* we've got a candidate list - now filter by the full tree
651                    and extract the needed attributes */
652                 ret = ldb_index_filter(module, tree, base, scope, dn_list, 
653                                        attrs, res);
654         }
655
656         talloc_free(dn_list);
657
658         return ret;
659 }
660
661 /*
662   add a index element where this is the first indexed DN for this value
663 */
664 static int ltdb_index_add1_new(struct ldb_context *ldb, 
665                                struct ldb_message *msg,
666                                struct ldb_message_element *el,
667                                char *dn)
668 {
669         struct ldb_message_element *el2;
670
671         /* add another entry */
672         el2 = talloc_realloc(msg, msg->elements, 
673                                struct ldb_message_element, msg->num_elements+1);
674         if (!el2) {
675                 return -1;
676         }
677
678         msg->elements = el2;
679         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
680         if (!msg->elements[msg->num_elements].name) {
681                 return -1;
682         }
683         msg->elements[msg->num_elements].num_values = 0;
684         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
685         if (!msg->elements[msg->num_elements].values) {
686                 return -1;
687         }
688         msg->elements[msg->num_elements].values[0].length = strlen(dn);
689         msg->elements[msg->num_elements].values[0].data = dn;
690         msg->elements[msg->num_elements].num_values = 1;
691         msg->num_elements++;
692
693         return 0;
694 }
695
696
697 /*
698   add a index element where this is not the first indexed DN for this
699   value
700 */
701 static int ltdb_index_add1_add(struct ldb_context *ldb, 
702                                struct ldb_message *msg,
703                                struct ldb_message_element *el,
704                                int idx,
705                                char *dn)
706 {
707         struct ldb_val *v2;
708         unsigned int i;
709
710         /* for multi-valued attributes we can end up with repeats */
711         for (i=0;i<msg->elements[idx].num_values;i++) {
712                 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
713                         return 0;
714                 }
715         }
716
717         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
718                               struct ldb_val, 
719                               msg->elements[idx].num_values+1);
720         if (!v2) {
721                 return -1;
722         }
723         msg->elements[idx].values = v2;
724
725         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
726         msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
727         msg->elements[idx].num_values++;
728
729         return 0;
730 }
731
732 /*
733   add an index entry for one message element
734 */
735 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
736                            struct ldb_message_element *el, int v_idx)
737 {
738         struct ldb_context *ldb = module->ldb;
739         struct ldb_message *msg;
740         char *dn_key;
741         int ret;
742         unsigned int i;
743
744         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
745         if (!dn_key) {
746                 return -1;
747         }
748
749         msg = talloc(dn_key, struct ldb_message);
750         if (msg == NULL) {
751                 return -1;
752         }
753
754         ret = ltdb_search_dn1(module, dn_key, msg);
755         if (ret == -1) {
756                 talloc_free(dn_key);
757                 return -1;
758         }
759
760         if (ret == 0) {
761                 msg->dn = talloc_strdup(msg, dn_key);
762                 if (!msg->dn) {
763                         talloc_free(dn_key);
764                         errno = ENOMEM;
765                         return -1;
766                 }
767                 msg->num_elements = 0;
768                 msg->elements = NULL;
769         }
770
771         for (i=0;i<msg->num_elements;i++) {
772                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
773                         break;
774                 }
775         }
776
777         if (i == msg->num_elements) {
778                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
779         } else {
780                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
781         }
782
783         if (ret == 0) {
784                 ret = ltdb_store(module, msg, TDB_REPLACE);
785         }
786
787         talloc_free(dn_key);
788
789         return ret;
790 }
791
792 /*
793   add the index entries for a new record
794   return -1 on failure
795 */
796 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
797 {
798         struct ltdb_private *ltdb = module->private_data;
799         int ret;
800         unsigned int i, j;
801
802         if (ltdb->cache->indexlist->num_elements == 0) {
803                 /* no indexed fields */
804                 return 0;
805         }
806
807         for (i=0;i<msg->num_elements;i++) {
808                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
809                                        NULL, LTDB_IDXATTR);
810                 if (ret == -1) {
811                         continue;
812                 }
813                 for (j=0;j<msg->elements[i].num_values;j++) {
814                         ret = ltdb_index_add1(module, msg->dn, &msg->elements[i], j);
815                         if (ret == -1) {
816                                 return -1;
817                         }
818                 }
819         }
820
821         return 0;
822 }
823
824
825 /*
826   delete an index entry for one message element
827 */
828 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
829                          struct ldb_message_element *el, int v_idx)
830 {
831         struct ldb_context *ldb = module->ldb;
832         struct ldb_message *msg;
833         char *dn_key;
834         int ret, i;
835         unsigned int j;
836
837         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
838         if (!dn_key) {
839                 return -1;
840         }
841
842         msg = talloc(dn_key, struct ldb_message);
843         if (msg == NULL) {
844                 talloc_free(dn_key);
845                 return -1;
846         }
847
848         ret = ltdb_search_dn1(module, dn_key, msg);
849         if (ret == -1) {
850                 talloc_free(dn_key);
851                 return -1;
852         }
853
854         if (ret == 0) {
855                 /* it wasn't indexed. Did we have an earlier error? If we did then
856                    its gone now */
857                 talloc_free(dn_key);
858                 return 0;
859         }
860
861         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
862         if (i == -1) {
863                 ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn %s not found in %s\n", dn, dn_key);
864                 /* it ain't there. hmmm */
865                 talloc_free(dn_key);
866                 return 0;
867         }
868
869         if (j != msg->elements[i].num_values - 1) {
870                 memmove(&msg->elements[i].values[j], 
871                         &msg->elements[i].values[j+1], 
872                         (msg->elements[i].num_values-(j+1)) * 
873                         sizeof(msg->elements[i].values[0]));
874         }
875         msg->elements[i].num_values--;
876
877         if (msg->elements[i].num_values == 0) {
878                 ret = ltdb_delete_noindex(module, dn_key);
879         } else {
880                 ret = ltdb_store(module, msg, TDB_REPLACE);
881         }
882
883         talloc_free(dn_key);
884
885         return ret;
886 }
887
888 /*
889   delete the index entries for a record
890   return -1 on failure
891 */
892 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
893 {
894         struct ltdb_private *ltdb = module->private_data;
895         int ret;
896         unsigned int i, j;
897
898         /* find the list of indexed fields */   
899         if (ltdb->cache->indexlist->num_elements == 0) {
900                 /* no indexed fields */
901                 return 0;
902         }
903
904         for (i=0;i<msg->num_elements;i++) {
905                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
906                                        NULL, LTDB_IDXATTR);
907                 if (ret == -1) {
908                         continue;
909                 }
910                 for (j=0;j<msg->elements[i].num_values;j++) {
911                         ret = ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
912                         if (ret == -1) {
913                                 return -1;
914                         }
915                 }
916         }
917
918         return 0;
919 }
920
921
922 /*
923   traversal function that deletes all @INDEX records
924 */
925 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
926 {
927         const char *dn = "DN=" LTDB_INDEX ":";
928         if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
929                 return tdb_delete(tdb, key);
930         }
931         return 0;
932 }
933
934 /*
935   traversal function that adds @INDEX records during a re index
936 */
937 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
938 {
939         struct ldb_module *module = state;
940         struct ldb_message *msg;
941         int ret;
942         TDB_DATA key2;
943
944         if (strncmp(key.dptr, "DN=@", 4) == 0 ||
945             strncmp(key.dptr, "DN=", 3) != 0) {
946                 return 0;
947         }
948
949         msg = talloc(module, struct ldb_message);
950         if (msg == NULL) {
951                 return -1;
952         }
953
954         ret = ltdb_unpack_data(module, &data, msg);
955         if (ret != 0) {
956                 talloc_free(msg);
957                 return -1;
958         }
959
960         /* check if the DN key has changed, perhaps due to the 
961            case insensitivity of an element changing */
962         key2 = ltdb_key(module, msg->dn);
963         if (strcmp(key2.dptr, key.dptr) != 0) {
964                 tdb_delete(tdb, key);
965                 tdb_store(tdb, key2, data, 0);
966         }
967         talloc_free(key2.dptr);
968
969         if (!msg->dn) {
970                 msg->dn = key.dptr+3;
971         }
972
973         ret = ltdb_index_add(module, msg);
974
975         talloc_free(msg);
976
977         return ret;
978 }
979
980 /*
981   force a complete reindex of the database
982 */
983 int ltdb_reindex(struct ldb_module *module)
984 {
985         struct ltdb_private *ltdb = module->private_data;
986         int ret;
987
988         if (ltdb_cache_reload(module) != 0) {
989                 return -1;
990         }
991
992         /* first traverse the database deleting any @INDEX records */
993         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
994         if (ret == -1) {
995                 errno = EIO;
996                 return -1;
997         }
998
999         /* now traverse adding any indexes for normal LDB records */
1000         ret = tdb_traverse(ltdb->tdb, re_index, module);
1001         if (ret == -1) {
1002                 errno = EIO;
1003                 return -1;
1004         }
1005
1006         return 0;
1007 }