r20101: Also rename a variable now that the unused parameter is gone
[abartlet/samba.git/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 /* the following cast looks strange, but is
65                  correct. The key to understanding it is that base_p
66                  is a pointer to an array of pointers, so we have to
67                  dereference it after casting to void **. The strange
68                  const in the middle gives us the right type of pointer
69                  after the dereference  (tridge) */
70                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
71                 if (r == 0) {
72                         /* scan back for first element */
73                         while (test_i > 0 &&
74                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
75                                 test_i--;
76                         }
77                         return test_i;
78                 }
79                 if (r < 0) {
80                         if (test_i == 0) {
81                                 return -1;
82                         }
83                         max_i = test_i - 1;
84                 }
85                 if (r > 0) {
86                         min_i = test_i + 1;
87                 }
88         }
89
90         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
91                 return min_i;
92         }
93
94         return -1;
95 }
96
97 struct dn_list {
98         unsigned int count;
99         char **dn;
100 };
101
102 /*
103   return the dn key to be used for an index
104   caller frees
105 */
106 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
107                                      const char *attr, const struct ldb_val *value)
108 {
109         struct ldb_dn *ret;
110         struct ldb_val v;
111         const struct ldb_attrib_handler *h;
112         char *attr_folded;
113         int r;
114
115         attr_folded = ldb_attr_casefold(ldb, attr);
116         if (!attr_folded) {
117                 return NULL;
118         }
119
120         h = ldb_attrib_handler(ldb, attr);
121         r = h->canonicalise_fn(ldb, ldb, value, &v);
122         if (r != LDB_SUCCESS) {
123                 const char *errstr = ldb_errstring(ldb);
124                 /* canonicalisation can be refused. For example, 
125                    a attribute that takes wildcards will refuse to canonicalise
126                    if the value contains a wildcard */
127                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
128                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
129                 talloc_free(attr_folded);
130                 return NULL;
131         }
132         if (ldb_should_b64_encode(&v)) {
133                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
134                 if (!vstr) return NULL;
135                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
136                 talloc_free(vstr);
137         } else {
138                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
139         }
140
141         if (v.data != value->data) {
142                 talloc_free(v.data);
143         }
144         talloc_free(attr_folded);
145
146         return ret;
147 }
148
149 /*
150   see if a attribute value is in the list of indexed attributes
151 */
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153                             unsigned int *v_idx, const char *key)
154 {
155         unsigned int i, j;
156         for (i=0;i<msg->num_elements;i++) {
157                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158                         const struct ldb_message_element *el = 
159                                 &msg->elements[i];
160                         for (j=0;j<el->num_values;j++) {
161                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
162                                         if (v_idx) {
163                                                 *v_idx = j;
164                                         }
165                                         return i;
166                                 }
167                         }
168                 }
169         }
170         return -1;
171 }
172
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
175 {
176         return strcmp(*s1, *s2);
177 }
178
179 /*
180   return a list of dn's that might match a simple indexed search or
181  */
182 static int ltdb_index_dn_simple(struct ldb_module *module, 
183                                 const struct ldb_parse_tree *tree,
184                                 const struct ldb_message *index_list,
185                                 struct dn_list *list)
186 {
187         struct ldb_context *ldb = module->ldb;
188         struct ldb_dn *dn;
189         int ret;
190         unsigned int i, j;
191         struct ldb_message *msg;
192
193         list->count = 0;
194         list->dn = NULL;
195
196         /* if the attribute isn't in the list of indexed attributes then
197            this node needs a full search */
198         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
199                 return -1;
200         }
201
202         /* the attribute is indexed. Pull the list of DNs that match the 
203            search criterion */
204         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
205         if (!dn) return -1;
206
207         msg = talloc(list, struct ldb_message);
208         if (msg == NULL) {
209                 return -1;
210         }
211
212         ret = ltdb_search_dn1(module, dn, msg);
213         talloc_free(dn);
214         if (ret == 0 || ret == -1) {
215                 return ret;
216         }
217
218         for (i=0;i<msg->num_elements;i++) {
219                 struct ldb_message_element *el;
220
221                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
222                         continue;
223                 }
224
225                 el = &msg->elements[i];
226
227                 list->dn = talloc_array(list, char *, el->num_values);
228                 if (!list->dn) {
229                         talloc_free(msg);
230                         return -1;
231                 }
232
233                 for (j=0;j<el->num_values;j++) {
234                         list->dn[list->count] = 
235                                 talloc_strdup(list->dn, (char *)el->values[j].data);
236                         if (!list->dn[list->count]) {
237                                 talloc_free(msg);
238                                 return -1;
239                         }
240                         list->count++;
241                 }
242         }
243
244         talloc_free(msg);
245
246         if (list->count > 1) {
247                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
248         }
249
250         return 1;
251 }
252
253
254 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
255
256 /*
257   return a list of dn's that might match a simple indexed search on
258   the special objectclass attribute
259  */
260 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
261                                      const struct ldb_parse_tree *tree,
262                                      const struct ldb_message *index_list,
263                                      struct dn_list *list)
264 {
265         struct ldb_context *ldb = module->ldb;
266         unsigned int i;
267         int ret;
268         const char *target = (const char *)tree->u.equality.value.data;
269         const char **subclasses;
270
271         list->count = 0;
272         list->dn = NULL;
273
274         ret = ltdb_index_dn_simple(module, tree, index_list, list);
275
276         subclasses = ldb_subclass_list(module->ldb, target);
277
278         if (subclasses == NULL) {
279                 return ret;
280         }
281
282         for (i=0;subclasses[i];i++) {
283                 struct ldb_parse_tree tree2;
284                 struct dn_list *list2;
285                 tree2.operation = LDB_OP_EQUALITY;
286                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
287                 if (!tree2.u.equality.attr) {
288                         return -1;
289                 }
290                 tree2.u.equality.value.data = 
291                         (uint8_t *)talloc_strdup(list, subclasses[i]);
292                 if (tree2.u.equality.value.data == NULL) {
293                         return -1;                      
294                 }
295                 tree2.u.equality.value.length = strlen(subclasses[i]);
296                 list2 = talloc(list, struct dn_list);
297                 if (list2 == NULL) {
298                         talloc_free(tree2.u.equality.value.data);
299                         return -1;
300                 }
301                 if (ltdb_index_dn_objectclass(module, &tree2, 
302                                               index_list, list2) == 1) {
303                         if (list->count == 0) {
304                                 *list = *list2;
305                                 ret = 1;
306                         } else {
307                                 list_union(ldb, list, list2);
308                                 talloc_free(list2);
309                         }
310                 }
311                 talloc_free(tree2.u.equality.value.data);
312         }
313
314         return ret;
315 }
316
317 /*
318   return a list of dn's that might match a leaf indexed search
319  */
320 static int ltdb_index_dn_leaf(struct ldb_module *module, 
321                               const struct ldb_parse_tree *tree,
322                               const struct ldb_message *index_list,
323                               struct dn_list *list)
324 {
325         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
326                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
327         }
328         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
329                 list->dn = talloc_array(list, char *, 1);
330                 if (list->dn == NULL) {
331                         ldb_oom(module->ldb);
332                         return -1;
333                 }
334                 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
335                 if (list->dn[0] == NULL) {
336                         ldb_oom(module->ldb);
337                         return -1;
338                 }
339                 list->count = 1;
340                 return 1;
341         }
342         return ltdb_index_dn_simple(module, tree, index_list, list);
343 }
344
345
346 /*
347   list intersection
348   list = list & list2
349   relies on the lists being sorted
350 */
351 static int list_intersect(struct ldb_context *ldb,
352                           struct dn_list *list, const struct dn_list *list2)
353 {
354         struct dn_list *list3;
355         unsigned int i;
356
357         if (list->count == 0 || list2->count == 0) {
358                 /* 0 & X == 0 */
359                 return 0;
360         }
361
362         list3 = talloc(ldb, struct dn_list);
363         if (list3 == NULL) {
364                 return -1;
365         }
366
367         list3->dn = talloc_array(list3, char *, list->count);
368         if (!list3->dn) {
369                 talloc_free(list3);
370                 return -1;
371         }
372         list3->count = 0;
373
374         for (i=0;i<list->count;i++) {
375                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
376                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
377                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
378                         list3->count++;
379                 } else {
380                         talloc_free(list->dn[i]);
381                 }               
382         }
383
384         talloc_free(list->dn);
385         list->dn = talloc_move(list, &list3->dn);
386         list->count = list3->count;
387         talloc_free(list3);
388
389         return 0;
390 }
391
392
393 /*
394   list union
395   list = list | list2
396   relies on the lists being sorted
397 */
398 static int list_union(struct ldb_context *ldb, 
399                       struct dn_list *list, const struct dn_list *list2)
400 {
401         unsigned int i;
402         char **d;
403         unsigned int count = list->count;
404
405         if (list->count == 0 && list2->count == 0) {
406                 /* 0 | 0 == 0 */
407                 return 0;
408         }
409
410         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
411         if (!d) {
412                 return -1;
413         }
414         list->dn = d;
415
416         for (i=0;i<list2->count;i++) {
417                 if (ldb_list_find(list2->dn[i], list->dn, count, 
418                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
419                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
420                         if (!list->dn[list->count]) {
421                                 return -1;
422                         }
423                         list->count++;
424                 }               
425         }
426
427         if (list->count != count) {
428                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
429         }
430
431         return 0;
432 }
433
434 static int ltdb_index_dn(struct ldb_module *module, 
435                          const struct ldb_parse_tree *tree,
436                          const struct ldb_message *index_list,
437                          struct dn_list *list);
438
439
440 /*
441   OR two index results
442  */
443 static int ltdb_index_dn_or(struct ldb_module *module, 
444                             const struct ldb_parse_tree *tree,
445                             const struct ldb_message *index_list,
446                             struct dn_list *list)
447 {
448         struct ldb_context *ldb = module->ldb;
449         unsigned int i;
450         int ret;
451         
452         ret = -1;
453         list->dn = NULL;
454         list->count = 0;
455
456         for (i=0;i<tree->u.list.num_elements;i++) {
457                 struct dn_list *list2;
458                 int v;
459
460                 list2 = talloc(module, struct dn_list);
461                 if (list2 == NULL) {
462                         return -1;
463                 }
464
465                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
466
467                 if (v == 0) {
468                         /* 0 || X == X */
469                         if (ret == -1) {
470                                 ret = 0;
471                         }
472                         talloc_free(list2);
473                         continue;
474                 }
475
476                 if (v == -1) {
477                         /* 1 || X == 1 */
478                         talloc_free(list->dn);
479                         talloc_free(list2);
480                         return -1;
481                 }
482
483                 if (ret == -1) {
484                         ret = 1;
485                         list->dn = talloc_move(list, &list2->dn);
486                         list->count = list2->count;
487                 } else {
488                         if (list_union(ldb, list, list2) == -1) {
489                                 talloc_free(list2);
490                                 return -1;
491                         }
492                         ret = 1;
493                 }
494                 talloc_free(list2);
495         }
496
497         if (list->count == 0) {
498                 return 0;
499         }
500
501         return ret;
502 }
503
504
505 /*
506   NOT an index results
507  */
508 static int ltdb_index_dn_not(struct ldb_module *module, 
509                              const struct ldb_parse_tree *tree,
510                              const struct ldb_message *index_list,
511                              struct dn_list *list)
512 {
513         /* the only way to do an indexed not would be if we could
514            negate the not via another not or if we knew the total
515            number of database elements so we could know that the
516            existing expression covered the whole database. 
517            
518            instead, we just give up, and rely on a full index scan
519            (unless an outer & manages to reduce the list)
520         */
521         return -1;
522 }
523
524 /*
525   AND two index results
526  */
527 static int ltdb_index_dn_and(struct ldb_module *module, 
528                              const struct ldb_parse_tree *tree,
529                              const struct ldb_message *index_list,
530                              struct dn_list *list)
531 {
532         struct ldb_context *ldb = module->ldb;
533         unsigned int i;
534         int ret;
535         
536         ret = -1;
537         list->dn = NULL;
538         list->count = 0;
539
540         for (i=0;i<tree->u.list.num_elements;i++) {
541                 struct dn_list *list2;
542                 int v;
543
544                 list2 = talloc(module, struct dn_list);
545                 if (list2 == NULL) {
546                         return -1;
547                 }
548
549                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
550
551                 if (v == 0) {
552                         /* 0 && X == 0 */
553                         talloc_free(list->dn);
554                         talloc_free(list2);
555                         return 0;
556                 }
557
558                 if (v == -1) {
559                         talloc_free(list2);
560                         continue;
561                 }
562
563                 if (ret == -1) {
564                         ret = 1;
565                         talloc_free(list->dn);
566                         list->dn = talloc_move(list, &list2->dn);
567                         list->count = list2->count;
568                 } else {
569                         if (list_intersect(ldb, list, list2) == -1) {
570                                 talloc_free(list2);
571                                 return -1;
572                         }
573                 }
574
575                 talloc_free(list2);
576
577                 if (list->count == 0) {
578                         talloc_free(list->dn);
579                         return 0;
580                 }
581         }
582
583         return ret;
584 }
585
586 /*
587   return a list of dn's that might match a indexed search or
588   -1 if an error. return 0 for no matches, or 1 for matches
589  */
590 static int ltdb_index_dn(struct ldb_module *module, 
591                          const struct ldb_parse_tree *tree,
592                          const struct ldb_message *index_list,
593                          struct dn_list *list)
594 {
595         int ret = -1;
596
597         switch (tree->operation) {
598         case LDB_OP_AND:
599                 ret = ltdb_index_dn_and(module, tree, index_list, list);
600                 break;
601
602         case LDB_OP_OR:
603                 ret = ltdb_index_dn_or(module, tree, index_list, list);
604                 break;
605
606         case LDB_OP_NOT:
607                 ret = ltdb_index_dn_not(module, tree, index_list, list);
608                 break;
609
610         case LDB_OP_EQUALITY:
611                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
612                 break;
613
614         case LDB_OP_SUBSTRING:
615         case LDB_OP_GREATER:
616         case LDB_OP_LESS:
617         case LDB_OP_PRESENT:
618         case LDB_OP_APPROX:
619         case LDB_OP_EXTENDED:
620                 /* we can't index with fancy bitops yet */
621                 ret = -1;
622                 break;
623         }
624
625         return ret;
626 }
627
628 /*
629   filter a candidate dn_list from an indexed search into a set of results
630   extracting just the given attributes
631 */
632 static int ltdb_index_filter(const struct dn_list *dn_list, 
633                              struct ldb_handle *handle)
634 {
635         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
636         struct ldb_reply *ares = NULL;
637         unsigned int i;
638
639         for (i = 0; i < dn_list->count; i++) {
640                 struct ldb_dn *dn;
641                 int ret;
642
643                 ares = talloc_zero(ac, struct ldb_reply);
644                 if (!ares) {
645                         handle->status = LDB_ERR_OPERATIONS_ERROR;
646                         handle->state = LDB_ASYNC_DONE;
647                         return LDB_ERR_OPERATIONS_ERROR;
648                 }
649
650                 ares->message = ldb_msg_new(ares);
651                 if (!ares->message) {
652                         handle->status = LDB_ERR_OPERATIONS_ERROR;
653                         handle->state = LDB_ASYNC_DONE;
654                         talloc_free(ares);
655                         return LDB_ERR_OPERATIONS_ERROR;
656                 }
657
658
659                 dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
660                 if (dn == NULL) {
661                         talloc_free(ares);
662                         return LDB_ERR_OPERATIONS_ERROR;
663                 }
664
665                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
666                 talloc_free(dn);
667                 if (ret == 0) {
668                         /* the record has disappeared? yes, this can happen */
669                         talloc_free(ares);
670                         continue;
671                 }
672
673                 if (ret == -1) {
674                         /* an internal error */
675                         talloc_free(ares);
676                         return LDB_ERR_OPERATIONS_ERROR;
677                 }
678
679                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
680                         talloc_free(ares);
681                         continue;
682                 }
683
684                 /* filter the attributes that the user wants */
685                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
686
687                 if (ret == -1) {
688                         handle->status = LDB_ERR_OPERATIONS_ERROR;
689                         handle->state = LDB_ASYNC_DONE;
690                         talloc_free(ares);
691                         return LDB_ERR_OPERATIONS_ERROR;
692                 }
693
694                 ares->type = LDB_REPLY_ENTRY;
695                 handle->state = LDB_ASYNC_PENDING;
696                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
697
698                 if (handle->status != LDB_SUCCESS) {
699                         handle->state = LDB_ASYNC_DONE;
700                         return handle->status;
701                 }
702         }
703
704         return LDB_SUCCESS;
705 }
706
707 /*
708   search the database with a LDAP-like expression using indexes
709   returns -1 if an indexed search is not possible, in which
710   case the caller should call ltdb_search_full() 
711 */
712 int ltdb_search_indexed(struct ldb_handle *handle)
713 {
714         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
715         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
716         struct dn_list *dn_list;
717         int ret;
718
719         if (ltdb->cache->indexlist->num_elements == 0 && 
720             ac->scope != LDB_SCOPE_BASE) {
721                 /* no index list? must do full search */
722                 return -1;
723         }
724
725         dn_list = talloc(handle, struct dn_list);
726         if (dn_list == NULL) {
727                 return -1;
728         }
729
730         if (ac->scope == LDB_SCOPE_BASE) {
731                 /* with BASE searches only one DN can match */
732                 dn_list->dn = talloc_array(dn_list, char *, 1);
733                 if (dn_list->dn == NULL) {
734                         ldb_oom(ac->module->ldb);
735                         return -1;
736                 }
737                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
738                 if (dn_list->dn[0] == NULL) {
739                         ldb_oom(ac->module->ldb);
740                         return -1;
741                 }
742                 dn_list->count = 1;
743                 ret = 1;
744         } else {
745                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
746         }
747
748         if (ret == 1) {
749                 /* we've got a candidate list - now filter by the full tree
750                    and extract the needed attributes */
751                 ret = ltdb_index_filter(dn_list, handle);
752                 handle->status = ret;
753                 handle->state = LDB_ASYNC_DONE;
754         }
755
756         talloc_free(dn_list);
757
758         return ret;
759 }
760
761 /*
762   add a index element where this is the first indexed DN for this value
763 */
764 static int ltdb_index_add1_new(struct ldb_context *ldb, 
765                                struct ldb_message *msg,
766                                const char *dn)
767 {
768         struct ldb_message_element *el;
769
770         /* add another entry */
771         el = talloc_realloc(msg, msg->elements, 
772                                struct ldb_message_element, msg->num_elements+1);
773         if (!el) {
774                 return -1;
775         }
776
777         msg->elements = el;
778         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
779         if (!msg->elements[msg->num_elements].name) {
780                 return -1;
781         }
782         msg->elements[msg->num_elements].num_values = 0;
783         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
784         if (!msg->elements[msg->num_elements].values) {
785                 return -1;
786         }
787         msg->elements[msg->num_elements].values[0].length = strlen(dn);
788         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
789         msg->elements[msg->num_elements].num_values = 1;
790         msg->num_elements++;
791
792         return 0;
793 }
794
795
796 /*
797   add a index element where this is not the first indexed DN for this
798   value
799 */
800 static int ltdb_index_add1_add(struct ldb_context *ldb, 
801                                struct ldb_message *msg,
802                                int idx,
803                                const char *dn)
804 {
805         struct ldb_val *v2;
806         unsigned int i;
807
808         /* for multi-valued attributes we can end up with repeats */
809         for (i=0;i<msg->elements[idx].num_values;i++) {
810                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
811                         return 0;
812                 }
813         }
814
815         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
816                               struct ldb_val, 
817                               msg->elements[idx].num_values+1);
818         if (!v2) {
819                 return -1;
820         }
821         msg->elements[idx].values = v2;
822
823         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
824         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
825         msg->elements[idx].num_values++;
826
827         return 0;
828 }
829
830 /*
831   add an index entry for one message element
832 */
833 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
834                            struct ldb_message_element *el, int v_idx)
835 {
836         struct ldb_context *ldb = module->ldb;
837         struct ldb_message *msg;
838         struct ldb_dn *dn_key;
839         int ret;
840         unsigned int i;
841
842         msg = talloc(module, struct ldb_message);
843         if (msg == NULL) {
844                 errno = ENOMEM;
845                 return -1;
846         }
847
848         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
849         if (!dn_key) {
850                 talloc_free(msg);
851                 return -1;
852         }
853         talloc_steal(msg, dn_key);
854
855         ret = ltdb_search_dn1(module, dn_key, msg);
856         if (ret == -1) {
857                 talloc_free(msg);
858                 return -1;
859         }
860
861         if (ret == 0) {
862                 msg->dn = dn_key;
863                 msg->num_elements = 0;
864                 msg->elements = NULL;
865         }
866
867         for (i=0;i<msg->num_elements;i++) {
868                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
869                         break;
870                 }
871         }
872
873         if (i == msg->num_elements) {
874                 ret = ltdb_index_add1_new(ldb, msg, dn);
875         } else {
876                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
877         }
878
879         if (ret == 0) {
880                 ret = ltdb_store(module, msg, TDB_REPLACE);
881         }
882
883         talloc_free(msg);
884
885         return ret;
886 }
887
888 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
889                            struct ldb_message_element *elements, int num_el)
890 {
891         struct ltdb_private *ltdb = module->private_data;
892         int ret;
893         unsigned int i, j;
894
895         if (dn[0] == '@') {
896                 return 0;
897         }
898
899         if (ltdb->cache->indexlist->num_elements == 0) {
900                 /* no indexed fields */
901                 return 0;
902         }
903
904         for (i = 0; i < num_el; i++) {
905                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
906                                        NULL, LTDB_IDXATTR);
907                 if (ret == -1) {
908                         continue;
909                 }
910                 for (j = 0; j < elements[i].num_values; j++) {
911                         ret = ltdb_index_add1(module, dn, &elements[i], j);
912                         if (ret == -1) {
913                                 return -1;
914                         }
915                 }
916         }
917
918         return 0;
919 }
920
921 /*
922   add the index entries for a new record
923   return -1 on failure
924 */
925 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
926 {
927         const char *dn;
928         int ret;
929
930         dn = ldb_dn_get_linearized(msg->dn);
931         if (dn == NULL) {
932                 return -1;
933         }
934
935         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
936
937         return ret;
938 }
939
940
941 /*
942   delete an index entry for one message element
943 */
944 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
945                          struct ldb_message_element *el, int v_idx)
946 {
947         struct ldb_context *ldb = module->ldb;
948         struct ldb_message *msg;
949         struct ldb_dn *dn_key;
950         int ret, i;
951         unsigned int j;
952
953         if (dn[0] == '@') {
954                 return 0;
955         }
956
957         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
958         if (!dn_key) {
959                 return -1;
960         }
961
962         msg = talloc(dn_key, struct ldb_message);
963         if (msg == NULL) {
964                 talloc_free(dn_key);
965                 return -1;
966         }
967
968         ret = ltdb_search_dn1(module, dn_key, msg);
969         if (ret == -1) {
970                 talloc_free(dn_key);
971                 return -1;
972         }
973
974         if (ret == 0) {
975                 /* it wasn't indexed. Did we have an earlier error? If we did then
976                    its gone now */
977                 talloc_free(dn_key);
978                 return 0;
979         }
980
981         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
982         if (i == -1) {
983                 ldb_debug(ldb, LDB_DEBUG_ERROR,
984                                 "ERROR: dn %s not found in %s\n", dn,
985                                 ldb_dn_get_linearized(dn_key));
986                 /* it ain't there. hmmm */
987                 talloc_free(dn_key);
988                 return 0;
989         }
990
991         if (j != msg->elements[i].num_values - 1) {
992                 memmove(&msg->elements[i].values[j], 
993                         &msg->elements[i].values[j+1], 
994                         (msg->elements[i].num_values-(j+1)) * 
995                         sizeof(msg->elements[i].values[0]));
996         }
997         msg->elements[i].num_values--;
998
999         if (msg->elements[i].num_values == 0) {
1000                 ret = ltdb_delete_noindex(module, dn_key);
1001         } else {
1002                 ret = ltdb_store(module, msg, TDB_REPLACE);
1003         }
1004
1005         talloc_free(dn_key);
1006
1007         return ret;
1008 }
1009
1010 /*
1011   delete the index entries for a record
1012   return -1 on failure
1013 */
1014 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1015 {
1016         struct ltdb_private *ltdb = module->private_data;
1017         int ret;
1018         const char *dn;
1019         unsigned int i, j;
1020
1021         /* find the list of indexed fields */   
1022         if (ltdb->cache->indexlist->num_elements == 0) {
1023                 /* no indexed fields */
1024                 return 0;
1025         }
1026
1027         if (ldb_dn_is_special(msg->dn)) {
1028                 return 0;
1029         }
1030
1031         dn = ldb_dn_get_linearized(msg->dn);
1032         if (dn == NULL) {
1033                 return -1;
1034         }
1035
1036         for (i = 0; i < msg->num_elements; i++) {
1037                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1038                                        NULL, LTDB_IDXATTR);
1039                 if (ret == -1) {
1040                         continue;
1041                 }
1042                 for (j = 0; j < msg->elements[i].num_values; j++) {
1043                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1044                         if (ret == -1) {
1045                                 return -1;
1046                         }
1047                 }
1048         }
1049
1050         return 0;
1051 }
1052
1053
1054 /*
1055   traversal function that deletes all @INDEX records
1056 */
1057 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1058 {
1059         const char *dn = "DN=" LTDB_INDEX ":";
1060         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1061                 return tdb_delete(tdb, key);
1062         }
1063         return 0;
1064 }
1065
1066 /*
1067   traversal function that adds @INDEX records during a re index
1068 */
1069 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1070 {
1071         struct ldb_module *module = state;
1072         struct ldb_message *msg;
1073         const char *dn = NULL;
1074         int ret;
1075         TDB_DATA key2;
1076
1077         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1078             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1079                 return 0;
1080         }
1081
1082         msg = talloc(module, struct ldb_message);
1083         if (msg == NULL) {
1084                 return -1;
1085         }
1086
1087         ret = ltdb_unpack_data(module, &data, msg);
1088         if (ret != 0) {
1089                 talloc_free(msg);
1090                 return -1;
1091         }
1092
1093         /* check if the DN key has changed, perhaps due to the 
1094            case insensitivity of an element changing */
1095         key2 = ltdb_key(module, msg->dn);
1096         if (key2.dptr == NULL) {
1097                 /* probably a corrupt record ... darn */
1098                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1099                                                         ldb_dn_get_linearized(msg->dn));
1100                 talloc_free(msg);
1101                 return 0;
1102         }
1103         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1104                 tdb_delete(tdb, key);
1105                 tdb_store(tdb, key2, data, 0);
1106         }
1107         talloc_free(key2.dptr);
1108
1109         if (msg->dn == NULL) {
1110                 dn = (char *)key.dptr + 3;
1111         } else {
1112                 dn = ldb_dn_get_linearized(msg->dn);
1113         }
1114
1115         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1116
1117         talloc_free(msg);
1118
1119         return ret;
1120 }
1121
1122 /*
1123   force a complete reindex of the database
1124 */
1125 int ltdb_reindex(struct ldb_module *module)
1126 {
1127         struct ltdb_private *ltdb = module->private_data;
1128         int ret;
1129
1130         if (ltdb_cache_reload(module) != 0) {
1131                 return -1;
1132         }
1133
1134         /* first traverse the database deleting any @INDEX records */
1135         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1136         if (ret == -1) {
1137                 return -1;
1138         }
1139
1140         /* now traverse adding any indexes for normal LDB records */
1141         ret = tdb_traverse(ltdb->tdb, re_index, module);
1142         if (ret == -1) {
1143                 return -1;
1144         }
1145
1146         return 0;
1147 }