r19114: fixed another checker warning and a possible error on allocation
[abartlet/samba.git/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 /* the following cast looks strange, but is
65                  correct. The key to understanding it is that base_p
66                  is a pointer to an array of pointers, so we have to
67                  dereference it after casting to void **. The strange
68                  const in the middle gives us the right type of pointer
69                  after the dereference  (tridge) */
70                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
71                 if (r == 0) {
72                         /* scan back for first element */
73                         while (test_i > 0 &&
74                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
75                                 test_i--;
76                         }
77                         return test_i;
78                 }
79                 if (r < 0) {
80                         if (test_i == 0) {
81                                 return -1;
82                         }
83                         max_i = test_i - 1;
84                 }
85                 if (r > 0) {
86                         min_i = test_i + 1;
87                 }
88         }
89
90         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
91                 return min_i;
92         }
93
94         return -1;
95 }
96
97 struct dn_list {
98         unsigned int count;
99         char **dn;
100 };
101
102 /*
103   return the dn key to be used for an index
104   caller frees
105 */
106 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
107                         const char *attr, const struct ldb_val *value)
108 {
109         struct ldb_dn *ret;
110         char *dn;
111         struct ldb_val v;
112         const struct ldb_attrib_handler *h;
113         char *attr_folded;
114
115         attr_folded = ldb_attr_casefold(ldb, attr);
116         if (!attr_folded) {
117                 return NULL;
118         }
119
120         h = ldb_attrib_handler(ldb, attr);
121         if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
122                 /* canonicalisation can be refused. For example, 
123                    a attribute that takes wildcards will refuse to canonicalise
124                    if the value contains a wildcard */
125                 talloc_free(attr_folded);
126                 return NULL;
127         }
128         if (ldb_should_b64_encode(&v)) {
129                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
130                 if (!vstr) return NULL;
131                 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
132                 talloc_free(vstr);
133                 if (v.data != value->data) {
134                         talloc_free(v.data);
135                 }
136                 talloc_free(attr_folded);
137                 if (dn == NULL) return NULL;
138                 goto done;
139         }
140
141         dn = talloc_asprintf(ldb, "%s:%s:%.*s", 
142                               LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
143
144         if (v.data != value->data) {
145                 talloc_free(v.data);
146         }
147         talloc_free(attr_folded);
148
149 done:
150         ret = ldb_dn_explode(ldb, dn);
151         talloc_free(dn);
152         return ret;
153 }
154
155 /*
156   see if a attribute value is in the list of indexed attributes
157 */
158 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
159                             unsigned int *v_idx, const char *key)
160 {
161         unsigned int i, j;
162         for (i=0;i<msg->num_elements;i++) {
163                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
164                         const struct ldb_message_element *el = 
165                                 &msg->elements[i];
166                         for (j=0;j<el->num_values;j++) {
167                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
168                                         if (v_idx) {
169                                                 *v_idx = j;
170                                         }
171                                         return i;
172                                 }
173                         }
174                 }
175         }
176         return -1;
177 }
178
179 /* used in sorting dn lists */
180 static int list_cmp(const char **s1, const char **s2)
181 {
182         return strcmp(*s1, *s2);
183 }
184
185 /*
186   return a list of dn's that might match a simple indexed search or
187  */
188 static int ltdb_index_dn_simple(struct ldb_module *module, 
189                                 const struct ldb_parse_tree *tree,
190                                 const struct ldb_message *index_list,
191                                 struct dn_list *list)
192 {
193         struct ldb_context *ldb = module->ldb;
194         struct ldb_dn *dn;
195         int ret;
196         unsigned int i, j;
197         struct ldb_message *msg;
198
199         list->count = 0;
200         list->dn = NULL;
201
202         /* if the attribute isn't in the list of indexed attributes then
203            this node needs a full search */
204         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
205                 return -1;
206         }
207
208         /* the attribute is indexed. Pull the list of DNs that match the 
209            search criterion */
210         dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
211         if (!dn) return -1;
212
213         msg = talloc(list, struct ldb_message);
214         if (msg == NULL) {
215                 return -1;
216         }
217
218         ret = ltdb_search_dn1(module, dn, msg);
219         talloc_free(dn);
220         if (ret == 0 || ret == -1) {
221                 return ret;
222         }
223
224         for (i=0;i<msg->num_elements;i++) {
225                 struct ldb_message_element *el;
226
227                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
228                         continue;
229                 }
230
231                 el = &msg->elements[i];
232
233                 list->dn = talloc_array(list, char *, el->num_values);
234                 if (!list->dn) {
235                         talloc_free(msg);
236                         return -1;
237                 }
238
239                 for (j=0;j<el->num_values;j++) {
240                         list->dn[list->count] = 
241                                 talloc_strdup(list->dn, (char *)el->values[j].data);
242                         if (!list->dn[list->count]) {
243                                 talloc_free(msg);
244                                 return -1;
245                         }
246                         list->count++;
247                 }
248         }
249
250         talloc_free(msg);
251
252         if (list->count > 1) {
253                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
254         }
255
256         return 1;
257 }
258
259
260 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
261
262 /*
263   return a list of dn's that might match a simple indexed search on
264   the special objectclass attribute
265  */
266 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
267                                      const struct ldb_parse_tree *tree,
268                                      const struct ldb_message *index_list,
269                                      struct dn_list *list)
270 {
271         struct ldb_context *ldb = module->ldb;
272         unsigned int i;
273         int ret;
274         const char *target = (const char *)tree->u.equality.value.data;
275         const char **subclasses;
276
277         list->count = 0;
278         list->dn = NULL;
279
280         ret = ltdb_index_dn_simple(module, tree, index_list, list);
281
282         subclasses = ldb_subclass_list(module->ldb, target);
283
284         if (subclasses == NULL) {
285                 return ret;
286         }
287
288         for (i=0;subclasses[i];i++) {
289                 struct ldb_parse_tree tree2;
290                 struct dn_list *list2;
291                 tree2.operation = LDB_OP_EQUALITY;
292                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
293                 if (!tree2.u.equality.attr) {
294                         return -1;
295                 }
296                 tree2.u.equality.value.data = 
297                         (uint8_t *)talloc_strdup(list, subclasses[i]);
298                 if (tree2.u.equality.value.data == NULL) {
299                         return -1;                      
300                 }
301                 tree2.u.equality.value.length = strlen(subclasses[i]);
302                 list2 = talloc(list, struct dn_list);
303                 if (list2 == NULL) {
304                         talloc_free(tree2.u.equality.value.data);
305                         return -1;
306                 }
307                 if (ltdb_index_dn_objectclass(module, &tree2, 
308                                               index_list, list2) == 1) {
309                         if (list->count == 0) {
310                                 *list = *list2;
311                                 ret = 1;
312                         } else {
313                                 list_union(ldb, list, list2);
314                                 talloc_free(list2);
315                         }
316                 }
317                 talloc_free(tree2.u.equality.value.data);
318         }
319
320         return ret;
321 }
322
323 /*
324   return a list of dn's that might match a leaf indexed search
325  */
326 static int ltdb_index_dn_leaf(struct ldb_module *module, 
327                               const struct ldb_parse_tree *tree,
328                               const struct ldb_message *index_list,
329                               struct dn_list *list)
330 {
331         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
332                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
333         }
334         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
335                 list->dn = talloc_array(list, char *, 1);
336                 if (list->dn == NULL) {
337                         ldb_oom(module->ldb);
338                         return -1;
339                 }
340                 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
341                 if (list->dn[0] == NULL) {
342                         ldb_oom(module->ldb);
343                         return -1;
344                 }
345                 list->count = 1;
346                 return 1;
347         }
348         return ltdb_index_dn_simple(module, tree, index_list, list);
349 }
350
351
352 /*
353   list intersection
354   list = list & list2
355   relies on the lists being sorted
356 */
357 static int list_intersect(struct ldb_context *ldb,
358                           struct dn_list *list, const struct dn_list *list2)
359 {
360         struct dn_list *list3;
361         unsigned int i;
362
363         if (list->count == 0 || list2->count == 0) {
364                 /* 0 & X == 0 */
365                 return 0;
366         }
367
368         list3 = talloc(ldb, struct dn_list);
369         if (list3 == NULL) {
370                 return -1;
371         }
372
373         list3->dn = talloc_array(list3, char *, list->count);
374         if (!list3->dn) {
375                 talloc_free(list3);
376                 return -1;
377         }
378         list3->count = 0;
379
380         for (i=0;i<list->count;i++) {
381                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
382                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
383                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
384                         list3->count++;
385                 } else {
386                         talloc_free(list->dn[i]);
387                 }               
388         }
389
390         talloc_free(list->dn);
391         list->dn = talloc_move(list, &list3->dn);
392         list->count = list3->count;
393         talloc_free(list3);
394
395         return 0;
396 }
397
398
399 /*
400   list union
401   list = list | list2
402   relies on the lists being sorted
403 */
404 static int list_union(struct ldb_context *ldb, 
405                       struct dn_list *list, const struct dn_list *list2)
406 {
407         unsigned int i;
408         char **d;
409         unsigned int count = list->count;
410
411         if (list->count == 0 && list2->count == 0) {
412                 /* 0 | 0 == 0 */
413                 return 0;
414         }
415
416         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
417         if (!d) {
418                 return -1;
419         }
420         list->dn = d;
421
422         for (i=0;i<list2->count;i++) {
423                 if (ldb_list_find(list2->dn[i], list->dn, count, 
424                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
425                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
426                         if (!list->dn[list->count]) {
427                                 return -1;
428                         }
429                         list->count++;
430                 }               
431         }
432
433         if (list->count != count) {
434                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
435         }
436
437         return 0;
438 }
439
440 static int ltdb_index_dn(struct ldb_module *module, 
441                          const struct ldb_parse_tree *tree,
442                          const struct ldb_message *index_list,
443                          struct dn_list *list);
444
445
446 /*
447   OR two index results
448  */
449 static int ltdb_index_dn_or(struct ldb_module *module, 
450                             const struct ldb_parse_tree *tree,
451                             const struct ldb_message *index_list,
452                             struct dn_list *list)
453 {
454         struct ldb_context *ldb = module->ldb;
455         unsigned int i;
456         int ret;
457         
458         ret = -1;
459         list->dn = NULL;
460         list->count = 0;
461
462         for (i=0;i<tree->u.list.num_elements;i++) {
463                 struct dn_list *list2;
464                 int v;
465
466                 list2 = talloc(module, struct dn_list);
467                 if (list2 == NULL) {
468                         return -1;
469                 }
470
471                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
472
473                 if (v == 0) {
474                         /* 0 || X == X */
475                         if (ret == -1) {
476                                 ret = 0;
477                         }
478                         talloc_free(list2);
479                         continue;
480                 }
481
482                 if (v == -1) {
483                         /* 1 || X == 1 */
484                         talloc_free(list->dn);
485                         talloc_free(list2);
486                         return -1;
487                 }
488
489                 if (ret == -1) {
490                         ret = 1;
491                         list->dn = talloc_move(list, &list2->dn);
492                         list->count = list2->count;
493                 } else {
494                         if (list_union(ldb, list, list2) == -1) {
495                                 talloc_free(list2);
496                                 return -1;
497                         }
498                         ret = 1;
499                 }
500                 talloc_free(list2);
501         }
502
503         if (list->count == 0) {
504                 return 0;
505         }
506
507         return ret;
508 }
509
510
511 /*
512   NOT an index results
513  */
514 static int ltdb_index_dn_not(struct ldb_module *module, 
515                              const struct ldb_parse_tree *tree,
516                              const struct ldb_message *index_list,
517                              struct dn_list *list)
518 {
519         /* the only way to do an indexed not would be if we could
520            negate the not via another not or if we knew the total
521            number of database elements so we could know that the
522            existing expression covered the whole database. 
523            
524            instead, we just give up, and rely on a full index scan
525            (unless an outer & manages to reduce the list)
526         */
527         return -1;
528 }
529
530 /*
531   AND two index results
532  */
533 static int ltdb_index_dn_and(struct ldb_module *module, 
534                              const struct ldb_parse_tree *tree,
535                              const struct ldb_message *index_list,
536                              struct dn_list *list)
537 {
538         struct ldb_context *ldb = module->ldb;
539         unsigned int i;
540         int ret;
541         
542         ret = -1;
543         list->dn = NULL;
544         list->count = 0;
545
546         for (i=0;i<tree->u.list.num_elements;i++) {
547                 struct dn_list *list2;
548                 int v;
549
550                 list2 = talloc(module, struct dn_list);
551                 if (list2 == NULL) {
552                         return -1;
553                 }
554
555                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
556
557                 if (v == 0) {
558                         /* 0 && X == 0 */
559                         talloc_free(list->dn);
560                         talloc_free(list2);
561                         return 0;
562                 }
563
564                 if (v == -1) {
565                         talloc_free(list2);
566                         continue;
567                 }
568
569                 if (ret == -1) {
570                         ret = 1;
571                         talloc_free(list->dn);
572                         list->dn = talloc_move(list, &list2->dn);
573                         list->count = list2->count;
574                 } else {
575                         if (list_intersect(ldb, list, list2) == -1) {
576                                 talloc_free(list2);
577                                 return -1;
578                         }
579                 }
580
581                 talloc_free(list2);
582
583                 if (list->count == 0) {
584                         talloc_free(list->dn);
585                         return 0;
586                 }
587         }
588
589         return ret;
590 }
591
592 /*
593   return a list of dn's that might match a indexed search or
594   -1 if an error. return 0 for no matches, or 1 for matches
595  */
596 static int ltdb_index_dn(struct ldb_module *module, 
597                          const struct ldb_parse_tree *tree,
598                          const struct ldb_message *index_list,
599                          struct dn_list *list)
600 {
601         int ret = -1;
602
603         switch (tree->operation) {
604         case LDB_OP_AND:
605                 ret = ltdb_index_dn_and(module, tree, index_list, list);
606                 break;
607
608         case LDB_OP_OR:
609                 ret = ltdb_index_dn_or(module, tree, index_list, list);
610                 break;
611
612         case LDB_OP_NOT:
613                 ret = ltdb_index_dn_not(module, tree, index_list, list);
614                 break;
615
616         case LDB_OP_EQUALITY:
617                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
618                 break;
619
620         case LDB_OP_SUBSTRING:
621         case LDB_OP_GREATER:
622         case LDB_OP_LESS:
623         case LDB_OP_PRESENT:
624         case LDB_OP_APPROX:
625         case LDB_OP_EXTENDED:
626                 /* we can't index with fancy bitops yet */
627                 ret = -1;
628                 break;
629         }
630
631         return ret;
632 }
633
634 /*
635   filter a candidate dn_list from an indexed search into a set of results
636   extracting just the given attributes
637 */
638 static int ltdb_index_filter(const struct dn_list *dn_list, 
639                              struct ldb_handle *handle)
640 {
641         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
642         struct ldb_reply *ares = NULL;
643         unsigned int i;
644
645         for (i = 0; i < dn_list->count; i++) {
646                 struct ldb_dn *dn;
647                 int ret;
648
649                 ares = talloc_zero(ac, struct ldb_reply);
650                 if (!ares) {
651                         handle->status = LDB_ERR_OPERATIONS_ERROR;
652                         handle->state = LDB_ASYNC_DONE;
653                         return LDB_ERR_OPERATIONS_ERROR;
654                 }
655
656                 ares->message = ldb_msg_new(ares);
657                 if (!ares->message) {
658                         handle->status = LDB_ERR_OPERATIONS_ERROR;
659                         handle->state = LDB_ASYNC_DONE;
660                         talloc_free(ares);
661                         return LDB_ERR_OPERATIONS_ERROR;
662                 }
663
664
665                 dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
666                 if (dn == NULL) {
667                         talloc_free(ares);
668                         return LDB_ERR_OPERATIONS_ERROR;
669                 }
670
671                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
672                 talloc_free(dn);
673                 if (ret == 0) {
674                         /* the record has disappeared? yes, this can happen */
675                         talloc_free(ares);
676                         continue;
677                 }
678
679                 if (ret == -1) {
680                         /* an internal error */
681                         talloc_free(ares);
682                         return LDB_ERR_OPERATIONS_ERROR;
683                 }
684
685                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
686                         talloc_free(ares);
687                         continue;
688                 }
689
690                 /* filter the attributes that the user wants */
691                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
692
693                 if (ret == -1) {
694                         handle->status = LDB_ERR_OPERATIONS_ERROR;
695                         handle->state = LDB_ASYNC_DONE;
696                         talloc_free(ares);
697                         return LDB_ERR_OPERATIONS_ERROR;
698                 }
699
700                 ares->type = LDB_REPLY_ENTRY;
701                 handle->state = LDB_ASYNC_PENDING;
702                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
703
704                 if (handle->status != LDB_SUCCESS) {
705                         handle->state = LDB_ASYNC_DONE;
706                         return handle->status;
707                 }
708         }
709
710         return LDB_SUCCESS;
711 }
712
713 /*
714   search the database with a LDAP-like expression using indexes
715   returns -1 if an indexed search is not possible, in which
716   case the caller should call ltdb_search_full() 
717 */
718 int ltdb_search_indexed(struct ldb_handle *handle)
719 {
720         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
721         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
722         struct dn_list *dn_list;
723         int ret;
724
725         if (ltdb->cache->indexlist->num_elements == 0 && 
726             ac->scope != LDB_SCOPE_BASE) {
727                 /* no index list? must do full search */
728                 return -1;
729         }
730
731         dn_list = talloc(handle, struct dn_list);
732         if (dn_list == NULL) {
733                 return -1;
734         }
735
736         if (ac->scope == LDB_SCOPE_BASE) {
737                 /* with BASE searches only one DN can match */
738                 dn_list->dn = talloc_array(dn_list, char *, 1);
739                 if (dn_list->dn == NULL) {
740                         ldb_oom(ac->module->ldb);
741                         return -1;
742                 }
743                 dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
744                 if (dn_list->dn[0] == NULL) {
745                         ldb_oom(ac->module->ldb);
746                         return -1;
747                 }
748                 dn_list->count = 1;
749                 ret = 1;
750         } else {
751                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
752         }
753
754         if (ret == 1) {
755                 /* we've got a candidate list - now filter by the full tree
756                    and extract the needed attributes */
757                 ret = ltdb_index_filter(dn_list, handle);
758                 handle->status = ret;
759                 handle->state = LDB_ASYNC_DONE;
760         }
761
762         talloc_free(dn_list);
763
764         return ret;
765 }
766
767 /*
768   add a index element where this is the first indexed DN for this value
769 */
770 static int ltdb_index_add1_new(struct ldb_context *ldb, 
771                                struct ldb_message *msg,
772                                struct ldb_message_element *el,
773                                const char *dn)
774 {
775         struct ldb_message_element *el2;
776
777         /* add another entry */
778         el2 = talloc_realloc(msg, msg->elements, 
779                                struct ldb_message_element, msg->num_elements+1);
780         if (!el2) {
781                 return -1;
782         }
783
784         msg->elements = el2;
785         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
786         if (!msg->elements[msg->num_elements].name) {
787                 return -1;
788         }
789         msg->elements[msg->num_elements].num_values = 0;
790         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
791         if (!msg->elements[msg->num_elements].values) {
792                 return -1;
793         }
794         msg->elements[msg->num_elements].values[0].length = strlen(dn);
795         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
796         msg->elements[msg->num_elements].num_values = 1;
797         msg->num_elements++;
798
799         return 0;
800 }
801
802
803 /*
804   add a index element where this is not the first indexed DN for this
805   value
806 */
807 static int ltdb_index_add1_add(struct ldb_context *ldb, 
808                                struct ldb_message *msg,
809                                struct ldb_message_element *el,
810                                int idx,
811                                const char *dn)
812 {
813         struct ldb_val *v2;
814         unsigned int i;
815
816         /* for multi-valued attributes we can end up with repeats */
817         for (i=0;i<msg->elements[idx].num_values;i++) {
818                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
819                         return 0;
820                 }
821         }
822
823         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
824                               struct ldb_val, 
825                               msg->elements[idx].num_values+1);
826         if (!v2) {
827                 return -1;
828         }
829         msg->elements[idx].values = v2;
830
831         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
832         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
833         msg->elements[idx].num_values++;
834
835         return 0;
836 }
837
838 /*
839   add an index entry for one message element
840 */
841 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
842                            struct ldb_message_element *el, int v_idx)
843 {
844         struct ldb_context *ldb = module->ldb;
845         struct ldb_message *msg;
846         struct ldb_dn *dn_key;
847         int ret;
848         unsigned int i;
849
850         msg = talloc(module, struct ldb_message);
851         if (msg == NULL) {
852                 errno = ENOMEM;
853                 return -1;
854         }
855
856         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
857         if (!dn_key) {
858                 talloc_free(msg);
859                 errno = ENOMEM;
860                 return -1;
861         }
862         talloc_steal(msg, dn_key);
863
864         ret = ltdb_search_dn1(module, dn_key, msg);
865         if (ret == -1) {
866                 talloc_free(msg);
867                 return -1;
868         }
869
870         if (ret == 0) {
871                 msg->dn = dn_key;
872                 msg->num_elements = 0;
873                 msg->elements = NULL;
874         }
875
876         for (i=0;i<msg->num_elements;i++) {
877                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
878                         break;
879                 }
880         }
881
882         if (i == msg->num_elements) {
883                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
884         } else {
885                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
886         }
887
888         if (ret == 0) {
889                 ret = ltdb_store(module, msg, TDB_REPLACE);
890         }
891
892         talloc_free(msg);
893
894         return ret;
895 }
896
897 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
898                            struct ldb_message_element *elements, int num_el)
899 {
900         struct ltdb_private *ltdb = module->private_data;
901         int ret;
902         unsigned int i, j;
903
904         if (dn[0] == '@') {
905                 return 0;
906         }
907
908         if (ltdb->cache->indexlist->num_elements == 0) {
909                 /* no indexed fields */
910                 return 0;
911         }
912
913         for (i = 0; i < num_el; i++) {
914                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
915                                        NULL, LTDB_IDXATTR);
916                 if (ret == -1) {
917                         continue;
918                 }
919                 for (j = 0; j < elements[i].num_values; j++) {
920                         ret = ltdb_index_add1(module, dn, &elements[i], j);
921                         if (ret == -1) {
922                                 return -1;
923                         }
924                 }
925         }
926
927         return 0;
928 }
929
930 /*
931   add the index entries for a new record
932   return -1 on failure
933 */
934 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
935 {
936         struct ltdb_private *ltdb = module->private_data;
937         char *dn;
938         int ret;
939
940         dn = ldb_dn_linearize(ltdb, msg->dn);
941         if (dn == NULL) {
942                 return -1;
943         }
944
945         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
946
947         talloc_free(dn);
948
949         return ret;
950 }
951
952
953 /*
954   delete an index entry for one message element
955 */
956 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
957                          struct ldb_message_element *el, int v_idx)
958 {
959         struct ldb_context *ldb = module->ldb;
960         struct ldb_message *msg;
961         struct ldb_dn *dn_key;
962         int ret, i;
963         unsigned int j;
964
965         if (dn[0] == '@') {
966                 return 0;
967         }
968
969         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
970         if (!dn_key) {
971                 return -1;
972         }
973
974         msg = talloc(dn_key, struct ldb_message);
975         if (msg == NULL) {
976                 talloc_free(dn_key);
977                 return -1;
978         }
979
980         ret = ltdb_search_dn1(module, dn_key, msg);
981         if (ret == -1) {
982                 talloc_free(dn_key);
983                 return -1;
984         }
985
986         if (ret == 0) {
987                 /* it wasn't indexed. Did we have an earlier error? If we did then
988                    its gone now */
989                 talloc_free(dn_key);
990                 return 0;
991         }
992
993         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
994         if (i == -1) {
995                 ldb_debug(ldb, LDB_DEBUG_ERROR,
996                                 "ERROR: dn %s not found in %s\n", dn,
997                                 ldb_dn_linearize(dn_key, dn_key));
998                 /* it ain't there. hmmm */
999                 talloc_free(dn_key);
1000                 return 0;
1001         }
1002
1003         if (j != msg->elements[i].num_values - 1) {
1004                 memmove(&msg->elements[i].values[j], 
1005                         &msg->elements[i].values[j+1], 
1006                         (msg->elements[i].num_values-(j+1)) * 
1007                         sizeof(msg->elements[i].values[0]));
1008         }
1009         msg->elements[i].num_values--;
1010
1011         if (msg->elements[i].num_values == 0) {
1012                 ret = ltdb_delete_noindex(module, dn_key);
1013         } else {
1014                 ret = ltdb_store(module, msg, TDB_REPLACE);
1015         }
1016
1017         talloc_free(dn_key);
1018
1019         return ret;
1020 }
1021
1022 /*
1023   delete the index entries for a record
1024   return -1 on failure
1025 */
1026 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1027 {
1028         struct ltdb_private *ltdb = module->private_data;
1029         int ret;
1030         char *dn;
1031         unsigned int i, j;
1032
1033         if (ldb_dn_is_special(msg->dn)) {
1034                 return 0;
1035         }
1036
1037         dn = ldb_dn_linearize(ltdb, msg->dn);
1038         if (dn == NULL) {
1039                 return -1;
1040         }
1041
1042         /* find the list of indexed fields */   
1043         if (ltdb->cache->indexlist->num_elements == 0) {
1044                 /* no indexed fields */
1045                 return 0;
1046         }
1047
1048         for (i = 0; i < msg->num_elements; i++) {
1049                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1050                                        NULL, LTDB_IDXATTR);
1051                 if (ret == -1) {
1052                         continue;
1053                 }
1054                 for (j = 0; j < msg->elements[i].num_values; j++) {
1055                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1056                         if (ret == -1) {
1057                                 talloc_free(dn);
1058                                 return -1;
1059                         }
1060                 }
1061         }
1062
1063         talloc_free(dn);
1064         return 0;
1065 }
1066
1067
1068 /*
1069   traversal function that deletes all @INDEX records
1070 */
1071 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1072 {
1073         const char *dn = "DN=" LTDB_INDEX ":";
1074         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1075                 return tdb_delete(tdb, key);
1076         }
1077         return 0;
1078 }
1079
1080 /*
1081   traversal function that adds @INDEX records during a re index
1082 */
1083 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1084 {
1085         struct ldb_module *module = state;
1086         struct ldb_message *msg;
1087         char *dn = NULL;
1088         int ret;
1089         TDB_DATA key2;
1090
1091         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1092             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1093                 return 0;
1094         }
1095
1096         msg = talloc(module, struct ldb_message);
1097         if (msg == NULL) {
1098                 return -1;
1099         }
1100
1101         ret = ltdb_unpack_data(module, &data, msg);
1102         if (ret != 0) {
1103                 talloc_free(msg);
1104                 return -1;
1105         }
1106
1107         /* check if the DN key has changed, perhaps due to the 
1108            case insensitivity of an element changing */
1109         key2 = ltdb_key(module, msg->dn);
1110         if (key2.dptr == NULL) {
1111                 /* probably a corrupt record ... darn */
1112                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1113                                                         ldb_dn_linearize(msg, msg->dn));
1114                 talloc_free(msg);
1115                 return 0;
1116         }
1117         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1118                 tdb_delete(tdb, key);
1119                 tdb_store(tdb, key2, data, 0);
1120         }
1121         talloc_free(key2.dptr);
1122
1123         if (msg->dn == NULL) {
1124                 dn = (char *)key.dptr + 3;
1125         } else {
1126                 dn = ldb_dn_linearize(msg->dn, msg->dn);
1127         }
1128
1129         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1130
1131         talloc_free(msg);
1132
1133         return ret;
1134 }
1135
1136 /*
1137   force a complete reindex of the database
1138 */
1139 int ltdb_reindex(struct ldb_module *module)
1140 {
1141         struct ltdb_private *ltdb = module->private_data;
1142         int ret;
1143
1144         if (ltdb_cache_reload(module) != 0) {
1145                 return -1;
1146         }
1147
1148         /* first traverse the database deleting any @INDEX records */
1149         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1150         if (ret == -1) {
1151                 return -1;
1152         }
1153
1154         /* now traverse adding any indexes for normal LDB records */
1155         ret = tdb_traverse(ltdb->tdb, re_index, module);
1156         if (ret == -1) {
1157                 return -1;
1158         }
1159
1160         return 0;
1161 }