r11353: a bit of an improvement to the ldb_tdb error handling
[gd/samba/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
65                 if (r == 0) {
66                         /* scan back for first element */
67                         while (test_i > 0 &&
68                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
69                                 test_i--;
70                         }
71                         return test_i;
72                 }
73                 if (r < 0) {
74                         if (test_i == 0) {
75                                 return -1;
76                         }
77                         max_i = test_i - 1;
78                 }
79                 if (r > 0) {
80                         min_i = test_i + 1;
81                 }
82         }
83
84         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
85                 return min_i;
86         }
87
88         return -1;
89 }
90
91 struct dn_list {
92         unsigned int count;
93         char **dn;
94 };
95
96 /*
97   return the dn key to be used for an index
98   caller frees
99 */
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101                         const char *attr, const struct ldb_val *value)
102 {
103         struct ldb_dn *ret;
104         char *dn;
105         struct ldb_val v;
106         const struct ldb_attrib_handler *h;
107         char *attr_folded;
108
109         attr_folded = ldb_casefold(ldb, attr);
110         if (!attr_folded) {
111                 return NULL;
112         }
113
114         h = ldb_attrib_handler(ldb, attr);
115         if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116                 /* canonicalisation can be refused. For example, 
117                    a attribute that takes wildcards will refuse to canonicalise
118                    if the value contains a wildcard */
119                 talloc_free(attr_folded);
120                 return NULL;
121         }
122         if (ldb_should_b64_encode(&v)) {
123                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
124                 if (!vstr) return NULL;
125                 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
126                 talloc_free(vstr);
127                 if (v.data != value->data) {
128                         talloc_free(v.data);
129                 }
130                 talloc_free(attr_folded);
131                 if (dn == NULL) return NULL;
132                 goto done;
133         }
134
135         dn = talloc_asprintf(ldb, "%s:%s:%.*s", 
136                               LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
137
138         if (v.data != value->data) {
139                 talloc_free(v.data);
140         }
141         talloc_free(attr_folded);
142
143 done:
144         ret = ldb_dn_explode(ldb, dn);
145         talloc_free(dn);
146         return ret;
147 }
148
149 /*
150   see if a attribute value is in the list of indexed attributes
151 */
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153                             unsigned int *v_idx, const char *key)
154 {
155         unsigned int i, j;
156         for (i=0;i<msg->num_elements;i++) {
157                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158                         const struct ldb_message_element *el = 
159                                 &msg->elements[i];
160                         for (j=0;j<el->num_values;j++) {
161                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
162                                         if (v_idx) {
163                                                 *v_idx = j;
164                                         }
165                                         return i;
166                                 }
167                         }
168                 }
169         }
170         return -1;
171 }
172
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
175 {
176         return strcmp(*s1, *s2);
177 }
178
179 /*
180   return a list of dn's that might match a simple indexed search or
181  */
182 static int ltdb_index_dn_simple(struct ldb_module *module, 
183                                 struct ldb_parse_tree *tree,
184                                 const struct ldb_message *index_list,
185                                 struct dn_list *list)
186 {
187         struct ldb_context *ldb = module->ldb;
188         struct ldb_dn *dn;
189         int ret;
190         unsigned int i, j;
191         struct ldb_message *msg;
192
193         list->count = 0;
194         list->dn = NULL;
195
196         /* if the attribute isn't in the list of indexed attributes then
197            this node needs a full search */
198         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
199                 return -1;
200         }
201
202         /* the attribute is indexed. Pull the list of DNs that match the 
203            search criterion */
204         dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
205         if (!dn) return -1;
206
207         msg = talloc(list, struct ldb_message);
208         if (msg == NULL) {
209                 return -1;
210         }
211
212         ret = ltdb_search_dn1(module, dn, msg);
213         talloc_free(dn);
214         if (ret == 0 || ret == -1) {
215                 return ret;
216         }
217
218         for (i=0;i<msg->num_elements;i++) {
219                 struct ldb_message_element *el;
220
221                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
222                         continue;
223                 }
224
225                 el = &msg->elements[i];
226
227                 list->dn = talloc_array(list, char *, el->num_values);
228                 if (!list->dn) {
229                         break;          
230                 }
231
232                 for (j=0;j<el->num_values;j++) {
233                         list->dn[list->count] = 
234                                 talloc_strdup(list->dn, (char *)el->values[j].data);
235                         if (!list->dn[list->count]) {
236                                 return -1;
237                         }
238                         list->count++;
239                 }
240         }
241
242         talloc_free(msg);
243
244         qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
245
246         return 1;
247 }
248
249
250 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
251
252 /*
253   return a list of dn's that might match a simple indexed search on
254   the special objectclass attribute
255  */
256 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
257                                      struct ldb_parse_tree *tree,
258                                      const struct ldb_message *index_list,
259                                      struct dn_list *list)
260 {
261         struct ldb_context *ldb = module->ldb;
262         unsigned int i;
263         int ret;
264         const char *target = (const char *)tree->u.equality.value.data;
265         const char **subclasses;
266
267         list->count = 0;
268         list->dn = NULL;
269
270         ret = ltdb_index_dn_simple(module, tree, index_list, list);
271
272         subclasses = ldb_subclass_list(module->ldb, target);
273
274         if (subclasses == NULL) {
275                 return ret;
276         }
277
278         for (i=0;subclasses[i];i++) {
279                 struct ldb_parse_tree tree2;
280                 struct dn_list *list2;
281                 tree2.operation = LDB_OP_EQUALITY;
282                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
283                 if (!tree2.u.equality.attr) {
284                         return -1;
285                 }
286                 tree2.u.equality.value.data = 
287                         (uint8_t *)talloc_strdup(list, subclasses[i]);
288                 if (tree2.u.equality.value.data == NULL) {
289                         return -1;                      
290                 }
291                 tree2.u.equality.value.length = strlen(subclasses[i]);
292                 list2 = talloc(list, struct dn_list);
293                 if (list2 == NULL) {
294                         talloc_free(tree2.u.equality.value.data);
295                         return -1;
296                 }
297                 if (ltdb_index_dn_objectclass(module, &tree2, 
298                                               index_list, list2) == 1) {
299                         if (list->count == 0) {
300                                 *list = *list2;
301                                 ret = 1;
302                         } else {
303                                 list_union(ldb, list, list2);
304                                 talloc_free(list2);
305                         }
306                 }
307                 talloc_free(tree2.u.equality.value.data);
308         }
309
310         return ret;
311 }
312
313 /*
314   return a list of dn's that might match a leaf indexed search
315  */
316 static int ltdb_index_dn_leaf(struct ldb_module *module, 
317                               struct ldb_parse_tree *tree,
318                               const struct ldb_message *index_list,
319                               struct dn_list *list)
320 {
321         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
322                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
323         }
324         if (ldb_attr_cmp(tree->u.equality.attr, "distinguishedName") == 0 ||
325             ldb_attr_cmp(tree->u.equality.attr, "dn") == 0) {
326                 list->dn = talloc_array(list, char *, 1);
327                 if (list->dn == NULL) {
328                         ldb_oom(module->ldb);
329                         return -1;
330                 }
331                 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
332                 if (list->dn[0] == NULL) {
333                         ldb_oom(module->ldb);
334                         return -1;
335                 }
336                 list->count = 1;
337                 return 1;
338         }
339         return ltdb_index_dn_simple(module, tree, index_list, list);
340 }
341
342
343 /*
344   list intersection
345   list = list & list2
346   relies on the lists being sorted
347 */
348 static int list_intersect(struct ldb_context *ldb,
349                           struct dn_list *list, const struct dn_list *list2)
350 {
351         struct dn_list *list3;
352         unsigned int i;
353
354         if (list->count == 0 || list2->count == 0) {
355                 /* 0 & X == 0 */
356                 return 0;
357         }
358
359         list3 = talloc(ldb, struct dn_list);
360         if (list3 == NULL) {
361                 return -1;
362         }
363
364         list3->dn = talloc_array(list3, char *, list->count);
365         if (!list3->dn) {
366                 talloc_free(list3);
367                 return -1;
368         }
369         list3->count = 0;
370
371         for (i=0;i<list->count;i++) {
372                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
373                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
374                         list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
375                         list3->count++;
376                 } else {
377                         talloc_free(list->dn[i]);
378                 }               
379         }
380
381         talloc_free(list->dn);
382         list->dn = talloc_steal(list, list3->dn);
383         list->count = list3->count;
384         talloc_free(list3);
385
386         return 0;
387 }
388
389
390 /*
391   list union
392   list = list | list2
393   relies on the lists being sorted
394 */
395 static int list_union(struct ldb_context *ldb, 
396                       struct dn_list *list, const struct dn_list *list2)
397 {
398         unsigned int i;
399         char **d;
400         unsigned int count = list->count;
401
402         if (list->count == 0 && list2->count == 0) {
403                 /* 0 | 0 == 0 */
404                 return 0;
405         }
406
407         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
408         if (!d) {
409                 return -1;
410         }
411         list->dn = d;
412
413         for (i=0;i<list2->count;i++) {
414                 if (ldb_list_find(list2->dn[i], list->dn, count, 
415                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
416                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
417                         if (!list->dn[list->count]) {
418                                 return -1;
419                         }
420                         list->count++;
421                 }               
422         }
423
424         if (list->count != count) {
425                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
426         }
427
428         return 0;
429 }
430
431 static int ltdb_index_dn(struct ldb_module *module, 
432                          struct ldb_parse_tree *tree,
433                          const struct ldb_message *index_list,
434                          struct dn_list *list);
435
436
437 /*
438   OR two index results
439  */
440 static int ltdb_index_dn_or(struct ldb_module *module, 
441                             struct ldb_parse_tree *tree,
442                             const struct ldb_message *index_list,
443                             struct dn_list *list)
444 {
445         struct ldb_context *ldb = module->ldb;
446         unsigned int i;
447         int ret;
448         
449         ret = -1;
450         list->dn = NULL;
451         list->count = 0;
452
453         for (i=0;i<tree->u.list.num_elements;i++) {
454                 struct dn_list *list2;
455                 int v;
456
457                 list2 = talloc(module, struct dn_list);
458                 if (list2 == NULL) {
459                         return -1;
460                 }
461
462                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
463
464                 if (v == 0) {
465                         /* 0 || X == X */
466                         if (ret == -1) {
467                                 ret = 0;
468                         }
469                         talloc_free(list2);
470                         continue;
471                 }
472
473                 if (v == -1) {
474                         /* 1 || X == 1 */
475                         talloc_free(list->dn);
476                         talloc_free(list2);
477                         return -1;
478                 }
479
480                 if (ret == -1) {
481                         ret = 1;
482                         list->dn = talloc_steal(list, list2->dn);
483                         list->count = list2->count;
484                 } else {
485                         if (list_union(ldb, list, list2) == -1) {
486                                 talloc_free(list2);
487                                 return -1;
488                         }
489                         ret = 1;
490                 }
491                 talloc_free(list2);
492         }
493
494         if (list->count == 0) {
495                 return 0;
496         }
497
498         return ret;
499 }
500
501
502 /*
503   NOT an index results
504  */
505 static int ltdb_index_dn_not(struct ldb_module *module, 
506                              struct ldb_parse_tree *tree,
507                              const struct ldb_message *index_list,
508                              struct dn_list *list)
509 {
510         /* the only way to do an indexed not would be if we could
511            negate the not via another not or if we knew the total
512            number of database elements so we could know that the
513            existing expression covered the whole database. 
514            
515            instead, we just give up, and rely on a full index scan
516            (unless an outer & manages to reduce the list)
517         */
518         return -1;
519 }
520
521 /*
522   AND two index results
523  */
524 static int ltdb_index_dn_and(struct ldb_module *module, 
525                              struct ldb_parse_tree *tree,
526                              const struct ldb_message *index_list,
527                              struct dn_list *list)
528 {
529         struct ldb_context *ldb = module->ldb;
530         unsigned int i;
531         int ret;
532         
533         ret = -1;
534         list->dn = NULL;
535         list->count = 0;
536
537         for (i=0;i<tree->u.list.num_elements;i++) {
538                 struct dn_list *list2;
539                 int v;
540
541                 list2 = talloc(module, struct dn_list);
542                 if (list2 == NULL) {
543                         return -1;
544                 }
545
546                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
547
548                 if (v == 0) {
549                         /* 0 && X == 0 */
550                         talloc_free(list->dn);
551                         talloc_free(list2);
552                         return 0;
553                 }
554
555                 if (v == -1) {
556                         talloc_free(list2);
557                         continue;
558                 }
559
560                 if (ret == -1) {
561                         ret = 1;
562                         talloc_free(list->dn);
563                         list->dn = talloc_steal(list, list2->dn);
564                         list->count = list2->count;
565                 } else {
566                         if (list_intersect(ldb, list, list2) == -1) {
567                                 talloc_free(list2);
568                                 return -1;
569                         }
570                 }
571
572                 talloc_free(list2);
573
574                 if (list->count == 0) {
575                         talloc_free(list->dn);
576                         return 0;
577                 }
578         }
579
580         return ret;
581 }
582
583 /*
584   return a list of dn's that might match a indexed search or
585   -1 if an error. return 0 for no matches, or 1 for matches
586  */
587 static int ltdb_index_dn(struct ldb_module *module, 
588                          struct ldb_parse_tree *tree,
589                          const struct ldb_message *index_list,
590                          struct dn_list *list)
591 {
592         int ret = -1;
593
594         switch (tree->operation) {
595         case LDB_OP_AND:
596                 ret = ltdb_index_dn_and(module, tree, index_list, list);
597                 break;
598
599         case LDB_OP_OR:
600                 ret = ltdb_index_dn_or(module, tree, index_list, list);
601                 break;
602
603         case LDB_OP_NOT:
604                 ret = ltdb_index_dn_not(module, tree, index_list, list);
605                 break;
606
607         case LDB_OP_EQUALITY:
608                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
609                 break;
610
611         case LDB_OP_SUBSTRING:
612         case LDB_OP_GREATER:
613         case LDB_OP_LESS:
614         case LDB_OP_PRESENT:
615         case LDB_OP_APPROX:
616         case LDB_OP_EXTENDED:
617                 /* we can't index with fancy bitops yet */
618                 ret = -1;
619                 break;
620         }
621
622         return ret;
623 }
624
625 /*
626   filter a candidate dn_list from an indexed search into a set of results
627   extracting just the given attributes
628 */
629 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
630                             const struct ldb_dn *base,
631                             enum ldb_scope scope,
632                             const struct dn_list *dn_list, 
633                             const char * const attrs[], struct ldb_message ***res)
634 {
635         unsigned int i;
636         int count = 0;
637
638         for (i = 0; i < dn_list->count; i++) {
639                 struct ldb_message *msg;
640                 struct ldb_dn *dn;
641                 int ret;
642
643                 msg = talloc(module, struct ldb_message);
644                 if (msg == NULL) {
645                         return -1;
646                 }
647
648                 dn = ldb_dn_explode(msg, dn_list->dn[i]);
649                 if (dn == NULL) {
650                         talloc_free(msg);
651                         return -1;
652                 }
653
654                 ret = ltdb_search_dn1(module, dn, msg);
655                 talloc_free(dn);
656                 if (ret == 0) {
657                         /* the record has disappeared? yes, this can happen */
658                         talloc_free(msg);
659                         continue;
660                 }
661
662                 if (ret == -1) {
663                         /* an internal error */
664                         talloc_free(msg);
665                         return -1;
666                 }
667
668                 ret = 0;
669                 if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
670                         ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
671                 }
672                 talloc_free(msg);
673                 if (ret != 0) {
674                         return -1;
675                 }
676         }
677
678         return count;
679 }
680
681 /*
682   search the database with a LDAP-like expression using indexes
683   returns -1 if an indexed search is not possible, in which
684   case the caller should call ltdb_search_full() 
685 */
686 int ltdb_search_indexed(struct ldb_module *module, 
687                         const struct ldb_dn *base,
688                         enum ldb_scope scope,
689                         struct ldb_parse_tree *tree,
690                         const char * const attrs[], struct ldb_message ***res)
691 {
692         struct ltdb_private *ltdb = module->private_data;
693         struct dn_list *dn_list;
694         int ret;
695
696         if (ltdb->cache->indexlist->num_elements == 0 && 
697             scope != LDB_SCOPE_BASE) {
698                 /* no index list? must do full search */
699                 return -1;
700         }
701
702         dn_list = talloc(module, struct dn_list);
703         if (dn_list == NULL) {
704                 return -1;
705         }
706
707         if (scope == LDB_SCOPE_BASE) {
708                 /* with BASE searches only one DN can match */
709                 dn_list->dn = talloc_array(dn_list, char *, 1);
710                 if (dn_list->dn == NULL) {
711                         ldb_oom(module->ldb);
712                         return -1;
713                 }
714                 dn_list->dn[0] = ldb_dn_linearize(dn_list, base);
715                 if (dn_list->dn[0] == NULL) {
716                         ldb_oom(module->ldb);
717                         return -1;
718                 }
719                 dn_list->count = 1;
720                 ret = 1;
721         } else {
722                 ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
723         }
724
725         if (ret == 1) {
726                 /* we've got a candidate list - now filter by the full tree
727                    and extract the needed attributes */
728                 ret = ldb_index_filter(module, tree, base, scope, dn_list, 
729                                        attrs, res);
730         }
731
732         talloc_free(dn_list);
733
734         return ret;
735 }
736
737 /*
738   add a index element where this is the first indexed DN for this value
739 */
740 static int ltdb_index_add1_new(struct ldb_context *ldb, 
741                                struct ldb_message *msg,
742                                struct ldb_message_element *el,
743                                char *dn)
744 {
745         struct ldb_message_element *el2;
746
747         /* add another entry */
748         el2 = talloc_realloc(msg, msg->elements, 
749                                struct ldb_message_element, msg->num_elements+1);
750         if (!el2) {
751                 return -1;
752         }
753
754         msg->elements = el2;
755         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
756         if (!msg->elements[msg->num_elements].name) {
757                 return -1;
758         }
759         msg->elements[msg->num_elements].num_values = 0;
760         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
761         if (!msg->elements[msg->num_elements].values) {
762                 return -1;
763         }
764         msg->elements[msg->num_elements].values[0].length = strlen(dn);
765         msg->elements[msg->num_elements].values[0].data = (uint8_t *)dn;
766         msg->elements[msg->num_elements].num_values = 1;
767         msg->num_elements++;
768
769         return 0;
770 }
771
772
773 /*
774   add a index element where this is not the first indexed DN for this
775   value
776 */
777 static int ltdb_index_add1_add(struct ldb_context *ldb, 
778                                struct ldb_message *msg,
779                                struct ldb_message_element *el,
780                                int idx,
781                                char *dn)
782 {
783         struct ldb_val *v2;
784         unsigned int i;
785
786         /* for multi-valued attributes we can end up with repeats */
787         for (i=0;i<msg->elements[idx].num_values;i++) {
788                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
789                         return 0;
790                 }
791         }
792
793         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
794                               struct ldb_val, 
795                               msg->elements[idx].num_values+1);
796         if (!v2) {
797                 return -1;
798         }
799         msg->elements[idx].values = v2;
800
801         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
802         msg->elements[idx].values[msg->elements[idx].num_values].data = (uint8_t *)dn;
803         msg->elements[idx].num_values++;
804
805         return 0;
806 }
807
808 /*
809   add an index entry for one message element
810 */
811 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
812                            struct ldb_message_element *el, int v_idx)
813 {
814         struct ldb_context *ldb = module->ldb;
815         struct ldb_message *msg;
816         struct ldb_dn *dn_key;
817         int ret;
818         unsigned int i;
819
820         msg = talloc(module, struct ldb_message);
821         if (msg == NULL) {
822                 errno = ENOMEM;
823                 return -1;
824         }
825
826         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
827         if (!dn_key) {
828                 talloc_free(msg);
829                 errno = ENOMEM;
830                 return -1;
831         }
832         talloc_steal(msg, dn_key);
833
834         ret = ltdb_search_dn1(module, dn_key, msg);
835         if (ret == -1) {
836                 talloc_free(msg);
837                 return -1;
838         }
839
840         if (ret == 0) {
841                 msg->dn = dn_key;
842                 msg->num_elements = 0;
843                 msg->elements = NULL;
844         }
845
846         for (i=0;i<msg->num_elements;i++) {
847                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
848                         break;
849                 }
850         }
851
852         if (i == msg->num_elements) {
853                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
854         } else {
855                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
856         }
857
858         if (ret == 0) {
859                 ret = ltdb_store(module, msg, TDB_REPLACE);
860         }
861
862         talloc_free(msg);
863
864         return ret;
865 }
866
867 static int ltdb_index_add0(struct ldb_module *module, char *dn,
868                            struct ldb_message_element *elements, int num_el)
869 {
870         struct ltdb_private *ltdb = module->private_data;
871         int ret;
872         unsigned int i, j;
873
874         if (dn[0] == '@') {
875                 return 0;
876         }
877
878         if (ltdb->cache->indexlist->num_elements == 0) {
879                 /* no indexed fields */
880                 return 0;
881         }
882
883         for (i = 0; i < num_el; i++) {
884                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
885                                        NULL, LTDB_IDXATTR);
886                 if (ret == -1) {
887                         continue;
888                 }
889                 for (j = 0; j < elements[i].num_values; j++) {
890                         ret = ltdb_index_add1(module, dn, &elements[i], j);
891                         if (ret == -1) {
892                                 talloc_free(dn);
893                                 return -1;
894                         }
895                 }
896         }
897
898         return 0;
899 }
900
901 /*
902   add the index entries for a new record
903   return -1 on failure
904 */
905 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
906 {
907         struct ltdb_private *ltdb = module->private_data;
908         char *dn;
909         int ret;
910
911         dn = ldb_dn_linearize(ltdb, msg->dn);
912         if (dn == NULL) {
913                 return -1;
914         }
915
916         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
917
918         talloc_free(dn);
919
920         return ret;
921 }
922
923
924 /*
925   delete an index entry for one message element
926 */
927 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
928                          struct ldb_message_element *el, int v_idx)
929 {
930         struct ldb_context *ldb = module->ldb;
931         struct ldb_message *msg;
932         struct ldb_dn *dn_key;
933         int ret, i;
934         unsigned int j;
935
936         if (dn[0] == '@') {
937                 return 0;
938         }
939
940         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
941         if (!dn_key) {
942                 return -1;
943         }
944
945         msg = talloc(dn_key, struct ldb_message);
946         if (msg == NULL) {
947                 talloc_free(dn_key);
948                 return -1;
949         }
950
951         ret = ltdb_search_dn1(module, dn_key, msg);
952         if (ret == -1) {
953                 talloc_free(dn_key);
954                 return -1;
955         }
956
957         if (ret == 0) {
958                 /* it wasn't indexed. Did we have an earlier error? If we did then
959                    its gone now */
960                 talloc_free(dn_key);
961                 return 0;
962         }
963
964         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
965         if (i == -1) {
966                 ldb_debug(ldb, LDB_DEBUG_ERROR,
967                                 "ERROR: dn %s not found in %s\n", dn,
968                                 ldb_dn_linearize(dn_key, dn_key));
969                 /* it ain't there. hmmm */
970                 talloc_free(dn_key);
971                 return 0;
972         }
973
974         if (j != msg->elements[i].num_values - 1) {
975                 memmove(&msg->elements[i].values[j], 
976                         &msg->elements[i].values[j+1], 
977                         (msg->elements[i].num_values-(j+1)) * 
978                         sizeof(msg->elements[i].values[0]));
979         }
980         msg->elements[i].num_values--;
981
982         if (msg->elements[i].num_values == 0) {
983                 ret = ltdb_delete_noindex(module, dn_key);
984         } else {
985                 ret = ltdb_store(module, msg, TDB_REPLACE);
986         }
987
988         talloc_free(dn_key);
989
990         return ret;
991 }
992
993 /*
994   delete the index entries for a record
995   return -1 on failure
996 */
997 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
998 {
999         struct ltdb_private *ltdb = module->private_data;
1000         int ret;
1001         char *dn;
1002         unsigned int i, j;
1003
1004         if (ldb_dn_is_special(msg->dn)) {
1005                 return 0;
1006         }
1007
1008         dn = ldb_dn_linearize(ltdb, msg->dn);
1009         if (dn == NULL) {
1010                 return -1;
1011         }
1012
1013         /* find the list of indexed fields */   
1014         if (ltdb->cache->indexlist->num_elements == 0) {
1015                 /* no indexed fields */
1016                 return 0;
1017         }
1018
1019         for (i = 0; i < msg->num_elements; i++) {
1020                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1021                                        NULL, LTDB_IDXATTR);
1022                 if (ret == -1) {
1023                         continue;
1024                 }
1025                 for (j = 0; j < msg->elements[i].num_values; j++) {
1026                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1027                         if (ret == -1) {
1028                                 talloc_free(dn);
1029                                 return -1;
1030                         }
1031                 }
1032         }
1033
1034         talloc_free(dn);
1035         return 0;
1036 }
1037
1038
1039 /*
1040   traversal function that deletes all @INDEX records
1041 */
1042 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1043 {
1044         const char *dn = "DN=" LTDB_INDEX ":";
1045         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1046                 return tdb_delete(tdb, key);
1047         }
1048         return 0;
1049 }
1050
1051 /*
1052   traversal function that adds @INDEX records during a re index
1053 */
1054 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1055 {
1056         struct ldb_module *module = state;
1057         struct ldb_message *msg;
1058         char *dn = NULL;
1059         int ret;
1060         TDB_DATA key2;
1061
1062         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1063             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1064                 return 0;
1065         }
1066
1067         msg = talloc(module, struct ldb_message);
1068         if (msg == NULL) {
1069                 return -1;
1070         }
1071
1072         ret = ltdb_unpack_data(module, &data, msg);
1073         if (ret != 0) {
1074                 talloc_free(msg);
1075                 return -1;
1076         }
1077
1078         /* check if the DN key has changed, perhaps due to the 
1079            case insensitivity of an element changing */
1080         key2 = ltdb_key(module, msg->dn);
1081         if (key2.dptr == NULL) {
1082                 /* probably a corrupt record ... darn */
1083                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1084                                                         ldb_dn_linearize(msg, msg->dn));
1085                 talloc_free(msg);
1086                 return 0;
1087         }
1088         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1089                 tdb_delete(tdb, key);
1090                 tdb_store(tdb, key2, data, 0);
1091         }
1092         talloc_free(key2.dptr);
1093
1094         if (msg->dn == NULL) {
1095                 dn = (char *)key.dptr + 3;
1096         } else {
1097                 dn = ldb_dn_linearize(msg->dn, msg->dn);
1098         }
1099
1100         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1101
1102         talloc_free(msg);
1103
1104         return ret;
1105 }
1106
1107 /*
1108   force a complete reindex of the database
1109 */
1110 int ltdb_reindex(struct ldb_module *module)
1111 {
1112         struct ltdb_private *ltdb = module->private_data;
1113         int ret;
1114
1115         if (ltdb_cache_reload(module) != 0) {
1116                 return -1;
1117         }
1118
1119         /* first traverse the database deleting any @INDEX records */
1120         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1121         if (ret == -1) {
1122                 return -1;
1123         }
1124
1125         /* now traverse adding any indexes for normal LDB records */
1126         ret = tdb_traverse(ltdb->tdb, re_index, module);
1127         if (ret == -1) {
1128                 return -1;
1129         }
1130
1131         return 0;
1132 }