r23798: updated old Temple Place FSF addresses to new URL
[ira/wip.git] / source3 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "includes.h"
35 #include "ldb/include/includes.h"
36
37 #include "ldb/ldb_tdb/ldb_tdb.h"
38
39 /*
40   find an element in a list, using the given comparison function and
41   assuming that the list is already sorted using comp_fn
42
43   return -1 if not found, or the index of the first occurance of needle if found
44 */
45 static int ldb_list_find(const void *needle, 
46                          const void *base, size_t nmemb, size_t size, 
47                          comparison_fn_t comp_fn)
48 {
49         const char *base_p = (const char *)base;
50         size_t min_i, max_i, test_i;
51
52         if (nmemb == 0) {
53                 return -1;
54         }
55
56         min_i = 0;
57         max_i = nmemb-1;
58
59         while (min_i < max_i) {
60                 int r;
61
62                 test_i = (min_i + max_i) / 2;
63                 /* the following cast looks strange, but is
64                  correct. The key to understanding it is that base_p
65                  is a pointer to an array of pointers, so we have to
66                  dereference it after casting to void **. The strange
67                  const in the middle gives us the right type of pointer
68                  after the dereference  (tridge) */
69                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
70                 if (r == 0) {
71                         /* scan back for first element */
72                         while (test_i > 0 &&
73                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
74                                 test_i--;
75                         }
76                         return test_i;
77                 }
78                 if (r < 0) {
79                         if (test_i == 0) {
80                                 return -1;
81                         }
82                         max_i = test_i - 1;
83                 }
84                 if (r > 0) {
85                         min_i = test_i + 1;
86                 }
87         }
88
89         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
90                 return min_i;
91         }
92
93         return -1;
94 }
95
96 struct dn_list {
97         unsigned int count;
98         char **dn;
99 };
100
101 /*
102   return the dn key to be used for an index
103   caller frees
104 */
105 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
106                         const char *attr, const struct ldb_val *value)
107 {
108         struct ldb_dn *ret;
109         char *dn;
110         struct ldb_val v;
111         const struct ldb_attrib_handler *h;
112         char *attr_folded;
113
114         attr_folded = ldb_attr_casefold(ldb, attr);
115         if (!attr_folded) {
116                 return NULL;
117         }
118
119         h = ldb_attrib_handler(ldb, attr);
120         if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
121                 /* canonicalisation can be refused. For example, 
122                    a attribute that takes wildcards will refuse to canonicalise
123                    if the value contains a wildcard */
124                 talloc_free(attr_folded);
125                 return NULL;
126         }
127         if (ldb_should_b64_encode(&v)) {
128                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
129                 if (!vstr) return NULL;
130                 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
131                 talloc_free(vstr);
132                 if (v.data != value->data) {
133                         talloc_free(v.data);
134                 }
135                 talloc_free(attr_folded);
136                 if (dn == NULL) return NULL;
137                 goto done;
138         }
139
140         dn = talloc_asprintf(ldb, "%s:%s:%.*s", 
141                               LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
142
143         if (v.data != value->data) {
144                 talloc_free(v.data);
145         }
146         talloc_free(attr_folded);
147
148 done:
149         ret = ldb_dn_explode(ldb, dn);
150         talloc_free(dn);
151         return ret;
152 }
153
154 /*
155   see if a attribute value is in the list of indexed attributes
156 */
157 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
158                             unsigned int *v_idx, const char *key)
159 {
160         unsigned int i, j;
161         for (i=0;i<msg->num_elements;i++) {
162                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
163                         const struct ldb_message_element *el = 
164                                 &msg->elements[i];
165                         for (j=0;j<el->num_values;j++) {
166                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
167                                         if (v_idx) {
168                                                 *v_idx = j;
169                                         }
170                                         return i;
171                                 }
172                         }
173                 }
174         }
175         return -1;
176 }
177
178 /* used in sorting dn lists */
179 static int list_cmp(const char **s1, const char **s2)
180 {
181         return strcmp(*s1, *s2);
182 }
183
184 /*
185   return a list of dn's that might match a simple indexed search or
186  */
187 static int ltdb_index_dn_simple(struct ldb_module *module, 
188                                 const struct ldb_parse_tree *tree,
189                                 const struct ldb_message *index_list,
190                                 struct dn_list *list)
191 {
192         struct ldb_context *ldb = module->ldb;
193         struct ldb_dn *dn;
194         int ret;
195         unsigned int i, j;
196         struct ldb_message *msg;
197
198         list->count = 0;
199         list->dn = NULL;
200
201         /* if the attribute isn't in the list of indexed attributes then
202            this node needs a full search */
203         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
204                 return -1;
205         }
206
207         /* the attribute is indexed. Pull the list of DNs that match the 
208            search criterion */
209         dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
210         if (!dn) return -1;
211
212         msg = talloc(list, struct ldb_message);
213         if (msg == NULL) {
214                 return -1;
215         }
216
217         ret = ltdb_search_dn1(module, dn, msg);
218         talloc_free(dn);
219         if (ret == 0 || ret == -1) {
220                 return ret;
221         }
222
223         for (i=0;i<msg->num_elements;i++) {
224                 struct ldb_message_element *el;
225
226                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
227                         continue;
228                 }
229
230                 el = &msg->elements[i];
231
232                 list->dn = talloc_array(list, char *, el->num_values);
233                 if (!list->dn) {
234                         talloc_free(msg);
235                         return -1;
236                 }
237
238                 for (j=0;j<el->num_values;j++) {
239                         list->dn[list->count] = 
240                                 talloc_strdup(list->dn, (char *)el->values[j].data);
241                         if (!list->dn[list->count]) {
242                                 talloc_free(msg);
243                                 return -1;
244                         }
245                         list->count++;
246                 }
247         }
248
249         talloc_free(msg);
250
251         if (list->count > 1) {
252                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
253         }
254
255         return 1;
256 }
257
258
259 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
260
261 /*
262   return a list of dn's that might match a simple indexed search on
263   the special objectclass attribute
264  */
265 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
266                                      const struct ldb_parse_tree *tree,
267                                      const struct ldb_message *index_list,
268                                      struct dn_list *list)
269 {
270         struct ldb_context *ldb = module->ldb;
271         unsigned int i;
272         int ret;
273         const char *target = (const char *)tree->u.equality.value.data;
274         const char **subclasses;
275
276         list->count = 0;
277         list->dn = NULL;
278
279         ret = ltdb_index_dn_simple(module, tree, index_list, list);
280
281         subclasses = ldb_subclass_list(module->ldb, target);
282
283         if (subclasses == NULL) {
284                 return ret;
285         }
286
287         for (i=0;subclasses[i];i++) {
288                 struct ldb_parse_tree tree2;
289                 struct dn_list *list2;
290                 tree2.operation = LDB_OP_EQUALITY;
291                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
292                 if (!tree2.u.equality.attr) {
293                         return -1;
294                 }
295                 tree2.u.equality.value.data = 
296                         (uint8_t *)talloc_strdup(list, subclasses[i]);
297                 if (tree2.u.equality.value.data == NULL) {
298                         return -1;                      
299                 }
300                 tree2.u.equality.value.length = strlen(subclasses[i]);
301                 list2 = talloc(list, struct dn_list);
302                 if (list2 == NULL) {
303                         talloc_free(tree2.u.equality.value.data);
304                         return -1;
305                 }
306                 if (ltdb_index_dn_objectclass(module, &tree2, 
307                                               index_list, list2) == 1) {
308                         if (list->count == 0) {
309                                 *list = *list2;
310                                 ret = 1;
311                         } else {
312                                 list_union(ldb, list, list2);
313                                 talloc_free(list2);
314                         }
315                 }
316                 talloc_free(tree2.u.equality.value.data);
317         }
318
319         return ret;
320 }
321
322 /*
323   return a list of dn's that might match a leaf indexed search
324  */
325 static int ltdb_index_dn_leaf(struct ldb_module *module, 
326                               const struct ldb_parse_tree *tree,
327                               const struct ldb_message *index_list,
328                               struct dn_list *list)
329 {
330         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
331                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
332         }
333         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
334                 list->dn = talloc_array(list, char *, 1);
335                 if (list->dn == NULL) {
336                         ldb_oom(module->ldb);
337                         return -1;
338                 }
339                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
340                 if (list->dn[0] == NULL) {
341                         ldb_oom(module->ldb);
342                         return -1;
343                 }
344                 list->count = 1;
345                 return 1;
346         }
347         return ltdb_index_dn_simple(module, tree, index_list, list);
348 }
349
350
351 /*
352   list intersection
353   list = list & list2
354   relies on the lists being sorted
355 */
356 static int list_intersect(struct ldb_context *ldb,
357                           struct dn_list *list, const struct dn_list *list2)
358 {
359         struct dn_list *list3;
360         unsigned int i;
361
362         if (list->count == 0 || list2->count == 0) {
363                 /* 0 & X == 0 */
364                 return 0;
365         }
366
367         list3 = talloc(ldb, struct dn_list);
368         if (list3 == NULL) {
369                 return -1;
370         }
371
372         list3->dn = talloc_array(list3, char *, list->count);
373         if (!list3->dn) {
374                 talloc_free(list3);
375                 return -1;
376         }
377         list3->count = 0;
378
379         for (i=0;i<list->count;i++) {
380                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
381                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
382                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
383                         list3->count++;
384                 } else {
385                         talloc_free(list->dn[i]);
386                 }               
387         }
388
389         talloc_free(list->dn);
390         list->dn = talloc_move(list, &list3->dn);
391         list->count = list3->count;
392         talloc_free(list3);
393
394         return 0;
395 }
396
397
398 /*
399   list union
400   list = list | list2
401   relies on the lists being sorted
402 */
403 static int list_union(struct ldb_context *ldb, 
404                       struct dn_list *list, const struct dn_list *list2)
405 {
406         unsigned int i;
407         char **d;
408         unsigned int count = list->count;
409
410         if (list->count == 0 && list2->count == 0) {
411                 /* 0 | 0 == 0 */
412                 return 0;
413         }
414
415         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
416         if (!d) {
417                 return -1;
418         }
419         list->dn = d;
420
421         for (i=0;i<list2->count;i++) {
422                 if (ldb_list_find(list2->dn[i], list->dn, count, 
423                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
424                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
425                         if (!list->dn[list->count]) {
426                                 return -1;
427                         }
428                         list->count++;
429                 }               
430         }
431
432         if (list->count != count) {
433                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
434         }
435
436         return 0;
437 }
438
439 static int ltdb_index_dn(struct ldb_module *module, 
440                          const struct ldb_parse_tree *tree,
441                          const struct ldb_message *index_list,
442                          struct dn_list *list);
443
444
445 /*
446   OR two index results
447  */
448 static int ltdb_index_dn_or(struct ldb_module *module, 
449                             const struct ldb_parse_tree *tree,
450                             const struct ldb_message *index_list,
451                             struct dn_list *list)
452 {
453         struct ldb_context *ldb = module->ldb;
454         unsigned int i;
455         int ret;
456         
457         ret = -1;
458         list->dn = NULL;
459         list->count = 0;
460
461         for (i=0;i<tree->u.list.num_elements;i++) {
462                 struct dn_list *list2;
463                 int v;
464
465                 list2 = talloc(module, struct dn_list);
466                 if (list2 == NULL) {
467                         return -1;
468                 }
469
470                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
471
472                 if (v == 0) {
473                         /* 0 || X == X */
474                         if (ret == -1) {
475                                 ret = 0;
476                         }
477                         talloc_free(list2);
478                         continue;
479                 }
480
481                 if (v == -1) {
482                         /* 1 || X == 1 */
483                         talloc_free(list->dn);
484                         talloc_free(list2);
485                         return -1;
486                 }
487
488                 if (ret == -1) {
489                         ret = 1;
490                         list->dn = talloc_move(list, &list2->dn);
491                         list->count = list2->count;
492                 } else {
493                         if (list_union(ldb, list, list2) == -1) {
494                                 talloc_free(list2);
495                                 return -1;
496                         }
497                         ret = 1;
498                 }
499                 talloc_free(list2);
500         }
501
502         if (list->count == 0) {
503                 return 0;
504         }
505
506         return ret;
507 }
508
509
510 /*
511   NOT an index results
512  */
513 static int ltdb_index_dn_not(struct ldb_module *module, 
514                              const struct ldb_parse_tree *tree,
515                              const struct ldb_message *index_list,
516                              struct dn_list *list)
517 {
518         /* the only way to do an indexed not would be if we could
519            negate the not via another not or if we knew the total
520            number of database elements so we could know that the
521            existing expression covered the whole database. 
522            
523            instead, we just give up, and rely on a full index scan
524            (unless an outer & manages to reduce the list)
525         */
526         return -1;
527 }
528
529 /*
530   AND two index results
531  */
532 static int ltdb_index_dn_and(struct ldb_module *module, 
533                              const struct ldb_parse_tree *tree,
534                              const struct ldb_message *index_list,
535                              struct dn_list *list)
536 {
537         struct ldb_context *ldb = module->ldb;
538         unsigned int i;
539         int ret;
540         
541         ret = -1;
542         list->dn = NULL;
543         list->count = 0;
544
545         for (i=0;i<tree->u.list.num_elements;i++) {
546                 struct dn_list *list2;
547                 int v;
548
549                 list2 = talloc(module, struct dn_list);
550                 if (list2 == NULL) {
551                         return -1;
552                 }
553
554                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
555
556                 if (v == 0) {
557                         /* 0 && X == 0 */
558                         talloc_free(list->dn);
559                         talloc_free(list2);
560                         return 0;
561                 }
562
563                 if (v == -1) {
564                         talloc_free(list2);
565                         continue;
566                 }
567
568                 if (ret == -1) {
569                         ret = 1;
570                         talloc_free(list->dn);
571                         list->dn = talloc_move(list, &list2->dn);
572                         list->count = list2->count;
573                 } else {
574                         if (list_intersect(ldb, list, list2) == -1) {
575                                 talloc_free(list2);
576                                 return -1;
577                         }
578                 }
579
580                 talloc_free(list2);
581
582                 if (list->count == 0) {
583                         talloc_free(list->dn);
584                         return 0;
585                 }
586         }
587
588         return ret;
589 }
590
591 /*
592   return a list of dn's that might match a indexed search or
593   -1 if an error. return 0 for no matches, or 1 for matches
594  */
595 static int ltdb_index_dn(struct ldb_module *module, 
596                          const struct ldb_parse_tree *tree,
597                          const struct ldb_message *index_list,
598                          struct dn_list *list)
599 {
600         int ret = -1;
601
602         switch (tree->operation) {
603         case LDB_OP_AND:
604                 ret = ltdb_index_dn_and(module, tree, index_list, list);
605                 break;
606
607         case LDB_OP_OR:
608                 ret = ltdb_index_dn_or(module, tree, index_list, list);
609                 break;
610
611         case LDB_OP_NOT:
612                 ret = ltdb_index_dn_not(module, tree, index_list, list);
613                 break;
614
615         case LDB_OP_EQUALITY:
616                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
617                 break;
618
619         case LDB_OP_SUBSTRING:
620         case LDB_OP_GREATER:
621         case LDB_OP_LESS:
622         case LDB_OP_PRESENT:
623         case LDB_OP_APPROX:
624         case LDB_OP_EXTENDED:
625                 /* we can't index with fancy bitops yet */
626                 ret = -1;
627                 break;
628         }
629
630         return ret;
631 }
632
633 /*
634   filter a candidate dn_list from an indexed search into a set of results
635   extracting just the given attributes
636 */
637 static int ltdb_index_filter(const struct dn_list *dn_list, 
638                              struct ldb_handle *handle)
639 {
640         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
641         struct ldb_reply *ares = NULL;
642         unsigned int i;
643
644         if (!ac) {
645                 return LDB_ERR_OPERATIONS_ERROR;
646         }
647
648         for (i = 0; i < dn_list->count; i++) {
649                 struct ldb_dn *dn;
650                 int ret;
651
652                 ares = talloc_zero(ac, struct ldb_reply);
653                 if (!ares) {
654                         handle->status = LDB_ERR_OPERATIONS_ERROR;
655                         handle->state = LDB_ASYNC_DONE;
656                         return LDB_ERR_OPERATIONS_ERROR;
657                 }
658
659                 ares->message = ldb_msg_new(ares);
660                 if (!ares->message) {
661                         handle->status = LDB_ERR_OPERATIONS_ERROR;
662                         handle->state = LDB_ASYNC_DONE;
663                         talloc_free(ares);
664                         return LDB_ERR_OPERATIONS_ERROR;
665                 }
666
667
668                 dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
669                 if (dn == NULL) {
670                         talloc_free(ares);
671                         return LDB_ERR_OPERATIONS_ERROR;
672                 }
673
674                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
675                 talloc_free(dn);
676                 if (ret == 0) {
677                         /* the record has disappeared? yes, this can happen */
678                         talloc_free(ares);
679                         continue;
680                 }
681
682                 if (ret == -1) {
683                         /* an internal error */
684                         talloc_free(ares);
685                         return LDB_ERR_OPERATIONS_ERROR;
686                 }
687
688                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
689                         talloc_free(ares);
690                         continue;
691                 }
692
693                 /* filter the attributes that the user wants */
694                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
695
696                 if (ret == -1) {
697                         handle->status = LDB_ERR_OPERATIONS_ERROR;
698                         handle->state = LDB_ASYNC_DONE;
699                         talloc_free(ares);
700                         return LDB_ERR_OPERATIONS_ERROR;
701                 }
702
703                 ares->type = LDB_REPLY_ENTRY;
704                 handle->state = LDB_ASYNC_PENDING;
705                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
706
707                 if (handle->status != LDB_SUCCESS) {
708                         handle->state = LDB_ASYNC_DONE;
709                         return handle->status;
710                 }
711         }
712
713         return LDB_SUCCESS;
714 }
715
716 /*
717   search the database with a LDAP-like expression using indexes
718   returns -1 if an indexed search is not possible, in which
719   case the caller should call ltdb_search_full() 
720 */
721 int ltdb_search_indexed(struct ldb_handle *handle)
722 {
723         struct ltdb_context *ac;
724         struct ltdb_private *ltdb;
725         struct dn_list *dn_list;
726         int ret;
727
728         if (!(ac = talloc_get_type(handle->private_data,
729                                    struct ltdb_context)) ||
730             !(ltdb = talloc_get_type(ac->module->private_data,
731                                      struct ltdb_private))) {
732                 return -1;
733         }
734
735         if (ltdb->cache->indexlist->num_elements == 0 && 
736             ac->scope != LDB_SCOPE_BASE) {
737                 /* no index list? must do full search */
738                 return -1;
739         }
740
741         dn_list = talloc(handle, struct dn_list);
742         if (dn_list == NULL) {
743                 return -1;
744         }
745
746         if (ac->scope == LDB_SCOPE_BASE) {
747                 /* with BASE searches only one DN can match */
748                 dn_list->dn = talloc_array(dn_list, char *, 1);
749                 if (dn_list->dn == NULL) {
750                         ldb_oom(ac->module->ldb);
751                         return -1;
752                 }
753                 dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
754                 if (dn_list->dn[0] == NULL) {
755                         ldb_oom(ac->module->ldb);
756                         return -1;
757                 }
758                 dn_list->count = 1;
759                 ret = 1;
760         } else {
761                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
762         }
763
764         if (ret == 1) {
765                 /* we've got a candidate list - now filter by the full tree
766                    and extract the needed attributes */
767                 ret = ltdb_index_filter(dn_list, handle);
768                 handle->status = ret;
769                 handle->state = LDB_ASYNC_DONE;
770         }
771
772         talloc_free(dn_list);
773
774         return ret;
775 }
776
777 /*
778   add a index element where this is the first indexed DN for this value
779 */
780 static int ltdb_index_add1_new(struct ldb_context *ldb, 
781                                struct ldb_message *msg,
782                                struct ldb_message_element *el,
783                                const char *dn)
784 {
785         struct ldb_message_element *el2;
786
787         /* add another entry */
788         el2 = talloc_realloc(msg, msg->elements, 
789                                struct ldb_message_element, msg->num_elements+1);
790         if (!el2) {
791                 return -1;
792         }
793
794         msg->elements = el2;
795         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
796         if (!msg->elements[msg->num_elements].name) {
797                 return -1;
798         }
799         msg->elements[msg->num_elements].num_values = 0;
800         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
801         if (!msg->elements[msg->num_elements].values) {
802                 return -1;
803         }
804         msg->elements[msg->num_elements].values[0].length = strlen(dn);
805         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
806         msg->elements[msg->num_elements].num_values = 1;
807         msg->num_elements++;
808
809         return 0;
810 }
811
812
813 /*
814   add a index element where this is not the first indexed DN for this
815   value
816 */
817 static int ltdb_index_add1_add(struct ldb_context *ldb, 
818                                struct ldb_message *msg,
819                                struct ldb_message_element *el,
820                                int idx,
821                                const char *dn)
822 {
823         struct ldb_val *v2;
824         unsigned int i;
825
826         /* for multi-valued attributes we can end up with repeats */
827         for (i=0;i<msg->elements[idx].num_values;i++) {
828                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
829                         return 0;
830                 }
831         }
832
833         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
834                               struct ldb_val, 
835                               msg->elements[idx].num_values+1);
836         if (!v2) {
837                 return -1;
838         }
839         msg->elements[idx].values = v2;
840
841         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
842         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
843         msg->elements[idx].num_values++;
844
845         return 0;
846 }
847
848 /*
849   add an index entry for one message element
850 */
851 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
852                            struct ldb_message_element *el, int v_idx)
853 {
854         struct ldb_context *ldb = module->ldb;
855         struct ldb_message *msg;
856         struct ldb_dn *dn_key;
857         int ret;
858         unsigned int i;
859
860         msg = talloc(module, struct ldb_message);
861         if (msg == NULL) {
862                 errno = ENOMEM;
863                 return -1;
864         }
865
866         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
867         if (!dn_key) {
868                 talloc_free(msg);
869                 errno = ENOMEM;
870                 return -1;
871         }
872         talloc_steal(msg, dn_key);
873
874         ret = ltdb_search_dn1(module, dn_key, msg);
875         if (ret == -1) {
876                 talloc_free(msg);
877                 return -1;
878         }
879
880         if (ret == 0) {
881                 msg->dn = dn_key;
882                 msg->num_elements = 0;
883                 msg->elements = NULL;
884         }
885
886         for (i=0;i<msg->num_elements;i++) {
887                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
888                         break;
889                 }
890         }
891
892         if (i == msg->num_elements) {
893                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
894         } else {
895                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
896         }
897
898         if (ret == 0) {
899                 ret = ltdb_store(module, msg, TDB_REPLACE);
900         }
901
902         talloc_free(msg);
903
904         return ret;
905 }
906
907 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
908                            struct ldb_message_element *elements, int num_el)
909 {
910         struct ltdb_private *ltdb =
911                 (struct ltdb_private *)module->private_data;
912         int ret;
913         unsigned int i, j;
914
915         if (dn[0] == '@') {
916                 return 0;
917         }
918
919         if (ltdb->cache->indexlist->num_elements == 0) {
920                 /* no indexed fields */
921                 return 0;
922         }
923
924         for (i = 0; i < num_el; i++) {
925                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
926                                        NULL, LTDB_IDXATTR);
927                 if (ret == -1) {
928                         continue;
929                 }
930                 for (j = 0; j < elements[i].num_values; j++) {
931                         ret = ltdb_index_add1(module, dn, &elements[i], j);
932                         if (ret == -1) {
933                                 return -1;
934                         }
935                 }
936         }
937
938         return 0;
939 }
940
941 /*
942   add the index entries for a new record
943   return -1 on failure
944 */
945 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
946 {
947         struct ltdb_private *ltdb =
948                 (struct ltdb_private *)module->private_data;
949         char *dn;
950         int ret;
951
952         dn = ldb_dn_linearize(ltdb, msg->dn);
953         if (dn == NULL) {
954                 return -1;
955         }
956
957         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
958
959         talloc_free(dn);
960
961         return ret;
962 }
963
964
965 /*
966   delete an index entry for one message element
967 */
968 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
969                          struct ldb_message_element *el, int v_idx)
970 {
971         struct ldb_context *ldb = module->ldb;
972         struct ldb_message *msg;
973         struct ldb_dn *dn_key;
974         int ret, i;
975         unsigned int j;
976
977         if (dn[0] == '@') {
978                 return 0;
979         }
980
981         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
982         if (!dn_key) {
983                 return -1;
984         }
985
986         msg = talloc(dn_key, struct ldb_message);
987         if (msg == NULL) {
988                 talloc_free(dn_key);
989                 return -1;
990         }
991
992         ret = ltdb_search_dn1(module, dn_key, msg);
993         if (ret == -1) {
994                 talloc_free(dn_key);
995                 return -1;
996         }
997
998         if (ret == 0) {
999                 /* it wasn't indexed. Did we have an earlier error? If we did then
1000                    its gone now */
1001                 talloc_free(dn_key);
1002                 return 0;
1003         }
1004
1005         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1006         if (i == -1) {
1007                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1008                                 "ERROR: dn %s not found in %s\n", dn,
1009                                 ldb_dn_linearize(dn_key, dn_key));
1010                 /* it ain't there. hmmm */
1011                 talloc_free(dn_key);
1012                 return 0;
1013         }
1014
1015         if (j != msg->elements[i].num_values - 1) {
1016                 memmove(&msg->elements[i].values[j], 
1017                         &msg->elements[i].values[j+1], 
1018                         (msg->elements[i].num_values-(j+1)) * 
1019                         sizeof(msg->elements[i].values[0]));
1020         }
1021         msg->elements[i].num_values--;
1022
1023         if (msg->elements[i].num_values == 0) {
1024                 ret = ltdb_delete_noindex(module, dn_key);
1025         } else {
1026                 ret = ltdb_store(module, msg, TDB_REPLACE);
1027         }
1028
1029         talloc_free(dn_key);
1030
1031         return ret;
1032 }
1033
1034 /*
1035   delete the index entries for a record
1036   return -1 on failure
1037 */
1038 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1039 {
1040         struct ltdb_private *ltdb =
1041                 (struct ltdb_private *)module->private_data;
1042         int ret;
1043         char *dn;
1044         unsigned int i, j;
1045
1046         /* find the list of indexed fields */   
1047         if (ltdb->cache->indexlist->num_elements == 0) {
1048                 /* no indexed fields */
1049                 return 0;
1050         }
1051
1052         if (ldb_dn_is_special(msg->dn)) {
1053                 return 0;
1054         }
1055
1056         dn = ldb_dn_linearize(ltdb, msg->dn);
1057         if (dn == NULL) {
1058                 return -1;
1059         }
1060
1061         for (i = 0; i < msg->num_elements; i++) {
1062                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1063                                        NULL, LTDB_IDXATTR);
1064                 if (ret == -1) {
1065                         continue;
1066                 }
1067                 for (j = 0; j < msg->elements[i].num_values; j++) {
1068                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1069                         if (ret == -1) {
1070                                 talloc_free(dn);
1071                                 return -1;
1072                         }
1073                 }
1074         }
1075
1076         talloc_free(dn);
1077         return 0;
1078 }
1079
1080
1081 /*
1082   traversal function that deletes all @INDEX records
1083 */
1084 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1085 {
1086         const char *dn = "DN=" LTDB_INDEX ":";
1087         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1088                 return tdb_delete(tdb, key);
1089         }
1090         return 0;
1091 }
1092
1093 /*
1094   traversal function that adds @INDEX records during a re index
1095 */
1096 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1097 {
1098         struct ldb_module *module = (struct ldb_module *)state;
1099         struct ldb_message *msg;
1100         char *dn = NULL;
1101         int ret;
1102         TDB_DATA key2;
1103
1104         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1105             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1106                 return 0;
1107         }
1108
1109         msg = talloc(module, struct ldb_message);
1110         if (msg == NULL) {
1111                 return -1;
1112         }
1113
1114         ret = ltdb_unpack_data(module, &data, msg);
1115         if (ret != 0) {
1116                 talloc_free(msg);
1117                 return -1;
1118         }
1119
1120         /* check if the DN key has changed, perhaps due to the 
1121            case insensitivity of an element changing */
1122         key2 = ltdb_key(module, msg->dn);
1123         if (key2.dptr == NULL) {
1124                 /* probably a corrupt record ... darn */
1125                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1126                                                         ldb_dn_linearize(msg, msg->dn));
1127                 talloc_free(msg);
1128                 return 0;
1129         }
1130         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1131                 tdb_delete(tdb, key);
1132                 tdb_store(tdb, key2, data, 0);
1133         }
1134         talloc_free(key2.dptr);
1135
1136         if (msg->dn == NULL) {
1137                 dn = (char *)key.dptr + 3;
1138         } else {
1139                 if (!(dn = ldb_dn_linearize(msg->dn, msg->dn))) {
1140                         talloc_free(msg);
1141                         return -1;
1142                 }
1143         }
1144
1145         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1146
1147         talloc_free(msg);
1148
1149         return ret;
1150 }
1151
1152 /*
1153   force a complete reindex of the database
1154 */
1155 int ltdb_reindex(struct ldb_module *module)
1156 {
1157         struct ltdb_private *ltdb =
1158                 (struct ltdb_private *)module->private_data;
1159         int ret;
1160
1161         if (ltdb_cache_reload(module) != 0) {
1162                 return -1;
1163         }
1164
1165         /* first traverse the database deleting any @INDEX records */
1166         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1167         if (ret == -1) {
1168                 return -1;
1169         }
1170
1171         /* now traverse adding any indexes for normal LDB records */
1172         ret = tdb_traverse(ltdb->tdb, re_index, module);
1173         if (ret == -1) {
1174                 return -1;
1175         }
1176
1177         return 0;
1178 }