4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurrence of needle if found
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
66 /* scan back for first element */
68 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
84 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
97 return the dn key to be used for an index
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101 const char *attr, const struct ldb_val *value)
106 const struct ldb_attrib_handler *h;
109 attr_folded = ldb_casefold(ldb, attr);
114 h = ldb_attrib_handler(ldb, attr);
115 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116 /* canonicalisation can be refused. For example,
117 an attribute that takes wildcards will refuse to canonicalise
118 if the value contains a wildcard */
119 talloc_free(attr_folded);
122 if (ldb_should_b64_encode(&v)) {
123 char *vstr = ldb_base64_encode(ldb, v.data, v.length);
124 if (!vstr) return NULL;
125 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
127 if (v.data != value->data) {
130 talloc_free(attr_folded);
131 if (dn == NULL) return NULL;
135 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
136 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
138 if (v.data != value->data) {
141 talloc_free(attr_folded);
144 ret = ldb_dn_explode(ldb, dn);
150 see if an attribute value is in the list of indexed attributes
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153 unsigned int *v_idx, const char *key)
156 for (i=0;i<msg->num_elements;i++) {
157 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158 const struct ldb_message_element *el =
160 for (j=0;j<el->num_values;j++) {
161 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
176 return strcmp(*s1, *s2);
180 return a list of dn's that might match a simple indexed search or
182 static int ltdb_index_dn_simple(struct ldb_module *module,
183 struct ldb_parse_tree *tree,
184 const struct ldb_message *index_list,
185 struct dn_list *list)
187 struct ldb_context *ldb = module->ldb;
191 struct ldb_message *msg;
196 /* if the attribute isn't in the list of indexed attributes then
197 this node needs a full search */
198 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
202 /* the attribute is indexed. Pull the list of DNs that match the
204 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
207 msg = talloc(list, struct ldb_message);
212 ret = ltdb_search_dn1(module, dn, msg);
214 if (ret == 0 || ret == -1) {
218 for (i=0;i<msg->num_elements;i++) {
219 struct ldb_message_element *el;
221 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
225 el = &msg->elements[i];
227 list->dn = talloc_array(list, char *, el->num_values);
232 for (j=0;j<el->num_values;j++) {
233 list->dn[list->count] =
234 talloc_strdup(list->dn, (char *)el->values[j].data);
235 if (!list->dn[list->count]) {
244 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
250 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
253 return a list of dn's that might match a simple indexed search on
254 the special objectclass attribute
256 static int ltdb_index_dn_objectclass(struct ldb_module *module,
257 struct ldb_parse_tree *tree,
258 const struct ldb_message *index_list,
259 struct dn_list *list)
261 struct ldb_context *ldb = module->ldb;
264 const char *target = tree->u.equality.value.data;
265 const char **subclasses;
270 ret = ltdb_index_dn_simple(module, tree, index_list, list);
272 subclasses = ldb_subclass_list(module->ldb, target);
274 if (subclasses == NULL) {
278 for (i=0;subclasses[i];i++) {
279 struct ldb_parse_tree tree2;
280 struct dn_list *list2;
281 tree2.operation = LDB_OP_EQUALITY;
282 tree2.u.equality.attr = talloc_strdup(list, LTDB_OBJECTCLASS);
283 if (!tree2.u.equality.attr) {
286 tree2.u.equality.value.data = talloc_strdup(tree2.u.equality.attr, subclasses[i]);
287 if (tree2.u.equality.value.data == NULL) {
290 tree2.u.equality.value.length = strlen(subclasses[i]);
291 list2 = talloc(list, struct dn_list);
295 if (ltdb_index_dn_objectclass(module, &tree2,
296 index_list, list2) == 1) {
297 if (list->count == 0) {
301 list_union(ldb, list, list2);
305 talloc_free(tree2.u.equality.attr);
312 return a list of dn's that might match a leaf indexed search
314 static int ltdb_index_dn_leaf(struct ldb_module *module,
315 struct ldb_parse_tree *tree,
316 const struct ldb_message *index_list,
317 struct dn_list *list)
319 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
320 return ltdb_index_dn_objectclass(module, tree, index_list, list);
322 return ltdb_index_dn_simple(module, tree, index_list, list);
329 relies on the lists being sorted
331 static int list_intersect(struct ldb_context *ldb,
332 struct dn_list *list, const struct dn_list *list2)
334 struct dn_list *list3;
337 if (list->count == 0 || list2->count == 0) {
342 list3 = talloc(ldb, struct dn_list);
347 list3->dn = talloc_array(list3, char *, list->count);
354 for (i=0;i<list->count;i++) {
355 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
356 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
357 list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
360 talloc_free(list->dn[i]);
364 talloc_free(list->dn);
365 list->dn = talloc_steal(list, list3->dn);
366 list->count = list3->count;
376 relies on the lists being sorted
378 static int list_union(struct ldb_context *ldb,
379 struct dn_list *list, const struct dn_list *list2)
383 unsigned int count = list->count;
385 if (list->count == 0 && list2->count == 0) {
390 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
396 for (i=0;i<list2->count;i++) {
397 if (ldb_list_find(list2->dn[i], list->dn, count,
398 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
399 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
400 if (!list->dn[list->count]) {
407 if (list->count != count) {
408 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
414 static int ltdb_index_dn(struct ldb_module *module,
415 struct ldb_parse_tree *tree,
416 const struct ldb_message *index_list,
417 struct dn_list *list);
423 static int ltdb_index_dn_or(struct ldb_module *module,
424 struct ldb_parse_tree *tree,
425 const struct ldb_message *index_list,
426 struct dn_list *list)
428 struct ldb_context *ldb = module->ldb;
436 for (i=0;i<tree->u.list.num_elements;i++) {
437 struct dn_list *list2;
440 list2 = talloc(module, struct dn_list);
445 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
458 talloc_free(list->dn);
465 list->dn = talloc_steal(list, list2->dn);
466 list->count = list2->count;
468 if (list_union(ldb, list, list2) == -1) {
477 if (list->count == 0) {
488 static int ltdb_index_dn_not(struct ldb_module *module,
489 struct ldb_parse_tree *tree,
490 const struct ldb_message *index_list,
491 struct dn_list *list)
493 /* the only way to do an indexed not would be if we could
494 negate the not via another not or if we knew the total
495 number of database elements so we could know that the
496 existing expression covered the whole database.
498 instead, we just give up, and rely on a full index scan
499 (unless an outer & manages to reduce the list)
505 AND two index results
507 static int ltdb_index_dn_and(struct ldb_module *module,
508 struct ldb_parse_tree *tree,
509 const struct ldb_message *index_list,
510 struct dn_list *list)
512 struct ldb_context *ldb = module->ldb;
520 for (i=0;i<tree->u.list.num_elements;i++) {
521 struct dn_list *list2;
524 list2 = talloc(module, struct dn_list);
529 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
533 talloc_free(list->dn);
545 talloc_free(list->dn);
546 list->dn = talloc_steal(list, list2->dn);
547 list->count = list2->count;
549 if (list_intersect(ldb, list, list2) == -1) {
557 if (list->count == 0) {
558 talloc_free(list->dn);
567 return a list of dn's that might match an indexed search or
568 -1 if an error. return 0 for no matches, or 1 for matches
570 static int ltdb_index_dn(struct ldb_module *module,
571 struct ldb_parse_tree *tree,
572 const struct ldb_message *index_list,
573 struct dn_list *list)
577 switch (tree->operation) {
579 ret = ltdb_index_dn_and(module, tree, index_list, list);
583 ret = ltdb_index_dn_or(module, tree, index_list, list);
587 ret = ltdb_index_dn_not(module, tree, index_list, list);
590 case LDB_OP_EQUALITY:
591 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
594 case LDB_OP_SUBSTRING:
599 case LDB_OP_EXTENDED:
600 /* we can't index with fancy bitops yet */
609 filter a candidate dn_list from an indexed search into a set of results
610 extracting just the given attributes
612 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
613 const struct ldb_dn *base,
614 enum ldb_scope scope,
615 const struct dn_list *dn_list,
616 const char * const attrs[], struct ldb_message ***res)
621 for (i = 0; i < dn_list->count; i++) {
622 struct ldb_message *msg;
626 msg = talloc(module, struct ldb_message);
631 dn = ldb_dn_explode(msg, dn_list->dn[i]);
637 ret = ltdb_search_dn1(module, dn, msg);
640 /* the record has disappeared? yes, this can happen */
646 /* an internal error */
652 if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
653 ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
665 search the database with a LDAP-like expression using indexes
666 returns -1 if an indexed search is not possible, in which
667 case the caller should call ltdb_search_full()
669 int ltdb_search_indexed(struct ldb_module *module,
670 const struct ldb_dn *base,
671 enum ldb_scope scope,
672 struct ldb_parse_tree *tree,
673 const char * const attrs[], struct ldb_message ***res)
675 struct ltdb_private *ltdb = module->private_data;
676 struct dn_list *dn_list;
679 if (ltdb->cache->indexlist->num_elements == 0 &&
680 scope != LDB_SCOPE_BASE) {
681 /* no index list? must do full search */
685 dn_list = talloc(module, struct dn_list);
686 if (dn_list == NULL) {
690 if (scope == LDB_SCOPE_BASE) {
691 /* with BASE searches only one DN can match */
692 char *dn = ldb_dn_linearize(dn_list, base);
700 ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
704 /* we've got a candidate list - now filter by the full tree
705 and extract the needed attributes */
706 ret = ldb_index_filter(module, tree, base, scope, dn_list,
710 talloc_free(dn_list);
716 add an index element where this is the first indexed DN for this value
718 static int ltdb_index_add1_new(struct ldb_context *ldb,
719 struct ldb_message *msg,
720 struct ldb_message_element *el,
723 struct ldb_message_element *el2;
725 /* add another entry */
726 el2 = talloc_realloc(msg, msg->elements,
727 struct ldb_message_element, msg->num_elements+1);
733 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
734 if (!msg->elements[msg->num_elements].name) {
737 msg->elements[msg->num_elements].num_values = 0;
738 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
739 if (!msg->elements[msg->num_elements].values) {
742 msg->elements[msg->num_elements].values[0].length = strlen(dn);
743 msg->elements[msg->num_elements].values[0].data = dn;
744 msg->elements[msg->num_elements].num_values = 1;
752 add an index element where this is not the first indexed DN for this
755 static int ltdb_index_add1_add(struct ldb_context *ldb,
756 struct ldb_message *msg,
757 struct ldb_message_element *el,
764 /* for multi-valued attributes we can end up with repeats */
765 for (i=0;i<msg->elements[idx].num_values;i++) {
766 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
771 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
773 msg->elements[idx].num_values+1);
777 msg->elements[idx].values = v2;
779 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
780 msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
781 msg->elements[idx].num_values++;
787 add an index entry for one message element
789 static int ltdb_index_add1(struct ldb_module *module, char *dn,
790 struct ldb_message_element *el, int v_idx)
792 struct ldb_context *ldb = module->ldb;
793 struct ldb_message *msg;
794 struct ldb_dn *dn_key;
798 msg = talloc(module, struct ldb_message);
804 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
810 talloc_steal(msg, dn_key);
812 ret = ltdb_search_dn1(module, dn_key, msg);
820 msg->num_elements = 0;
821 msg->elements = NULL;
824 for (i=0;i<msg->num_elements;i++) {
825 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
830 if (i == msg->num_elements) {
831 ret = ltdb_index_add1_new(ldb, msg, el, dn);
833 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
837 ret = ltdb_store(module, msg, TDB_REPLACE);
845 static int ltdb_index_add0(struct ldb_module *module, char *dn,
846 struct ldb_message_element *elements, int num_el)
848 struct ltdb_private *ltdb = module->private_data;
856 if (ltdb->cache->indexlist->num_elements == 0) {
857 /* no indexed fields */
861 for (i = 0; i < num_el; i++) {
862 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
867 for (j = 0; j < elements[i].num_values; j++) {
868 ret = ltdb_index_add1(module, dn, &elements[i], j);
880 add the index entries for a new record
883 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
885 struct ltdb_private *ltdb = module->private_data;
889 dn = ldb_dn_linearize(ltdb, msg->dn);
894 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
903 delete an index entry for one message element
905 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
906 struct ldb_message_element *el, int v_idx)
908 struct ldb_context *ldb = module->ldb;
909 struct ldb_message *msg;
910 struct ldb_dn *dn_key;
918 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
923 msg = talloc(dn_key, struct ldb_message);
929 ret = ltdb_search_dn1(module, dn_key, msg);
936 /* it wasn't indexed. Did we have an earlier error? If we did then
942 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
944 ldb_debug(ldb, LDB_DEBUG_ERROR,
945 "ERROR: dn %s not found in %s\n", dn,
946 ldb_dn_linearize(dn_key, dn_key));
947 /* it ain't there. hmmm */
952 if (j != msg->elements[i].num_values - 1) {
953 memmove(&msg->elements[i].values[j],
954 &msg->elements[i].values[j+1],
955 (msg->elements[i].num_values-(j+1)) *
956 sizeof(msg->elements[i].values[0]));
958 msg->elements[i].num_values--;
960 if (msg->elements[i].num_values == 0) {
961 ret = ltdb_delete_noindex(module, dn_key);
963 ret = ltdb_store(module, msg, TDB_REPLACE);
972 delete the index entries for a record
975 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
977 struct ltdb_private *ltdb = module->private_data;
982 if (ldb_dn_is_special(msg->dn)) {
986 dn = ldb_dn_linearize(ltdb, msg->dn);
991 /* find the list of indexed fields */
992 if (ltdb->cache->indexlist->num_elements == 0) {
993 /* no indexed fields */
997 for (i = 0; i < msg->num_elements; i++) {
998 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1003 for (j = 0; j < msg->elements[i].num_values; j++) {
1004 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1018 traversal function that deletes all @INDEX records
1020 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1022 const char *dn = "DN=" LTDB_INDEX ":";
1023 if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
1024 return tdb_delete(tdb, key);
1030 traversal function that adds @INDEX records during a re-index
1032 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1034 struct ldb_module *module = state;
1035 struct ldb_message *msg;
1040 if (strncmp(key.dptr, "DN=@", 4) == 0 ||
1041 strncmp(key.dptr, "DN=", 3) != 0) {
1045 msg = talloc(module, struct ldb_message);
1050 ret = ltdb_unpack_data(module, &data, msg);
1056 /* check if the DN key has changed, perhaps due to the
1057 case insensitivity of an element changing */
1058 key2 = ltdb_key(module, msg->dn);
1059 if (key2.dptr == NULL) {
1060 /* probably a corrupt record ... darn */
1061 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1062 ldb_dn_linearize(msg, msg->dn));
1066 if (strcmp(key2.dptr, key.dptr) != 0) {
1067 tdb_delete(tdb, key);
1068 tdb_store(tdb, key2, data, 0);
1070 talloc_free(key2.dptr);
1072 if (msg->dn == NULL) {
1075 dn = ldb_dn_linearize(msg->dn, msg->dn);
1078 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1086 force a complete reindex of the database
1088 int ltdb_reindex(struct ldb_module *module)
1090 struct ltdb_private *ltdb = module->private_data;
1093 if (ltdb_cache_reload(module) != 0) {
1097 /* first traverse the database deleting any @INDEX records */
1098 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1104 /* now traverse adding any indexes for normal LDB records */
1105 ret = tdb_traverse(ltdb->tdb, re_index, module);