r14431: don't call qsort with a null array
[abartlet/samba.git/.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
65                 if (r == 0) {
66                         /* scan back for first element */
67                         while (test_i > 0 &&
68                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
69                                 test_i--;
70                         }
71                         return test_i;
72                 }
73                 if (r < 0) {
74                         if (test_i == 0) {
75                                 return -1;
76                         }
77                         max_i = test_i - 1;
78                 }
79                 if (r > 0) {
80                         min_i = test_i + 1;
81                 }
82         }
83
84         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
85                 return min_i;
86         }
87
88         return -1;
89 }
90
91 struct dn_list {
92         unsigned int count;
93         char **dn;
94 };
95
96 /*
97   return the dn key to be used for an index
98   caller frees
99 */
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101                         const char *attr, const struct ldb_val *value)
102 {
103         struct ldb_dn *ret;
104         char *dn;
105         struct ldb_val v;
106         const struct ldb_attrib_handler *h;
107         char *attr_folded;
108
109         attr_folded = ldb_attr_casefold(ldb, attr);
110         if (!attr_folded) {
111                 return NULL;
112         }
113
114         h = ldb_attrib_handler(ldb, attr);
115         if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116                 /* canonicalisation can be refused. For example, 
117                    a attribute that takes wildcards will refuse to canonicalise
118                    if the value contains a wildcard */
119                 talloc_free(attr_folded);
120                 return NULL;
121         }
122         if (ldb_should_b64_encode(&v)) {
123                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
124                 if (!vstr) return NULL;
125                 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
126                 talloc_free(vstr);
127                 if (v.data != value->data) {
128                         talloc_free(v.data);
129                 }
130                 talloc_free(attr_folded);
131                 if (dn == NULL) return NULL;
132                 goto done;
133         }
134
135         dn = talloc_asprintf(ldb, "%s:%s:%.*s", 
136                               LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
137
138         if (v.data != value->data) {
139                 talloc_free(v.data);
140         }
141         talloc_free(attr_folded);
142
143 done:
144         ret = ldb_dn_explode(ldb, dn);
145         talloc_free(dn);
146         return ret;
147 }
148
149 /*
150   see if a attribute value is in the list of indexed attributes
151 */
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153                             unsigned int *v_idx, const char *key)
154 {
155         unsigned int i, j;
156         for (i=0;i<msg->num_elements;i++) {
157                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158                         const struct ldb_message_element *el = 
159                                 &msg->elements[i];
160                         for (j=0;j<el->num_values;j++) {
161                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
162                                         if (v_idx) {
163                                                 *v_idx = j;
164                                         }
165                                         return i;
166                                 }
167                         }
168                 }
169         }
170         return -1;
171 }
172
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
175 {
176         return strcmp(*s1, *s2);
177 }
178
179 /*
180   return a list of dn's that might match a simple indexed search or
181  */
182 static int ltdb_index_dn_simple(struct ldb_module *module, 
183                                 struct ldb_parse_tree *tree,
184                                 const struct ldb_message *index_list,
185                                 struct dn_list *list)
186 {
187         struct ldb_context *ldb = module->ldb;
188         struct ldb_dn *dn;
189         int ret;
190         unsigned int i, j;
191         struct ldb_message *msg;
192
193         list->count = 0;
194         list->dn = NULL;
195
196         /* if the attribute isn't in the list of indexed attributes then
197            this node needs a full search */
198         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
199                 return -1;
200         }
201
202         /* the attribute is indexed. Pull the list of DNs that match the 
203            search criterion */
204         dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
205         if (!dn) return -1;
206
207         msg = talloc(list, struct ldb_message);
208         if (msg == NULL) {
209                 return -1;
210         }
211
212         ret = ltdb_search_dn1(module, dn, msg);
213         talloc_free(dn);
214         if (ret == 0 || ret == -1) {
215                 return ret;
216         }
217
218         for (i=0;i<msg->num_elements;i++) {
219                 struct ldb_message_element *el;
220
221                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
222                         continue;
223                 }
224
225                 el = &msg->elements[i];
226
227                 list->dn = talloc_array(list, char *, el->num_values);
228                 if (!list->dn) {
229                         break;          
230                 }
231
232                 for (j=0;j<el->num_values;j++) {
233                         list->dn[list->count] = 
234                                 talloc_strdup(list->dn, (char *)el->values[j].data);
235                         if (!list->dn[list->count]) {
236                                 return -1;
237                         }
238                         list->count++;
239                 }
240         }
241
242         talloc_free(msg);
243
244         if (list->count > 1) {
245                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
246         }
247
248         return 1;
249 }
250
251
252 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
253
254 /*
255   return a list of dn's that might match a simple indexed search on
256   the special objectclass attribute
257  */
258 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
259                                      struct ldb_parse_tree *tree,
260                                      const struct ldb_message *index_list,
261                                      struct dn_list *list)
262 {
263         struct ldb_context *ldb = module->ldb;
264         unsigned int i;
265         int ret;
266         const char *target = (const char *)tree->u.equality.value.data;
267         const char **subclasses;
268
269         list->count = 0;
270         list->dn = NULL;
271
272         ret = ltdb_index_dn_simple(module, tree, index_list, list);
273
274         subclasses = ldb_subclass_list(module->ldb, target);
275
276         if (subclasses == NULL) {
277                 return ret;
278         }
279
280         for (i=0;subclasses[i];i++) {
281                 struct ldb_parse_tree tree2;
282                 struct dn_list *list2;
283                 tree2.operation = LDB_OP_EQUALITY;
284                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
285                 if (!tree2.u.equality.attr) {
286                         return -1;
287                 }
288                 tree2.u.equality.value.data = 
289                         (uint8_t *)talloc_strdup(list, subclasses[i]);
290                 if (tree2.u.equality.value.data == NULL) {
291                         return -1;                      
292                 }
293                 tree2.u.equality.value.length = strlen(subclasses[i]);
294                 list2 = talloc(list, struct dn_list);
295                 if (list2 == NULL) {
296                         talloc_free(tree2.u.equality.value.data);
297                         return -1;
298                 }
299                 if (ltdb_index_dn_objectclass(module, &tree2, 
300                                               index_list, list2) == 1) {
301                         if (list->count == 0) {
302                                 *list = *list2;
303                                 ret = 1;
304                         } else {
305                                 list_union(ldb, list, list2);
306                                 talloc_free(list2);
307                         }
308                 }
309                 talloc_free(tree2.u.equality.value.data);
310         }
311
312         return ret;
313 }
314
315 /*
316   return a list of dn's that might match a leaf indexed search
317  */
318 static int ltdb_index_dn_leaf(struct ldb_module *module, 
319                               struct ldb_parse_tree *tree,
320                               const struct ldb_message *index_list,
321                               struct dn_list *list)
322 {
323         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
324                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
325         }
326         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
327                 list->dn = talloc_array(list, char *, 1);
328                 if (list->dn == NULL) {
329                         ldb_oom(module->ldb);
330                         return -1;
331                 }
332                 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
333                 if (list->dn[0] == NULL) {
334                         ldb_oom(module->ldb);
335                         return -1;
336                 }
337                 list->count = 1;
338                 return 1;
339         }
340         return ltdb_index_dn_simple(module, tree, index_list, list);
341 }
342
343
344 /*
345   list intersection
346   list = list & list2
347   relies on the lists being sorted
348 */
349 static int list_intersect(struct ldb_context *ldb,
350                           struct dn_list *list, const struct dn_list *list2)
351 {
352         struct dn_list *list3;
353         unsigned int i;
354
355         if (list->count == 0 || list2->count == 0) {
356                 /* 0 & X == 0 */
357                 return 0;
358         }
359
360         list3 = talloc(ldb, struct dn_list);
361         if (list3 == NULL) {
362                 return -1;
363         }
364
365         list3->dn = talloc_array(list3, char *, list->count);
366         if (!list3->dn) {
367                 talloc_free(list3);
368                 return -1;
369         }
370         list3->count = 0;
371
372         for (i=0;i<list->count;i++) {
373                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
374                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
375                         list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
376                         list3->count++;
377                 } else {
378                         talloc_free(list->dn[i]);
379                 }               
380         }
381
382         talloc_free(list->dn);
383         list->dn = talloc_steal(list, list3->dn);
384         list->count = list3->count;
385         talloc_free(list3);
386
387         return 0;
388 }
389
390
391 /*
392   list union
393   list = list | list2
394   relies on the lists being sorted
395 */
396 static int list_union(struct ldb_context *ldb, 
397                       struct dn_list *list, const struct dn_list *list2)
398 {
399         unsigned int i;
400         char **d;
401         unsigned int count = list->count;
402
403         if (list->count == 0 && list2->count == 0) {
404                 /* 0 | 0 == 0 */
405                 return 0;
406         }
407
408         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
409         if (!d) {
410                 return -1;
411         }
412         list->dn = d;
413
414         for (i=0;i<list2->count;i++) {
415                 if (ldb_list_find(list2->dn[i], list->dn, count, 
416                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
417                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
418                         if (!list->dn[list->count]) {
419                                 return -1;
420                         }
421                         list->count++;
422                 }               
423         }
424
425         if (list->count != count) {
426                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
427         }
428
429         return 0;
430 }
431
432 static int ltdb_index_dn(struct ldb_module *module, 
433                          struct ldb_parse_tree *tree,
434                          const struct ldb_message *index_list,
435                          struct dn_list *list);
436
437
438 /*
439   OR two index results
440  */
441 static int ltdb_index_dn_or(struct ldb_module *module, 
442                             struct ldb_parse_tree *tree,
443                             const struct ldb_message *index_list,
444                             struct dn_list *list)
445 {
446         struct ldb_context *ldb = module->ldb;
447         unsigned int i;
448         int ret;
449         
450         ret = -1;
451         list->dn = NULL;
452         list->count = 0;
453
454         for (i=0;i<tree->u.list.num_elements;i++) {
455                 struct dn_list *list2;
456                 int v;
457
458                 list2 = talloc(module, struct dn_list);
459                 if (list2 == NULL) {
460                         return -1;
461                 }
462
463                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
464
465                 if (v == 0) {
466                         /* 0 || X == X */
467                         if (ret == -1) {
468                                 ret = 0;
469                         }
470                         talloc_free(list2);
471                         continue;
472                 }
473
474                 if (v == -1) {
475                         /* 1 || X == 1 */
476                         talloc_free(list->dn);
477                         talloc_free(list2);
478                         return -1;
479                 }
480
481                 if (ret == -1) {
482                         ret = 1;
483                         list->dn = talloc_steal(list, list2->dn);
484                         list->count = list2->count;
485                 } else {
486                         if (list_union(ldb, list, list2) == -1) {
487                                 talloc_free(list2);
488                                 return -1;
489                         }
490                         ret = 1;
491                 }
492                 talloc_free(list2);
493         }
494
495         if (list->count == 0) {
496                 return 0;
497         }
498
499         return ret;
500 }
501
502
503 /*
504   NOT an index results
505  */
506 static int ltdb_index_dn_not(struct ldb_module *module, 
507                              struct ldb_parse_tree *tree,
508                              const struct ldb_message *index_list,
509                              struct dn_list *list)
510 {
511         /* the only way to do an indexed not would be if we could
512            negate the not via another not or if we knew the total
513            number of database elements so we could know that the
514            existing expression covered the whole database. 
515            
516            instead, we just give up, and rely on a full index scan
517            (unless an outer & manages to reduce the list)
518         */
519         return -1;
520 }
521
522 /*
523   AND two index results
524  */
525 static int ltdb_index_dn_and(struct ldb_module *module, 
526                              struct ldb_parse_tree *tree,
527                              const struct ldb_message *index_list,
528                              struct dn_list *list)
529 {
530         struct ldb_context *ldb = module->ldb;
531         unsigned int i;
532         int ret;
533         
534         ret = -1;
535         list->dn = NULL;
536         list->count = 0;
537
538         for (i=0;i<tree->u.list.num_elements;i++) {
539                 struct dn_list *list2;
540                 int v;
541
542                 list2 = talloc(module, struct dn_list);
543                 if (list2 == NULL) {
544                         return -1;
545                 }
546
547                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
548
549                 if (v == 0) {
550                         /* 0 && X == 0 */
551                         talloc_free(list->dn);
552                         talloc_free(list2);
553                         return 0;
554                 }
555
556                 if (v == -1) {
557                         talloc_free(list2);
558                         continue;
559                 }
560
561                 if (ret == -1) {
562                         ret = 1;
563                         talloc_free(list->dn);
564                         list->dn = talloc_steal(list, list2->dn);
565                         list->count = list2->count;
566                 } else {
567                         if (list_intersect(ldb, list, list2) == -1) {
568                                 talloc_free(list2);
569                                 return -1;
570                         }
571                 }
572
573                 talloc_free(list2);
574
575                 if (list->count == 0) {
576                         talloc_free(list->dn);
577                         return 0;
578                 }
579         }
580
581         return ret;
582 }
583
584 /*
585   return a list of dn's that might match a indexed search or
586   -1 if an error. return 0 for no matches, or 1 for matches
587  */
588 static int ltdb_index_dn(struct ldb_module *module, 
589                          struct ldb_parse_tree *tree,
590                          const struct ldb_message *index_list,
591                          struct dn_list *list)
592 {
593         int ret = -1;
594
595         switch (tree->operation) {
596         case LDB_OP_AND:
597                 ret = ltdb_index_dn_and(module, tree, index_list, list);
598                 break;
599
600         case LDB_OP_OR:
601                 ret = ltdb_index_dn_or(module, tree, index_list, list);
602                 break;
603
604         case LDB_OP_NOT:
605                 ret = ltdb_index_dn_not(module, tree, index_list, list);
606                 break;
607
608         case LDB_OP_EQUALITY:
609                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
610                 break;
611
612         case LDB_OP_SUBSTRING:
613         case LDB_OP_GREATER:
614         case LDB_OP_LESS:
615         case LDB_OP_PRESENT:
616         case LDB_OP_APPROX:
617         case LDB_OP_EXTENDED:
618                 /* we can't index with fancy bitops yet */
619                 ret = -1;
620                 break;
621         }
622
623         return ret;
624 }
625
626 /*
627   filter a candidate dn_list from an indexed search into a set of results
628   extracting just the given attributes
629 */
630 static int ltdb_index_filter(const struct dn_list *dn_list, 
631                              struct ldb_async_handle *handle)
632 {
633         struct ltdb_async_context *ac = talloc_get_type(handle->private_data, struct ltdb_async_context);
634         struct ldb_async_result *ares = NULL;
635         unsigned int i;
636
637         for (i = 0; i < dn_list->count; i++) {
638                 struct ldb_dn *dn;
639                 int ret;
640
641                 ares = talloc_zero(ac, struct ldb_async_result);
642                 if (!ares) {
643                         handle->status = LDB_ERR_OPERATIONS_ERROR;
644                         handle->state = LDB_ASYNC_DONE;
645                         return LDB_ERR_OPERATIONS_ERROR;
646                 }
647
648                 ares->message = ldb_msg_new(ares);
649                 if (!ares->message) {
650                         handle->status = LDB_ERR_OPERATIONS_ERROR;
651                         handle->state = LDB_ASYNC_DONE;
652                         talloc_free(ares);
653                         return LDB_ERR_OPERATIONS_ERROR;
654                 }
655
656
657                 dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
658                 if (dn == NULL) {
659                         talloc_free(ares);
660                         return LDB_ERR_OPERATIONS_ERROR;
661                 }
662
663                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
664                 talloc_free(dn);
665                 if (ret == 0) {
666                         /* the record has disappeared? yes, this can happen */
667                         talloc_free(ares);
668                         continue;
669                 }
670
671                 if (ret == -1) {
672                         /* an internal error */
673                         talloc_free(ares);
674                         return LDB_ERR_OPERATIONS_ERROR;
675                 }
676
677                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
678                         talloc_free(ares);
679                         continue;
680                 }
681
682                 /* filter the attributes that the user wants */
683                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
684
685                 if (ret == -1) {
686                         handle->status = LDB_ERR_OPERATIONS_ERROR;
687                         handle->state = LDB_ASYNC_DONE;
688                         talloc_free(ares);
689                         return LDB_ERR_OPERATIONS_ERROR;
690                 }
691
692                 ares->type = LDB_REPLY_ENTRY;
693                 handle->state = LDB_ASYNC_PENDING;
694                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
695
696                 if (handle->status != LDB_SUCCESS) {
697                         handle->state = LDB_ASYNC_DONE;
698                         return handle->status;
699                 }
700         }
701
702         return LDB_SUCCESS;
703 }
704
705 /*
706   search the database with a LDAP-like expression using indexes
707   returns -1 if an indexed search is not possible, in which
708   case the caller should call ltdb_search_full() 
709 */
710 int ltdb_search_indexed(struct ldb_async_handle *handle)
711 {
712         struct ltdb_async_context *ac = talloc_get_type(handle->private_data, struct ltdb_async_context);
713         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
714         struct dn_list *dn_list;
715         int ret;
716
717         if (ltdb->cache->indexlist->num_elements == 0 && 
718             ac->scope != LDB_SCOPE_BASE) {
719                 /* no index list? must do full search */
720                 return -1;
721         }
722
723         dn_list = talloc(handle, struct dn_list);
724         if (dn_list == NULL) {
725                 return -1;
726         }
727
728         if (ac->scope == LDB_SCOPE_BASE) {
729                 /* with BASE searches only one DN can match */
730                 dn_list->dn = talloc_array(dn_list, char *, 1);
731                 if (dn_list->dn == NULL) {
732                         ldb_oom(ac->module->ldb);
733                         return -1;
734                 }
735                 dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
736                 if (dn_list->dn[0] == NULL) {
737                         ldb_oom(ac->module->ldb);
738                         return -1;
739                 }
740                 dn_list->count = 1;
741                 ret = 1;
742         } else {
743                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
744         }
745
746         if (ret == 1) {
747                 /* we've got a candidate list - now filter by the full tree
748                    and extract the needed attributes */
749                 ret = ltdb_index_filter(dn_list, handle);
750                 handle->status = ret;
751                 handle->state = LDB_ASYNC_DONE;
752         }
753
754         talloc_free(dn_list);
755
756         return ret;
757 }
758
759 /*
760   add a index element where this is the first indexed DN for this value
761 */
762 static int ltdb_index_add1_new(struct ldb_context *ldb, 
763                                struct ldb_message *msg,
764                                struct ldb_message_element *el,
765                                char *dn)
766 {
767         struct ldb_message_element *el2;
768
769         /* add another entry */
770         el2 = talloc_realloc(msg, msg->elements, 
771                                struct ldb_message_element, msg->num_elements+1);
772         if (!el2) {
773                 return -1;
774         }
775
776         msg->elements = el2;
777         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
778         if (!msg->elements[msg->num_elements].name) {
779                 return -1;
780         }
781         msg->elements[msg->num_elements].num_values = 0;
782         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
783         if (!msg->elements[msg->num_elements].values) {
784                 return -1;
785         }
786         msg->elements[msg->num_elements].values[0].length = strlen(dn);
787         msg->elements[msg->num_elements].values[0].data = (uint8_t *)dn;
788         msg->elements[msg->num_elements].num_values = 1;
789         msg->num_elements++;
790
791         return 0;
792 }
793
794
795 /*
796   add a index element where this is not the first indexed DN for this
797   value
798 */
799 static int ltdb_index_add1_add(struct ldb_context *ldb, 
800                                struct ldb_message *msg,
801                                struct ldb_message_element *el,
802                                int idx,
803                                char *dn)
804 {
805         struct ldb_val *v2;
806         unsigned int i;
807
808         /* for multi-valued attributes we can end up with repeats */
809         for (i=0;i<msg->elements[idx].num_values;i++) {
810                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
811                         return 0;
812                 }
813         }
814
815         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
816                               struct ldb_val, 
817                               msg->elements[idx].num_values+1);
818         if (!v2) {
819                 return -1;
820         }
821         msg->elements[idx].values = v2;
822
823         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
824         msg->elements[idx].values[msg->elements[idx].num_values].data = (uint8_t *)dn;
825         msg->elements[idx].num_values++;
826
827         return 0;
828 }
829
830 /*
831   add an index entry for one message element
832 */
833 static int ltdb_index_add1(struct ldb_module *module, char *dn, 
834                            struct ldb_message_element *el, int v_idx)
835 {
836         struct ldb_context *ldb = module->ldb;
837         struct ldb_message *msg;
838         struct ldb_dn *dn_key;
839         int ret;
840         unsigned int i;
841
842         msg = talloc(module, struct ldb_message);
843         if (msg == NULL) {
844                 errno = ENOMEM;
845                 return -1;
846         }
847
848         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
849         if (!dn_key) {
850                 talloc_free(msg);
851                 errno = ENOMEM;
852                 return -1;
853         }
854         talloc_steal(msg, dn_key);
855
856         ret = ltdb_search_dn1(module, dn_key, msg);
857         if (ret == -1) {
858                 talloc_free(msg);
859                 return -1;
860         }
861
862         if (ret == 0) {
863                 msg->dn = dn_key;
864                 msg->num_elements = 0;
865                 msg->elements = NULL;
866         }
867
868         for (i=0;i<msg->num_elements;i++) {
869                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
870                         break;
871                 }
872         }
873
874         if (i == msg->num_elements) {
875                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
876         } else {
877                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
878         }
879
880         if (ret == 0) {
881                 ret = ltdb_store(module, msg, TDB_REPLACE);
882         }
883
884         talloc_free(msg);
885
886         return ret;
887 }
888
889 static int ltdb_index_add0(struct ldb_module *module, char *dn,
890                            struct ldb_message_element *elements, int num_el)
891 {
892         struct ltdb_private *ltdb = module->private_data;
893         int ret;
894         unsigned int i, j;
895
896         if (dn[0] == '@') {
897                 return 0;
898         }
899
900         if (ltdb->cache->indexlist->num_elements == 0) {
901                 /* no indexed fields */
902                 return 0;
903         }
904
905         for (i = 0; i < num_el; i++) {
906                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
907                                        NULL, LTDB_IDXATTR);
908                 if (ret == -1) {
909                         continue;
910                 }
911                 for (j = 0; j < elements[i].num_values; j++) {
912                         ret = ltdb_index_add1(module, dn, &elements[i], j);
913                         if (ret == -1) {
914                                 talloc_free(dn);
915                                 return -1;
916                         }
917                 }
918         }
919
920         return 0;
921 }
922
923 /*
924   add the index entries for a new record
925   return -1 on failure
926 */
927 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
928 {
929         struct ltdb_private *ltdb = module->private_data;
930         char *dn;
931         int ret;
932
933         dn = ldb_dn_linearize(ltdb, msg->dn);
934         if (dn == NULL) {
935                 return -1;
936         }
937
938         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
939
940         talloc_free(dn);
941
942         return ret;
943 }
944
945
946 /*
947   delete an index entry for one message element
948 */
949 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
950                          struct ldb_message_element *el, int v_idx)
951 {
952         struct ldb_context *ldb = module->ldb;
953         struct ldb_message *msg;
954         struct ldb_dn *dn_key;
955         int ret, i;
956         unsigned int j;
957
958         if (dn[0] == '@') {
959                 return 0;
960         }
961
962         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
963         if (!dn_key) {
964                 return -1;
965         }
966
967         msg = talloc(dn_key, struct ldb_message);
968         if (msg == NULL) {
969                 talloc_free(dn_key);
970                 return -1;
971         }
972
973         ret = ltdb_search_dn1(module, dn_key, msg);
974         if (ret == -1) {
975                 talloc_free(dn_key);
976                 return -1;
977         }
978
979         if (ret == 0) {
980                 /* it wasn't indexed. Did we have an earlier error? If we did then
981                    its gone now */
982                 talloc_free(dn_key);
983                 return 0;
984         }
985
986         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
987         if (i == -1) {
988                 ldb_debug(ldb, LDB_DEBUG_ERROR,
989                                 "ERROR: dn %s not found in %s\n", dn,
990                                 ldb_dn_linearize(dn_key, dn_key));
991                 /* it ain't there. hmmm */
992                 talloc_free(dn_key);
993                 return 0;
994         }
995
996         if (j != msg->elements[i].num_values - 1) {
997                 memmove(&msg->elements[i].values[j], 
998                         &msg->elements[i].values[j+1], 
999                         (msg->elements[i].num_values-(j+1)) * 
1000                         sizeof(msg->elements[i].values[0]));
1001         }
1002         msg->elements[i].num_values--;
1003
1004         if (msg->elements[i].num_values == 0) {
1005                 ret = ltdb_delete_noindex(module, dn_key);
1006         } else {
1007                 ret = ltdb_store(module, msg, TDB_REPLACE);
1008         }
1009
1010         talloc_free(dn_key);
1011
1012         return ret;
1013 }
1014
1015 /*
1016   delete the index entries for a record
1017   return -1 on failure
1018 */
1019 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1020 {
1021         struct ltdb_private *ltdb = module->private_data;
1022         int ret;
1023         char *dn;
1024         unsigned int i, j;
1025
1026         if (ldb_dn_is_special(msg->dn)) {
1027                 return 0;
1028         }
1029
1030         dn = ldb_dn_linearize(ltdb, msg->dn);
1031         if (dn == NULL) {
1032                 return -1;
1033         }
1034
1035         /* find the list of indexed fields */   
1036         if (ltdb->cache->indexlist->num_elements == 0) {
1037                 /* no indexed fields */
1038                 return 0;
1039         }
1040
1041         for (i = 0; i < msg->num_elements; i++) {
1042                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1043                                        NULL, LTDB_IDXATTR);
1044                 if (ret == -1) {
1045                         continue;
1046                 }
1047                 for (j = 0; j < msg->elements[i].num_values; j++) {
1048                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1049                         if (ret == -1) {
1050                                 talloc_free(dn);
1051                                 return -1;
1052                         }
1053                 }
1054         }
1055
1056         talloc_free(dn);
1057         return 0;
1058 }
1059
1060
1061 /*
1062   traversal function that deletes all @INDEX records
1063 */
1064 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1065 {
1066         const char *dn = "DN=" LTDB_INDEX ":";
1067         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1068                 return tdb_delete(tdb, key);
1069         }
1070         return 0;
1071 }
1072
1073 /*
1074   traversal function that adds @INDEX records during a re index
1075 */
1076 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1077 {
1078         struct ldb_module *module = state;
1079         struct ldb_message *msg;
1080         char *dn = NULL;
1081         int ret;
1082         TDB_DATA key2;
1083
1084         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1085             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1086                 return 0;
1087         }
1088
1089         msg = talloc(module, struct ldb_message);
1090         if (msg == NULL) {
1091                 return -1;
1092         }
1093
1094         ret = ltdb_unpack_data(module, &data, msg);
1095         if (ret != 0) {
1096                 talloc_free(msg);
1097                 return -1;
1098         }
1099
1100         /* check if the DN key has changed, perhaps due to the 
1101            case insensitivity of an element changing */
1102         key2 = ltdb_key(module, msg->dn);
1103         if (key2.dptr == NULL) {
1104                 /* probably a corrupt record ... darn */
1105                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1106                                                         ldb_dn_linearize(msg, msg->dn));
1107                 talloc_free(msg);
1108                 return 0;
1109         }
1110         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1111                 tdb_delete(tdb, key);
1112                 tdb_store(tdb, key2, data, 0);
1113         }
1114         talloc_free(key2.dptr);
1115
1116         if (msg->dn == NULL) {
1117                 dn = (char *)key.dptr + 3;
1118         } else {
1119                 dn = ldb_dn_linearize(msg->dn, msg->dn);
1120         }
1121
1122         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1123
1124         talloc_free(msg);
1125
1126         return ret;
1127 }
1128
1129 /*
1130   force a complete reindex of the database
1131 */
1132 int ltdb_reindex(struct ldb_module *module)
1133 {
1134         struct ltdb_private *ltdb = module->private_data;
1135         int ret;
1136
1137         if (ltdb_cache_reload(module) != 0) {
1138                 return -1;
1139         }
1140
1141         /* first traverse the database deleting any @INDEX records */
1142         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1143         if (ret == -1) {
1144                 return -1;
1145         }
1146
1147         /* now traverse adding any indexes for normal LDB records */
1148         ret = tdb_traverse(ltdb->tdb, re_index, module);
1149         if (ret == -1) {
1150                 return -1;
1151         }
1152
1153         return 0;
1154 }