4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/includes.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurance of needle if found
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
66 /* scan back for first element */
68 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
84 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
97 return the dn key to be used for an index
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101 const char *attr, const struct ldb_val *value)
106 const struct ldb_attrib_handler *h;
109 attr_folded = ldb_casefold(ldb, attr);
114 h = ldb_attrib_handler(ldb, attr);
115 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116 /* canonicalisation can be refused. For example,
117 a attribute that takes wildcards will refuse to canonicalise
118 if the value contains a wildcard */
119 talloc_free(attr_folded);
122 if (ldb_should_b64_encode(&v)) {
123 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
124 if (!vstr) return NULL;
125 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
127 if (v.data != value->data) {
130 talloc_free(attr_folded);
131 if (dn == NULL) return NULL;
135 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
136 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
138 if (v.data != value->data) {
141 talloc_free(attr_folded);
144 ret = ldb_dn_explode(ldb, dn);
150 see if a attribute value is in the list of indexed attributes
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153 unsigned int *v_idx, const char *key)
156 for (i=0;i<msg->num_elements;i++) {
157 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158 const struct ldb_message_element *el =
160 for (j=0;j<el->num_values;j++) {
161 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
176 return strcmp(*s1, *s2);
180 return a list of dn's that might match a simple indexed search or
182 static int ltdb_index_dn_simple(struct ldb_module *module,
183 struct ldb_parse_tree *tree,
184 const struct ldb_message *index_list,
185 struct dn_list *list)
187 struct ldb_context *ldb = module->ldb;
191 struct ldb_message *msg;
196 /* if the attribute isn't in the list of indexed attributes then
197 this node needs a full search */
198 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
202 /* the attribute is indexed. Pull the list of DNs that match the
204 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
207 msg = talloc(list, struct ldb_message);
212 ret = ltdb_search_dn1(module, dn, msg);
214 if (ret == 0 || ret == -1) {
218 for (i=0;i<msg->num_elements;i++) {
219 struct ldb_message_element *el;
221 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
225 el = &msg->elements[i];
227 list->dn = talloc_array(list, char *, el->num_values);
232 for (j=0;j<el->num_values;j++) {
233 list->dn[list->count] =
234 talloc_strdup(list->dn, (char *)el->values[j].data);
235 if (!list->dn[list->count]) {
244 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
250 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
253 return a list of dn's that might match a simple indexed search on
254 the special objectclass attribute
256 static int ltdb_index_dn_objectclass(struct ldb_module *module,
257 struct ldb_parse_tree *tree,
258 const struct ldb_message *index_list,
259 struct dn_list *list)
261 struct ldb_context *ldb = module->ldb;
264 const char *target = (const char *)tree->u.equality.value.data;
265 const char **subclasses;
270 ret = ltdb_index_dn_simple(module, tree, index_list, list);
272 subclasses = ldb_subclass_list(module->ldb, target);
274 if (subclasses == NULL) {
278 for (i=0;subclasses[i];i++) {
279 struct ldb_parse_tree tree2;
280 struct dn_list *list2;
281 tree2.operation = LDB_OP_EQUALITY;
282 tree2.u.equality.attr = LTDB_OBJECTCLASS;
283 if (!tree2.u.equality.attr) {
286 tree2.u.equality.value.data =
287 (uint8_t *)talloc_strdup(list, subclasses[i]);
288 if (tree2.u.equality.value.data == NULL) {
291 tree2.u.equality.value.length = strlen(subclasses[i]);
292 list2 = talloc(list, struct dn_list);
294 talloc_free(tree2.u.equality.value.data);
297 if (ltdb_index_dn_objectclass(module, &tree2,
298 index_list, list2) == 1) {
299 if (list->count == 0) {
303 list_union(ldb, list, list2);
307 talloc_free(tree2.u.equality.value.data);
314 return a list of dn's that might match a leaf indexed search
316 static int ltdb_index_dn_leaf(struct ldb_module *module,
317 struct ldb_parse_tree *tree,
318 const struct ldb_message *index_list,
319 struct dn_list *list)
321 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
322 return ltdb_index_dn_objectclass(module, tree, index_list, list);
324 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
325 list->dn = talloc_array(list, char *, 1);
326 if (list->dn == NULL) {
327 ldb_oom(module->ldb);
330 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
331 if (list->dn[0] == NULL) {
332 ldb_oom(module->ldb);
338 return ltdb_index_dn_simple(module, tree, index_list, list);
345 relies on the lists being sorted
347 static int list_intersect(struct ldb_context *ldb,
348 struct dn_list *list, const struct dn_list *list2)
350 struct dn_list *list3;
353 if (list->count == 0 || list2->count == 0) {
358 list3 = talloc(ldb, struct dn_list);
363 list3->dn = talloc_array(list3, char *, list->count);
370 for (i=0;i<list->count;i++) {
371 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
372 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
373 list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
376 talloc_free(list->dn[i]);
380 talloc_free(list->dn);
381 list->dn = talloc_steal(list, list3->dn);
382 list->count = list3->count;
392 relies on the lists being sorted
394 static int list_union(struct ldb_context *ldb,
395 struct dn_list *list, const struct dn_list *list2)
399 unsigned int count = list->count;
401 if (list->count == 0 && list2->count == 0) {
406 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
412 for (i=0;i<list2->count;i++) {
413 if (ldb_list_find(list2->dn[i], list->dn, count,
414 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
415 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
416 if (!list->dn[list->count]) {
423 if (list->count != count) {
424 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
430 static int ltdb_index_dn(struct ldb_module *module,
431 struct ldb_parse_tree *tree,
432 const struct ldb_message *index_list,
433 struct dn_list *list);
439 static int ltdb_index_dn_or(struct ldb_module *module,
440 struct ldb_parse_tree *tree,
441 const struct ldb_message *index_list,
442 struct dn_list *list)
444 struct ldb_context *ldb = module->ldb;
452 for (i=0;i<tree->u.list.num_elements;i++) {
453 struct dn_list *list2;
456 list2 = talloc(module, struct dn_list);
461 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
474 talloc_free(list->dn);
481 list->dn = talloc_steal(list, list2->dn);
482 list->count = list2->count;
484 if (list_union(ldb, list, list2) == -1) {
493 if (list->count == 0) {
504 static int ltdb_index_dn_not(struct ldb_module *module,
505 struct ldb_parse_tree *tree,
506 const struct ldb_message *index_list,
507 struct dn_list *list)
509 /* the only way to do an indexed not would be if we could
510 negate the not via another not or if we knew the total
511 number of database elements so we could know that the
512 existing expression covered the whole database.
514 instead, we just give up, and rely on a full index scan
515 (unless an outer & manages to reduce the list)
521 AND two index results
523 static int ltdb_index_dn_and(struct ldb_module *module,
524 struct ldb_parse_tree *tree,
525 const struct ldb_message *index_list,
526 struct dn_list *list)
528 struct ldb_context *ldb = module->ldb;
536 for (i=0;i<tree->u.list.num_elements;i++) {
537 struct dn_list *list2;
540 list2 = talloc(module, struct dn_list);
545 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
549 talloc_free(list->dn);
561 talloc_free(list->dn);
562 list->dn = talloc_steal(list, list2->dn);
563 list->count = list2->count;
565 if (list_intersect(ldb, list, list2) == -1) {
573 if (list->count == 0) {
574 talloc_free(list->dn);
583 return a list of dn's that might match a indexed search or
584 -1 if an error. return 0 for no matches, or 1 for matches
586 static int ltdb_index_dn(struct ldb_module *module,
587 struct ldb_parse_tree *tree,
588 const struct ldb_message *index_list,
589 struct dn_list *list)
593 switch (tree->operation) {
595 ret = ltdb_index_dn_and(module, tree, index_list, list);
599 ret = ltdb_index_dn_or(module, tree, index_list, list);
603 ret = ltdb_index_dn_not(module, tree, index_list, list);
606 case LDB_OP_EQUALITY:
607 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
610 case LDB_OP_SUBSTRING:
615 case LDB_OP_EXTENDED:
616 /* we can't index with fancy bitops yet */
625 filter a candidate dn_list from an indexed search into a set of results
626 extracting just the given attributes
628 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
629 const struct ldb_dn *base,
630 enum ldb_scope scope,
631 const struct dn_list *dn_list,
632 const char * const attrs[], struct ldb_result *res)
636 for (i = 0; i < dn_list->count; i++) {
637 struct ldb_message *msg;
641 msg = talloc(module, struct ldb_message);
643 return LDB_ERR_OTHER;
646 dn = ldb_dn_explode(msg, dn_list->dn[i]);
649 return LDB_ERR_OTHER;
652 ret = ltdb_search_dn1(module, dn, msg);
655 /* the record has disappeared? yes, this can happen */
661 /* an internal error */
663 return LDB_ERR_OTHER;
667 if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
668 ret = ltdb_add_attr_results(module, res, msg,
669 attrs, &(res->count), &(res->msgs));
673 return LDB_ERR_OTHER;
681 search the database with a LDAP-like expression using indexes
682 returns -1 if an indexed search is not possible, in which
683 case the caller should call ltdb_search_full()
685 int ltdb_search_indexed(struct ldb_module *module,
686 const struct ldb_dn *base,
687 enum ldb_scope scope,
688 struct ldb_parse_tree *tree,
689 const char * const attrs[], struct ldb_result **res)
691 struct ltdb_private *ltdb = module->private_data;
692 struct dn_list *dn_list;
695 if (ltdb->cache->indexlist->num_elements == 0 &&
696 scope != LDB_SCOPE_BASE) {
697 /* no index list? must do full search */
701 dn_list = talloc(module, struct dn_list);
702 if (dn_list == NULL) {
706 *res = talloc(module, struct ldb_result);
708 return LDB_ERR_OTHER;
712 (*res)->controls = NULL;
714 if (scope == LDB_SCOPE_BASE) {
715 /* with BASE searches only one DN can match */
716 dn_list->dn = talloc_array(dn_list, char *, 1);
717 if (dn_list->dn == NULL) {
718 ldb_oom(module->ldb);
721 dn_list->dn[0] = ldb_dn_linearize(dn_list, base);
722 if (dn_list->dn[0] == NULL) {
723 ldb_oom(module->ldb);
729 ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
733 /* we've got a candidate list - now filter by the full tree
734 and extract the needed attributes */
735 ret = ldb_index_filter(module, tree, base, scope, dn_list,
739 talloc_free(dn_list);
745 add a index element where this is the first indexed DN for this value
747 static int ltdb_index_add1_new(struct ldb_context *ldb,
748 struct ldb_message *msg,
749 struct ldb_message_element *el,
752 struct ldb_message_element *el2;
754 /* add another entry */
755 el2 = talloc_realloc(msg, msg->elements,
756 struct ldb_message_element, msg->num_elements+1);
762 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
763 if (!msg->elements[msg->num_elements].name) {
766 msg->elements[msg->num_elements].num_values = 0;
767 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
768 if (!msg->elements[msg->num_elements].values) {
771 msg->elements[msg->num_elements].values[0].length = strlen(dn);
772 msg->elements[msg->num_elements].values[0].data = (uint8_t *)dn;
773 msg->elements[msg->num_elements].num_values = 1;
781 add a index element where this is not the first indexed DN for this
784 static int ltdb_index_add1_add(struct ldb_context *ldb,
785 struct ldb_message *msg,
786 struct ldb_message_element *el,
793 /* for multi-valued attributes we can end up with repeats */
794 for (i=0;i<msg->elements[idx].num_values;i++) {
795 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
800 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
802 msg->elements[idx].num_values+1);
806 msg->elements[idx].values = v2;
808 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
809 msg->elements[idx].values[msg->elements[idx].num_values].data = (uint8_t *)dn;
810 msg->elements[idx].num_values++;
816 add an index entry for one message element
818 static int ltdb_index_add1(struct ldb_module *module, char *dn,
819 struct ldb_message_element *el, int v_idx)
821 struct ldb_context *ldb = module->ldb;
822 struct ldb_message *msg;
823 struct ldb_dn *dn_key;
827 msg = talloc(module, struct ldb_message);
833 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
839 talloc_steal(msg, dn_key);
841 ret = ltdb_search_dn1(module, dn_key, msg);
849 msg->num_elements = 0;
850 msg->elements = NULL;
853 for (i=0;i<msg->num_elements;i++) {
854 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
859 if (i == msg->num_elements) {
860 ret = ltdb_index_add1_new(ldb, msg, el, dn);
862 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
866 ret = ltdb_store(module, msg, TDB_REPLACE);
874 static int ltdb_index_add0(struct ldb_module *module, char *dn,
875 struct ldb_message_element *elements, int num_el)
877 struct ltdb_private *ltdb = module->private_data;
885 if (ltdb->cache->indexlist->num_elements == 0) {
886 /* no indexed fields */
890 for (i = 0; i < num_el; i++) {
891 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
896 for (j = 0; j < elements[i].num_values; j++) {
897 ret = ltdb_index_add1(module, dn, &elements[i], j);
909 add the index entries for a new record
912 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
914 struct ltdb_private *ltdb = module->private_data;
918 dn = ldb_dn_linearize(ltdb, msg->dn);
923 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
932 delete an index entry for one message element
934 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
935 struct ldb_message_element *el, int v_idx)
937 struct ldb_context *ldb = module->ldb;
938 struct ldb_message *msg;
939 struct ldb_dn *dn_key;
947 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
952 msg = talloc(dn_key, struct ldb_message);
958 ret = ltdb_search_dn1(module, dn_key, msg);
965 /* it wasn't indexed. Did we have an earlier error? If we did then
971 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
973 ldb_debug(ldb, LDB_DEBUG_ERROR,
974 "ERROR: dn %s not found in %s\n", dn,
975 ldb_dn_linearize(dn_key, dn_key));
976 /* it ain't there. hmmm */
981 if (j != msg->elements[i].num_values - 1) {
982 memmove(&msg->elements[i].values[j],
983 &msg->elements[i].values[j+1],
984 (msg->elements[i].num_values-(j+1)) *
985 sizeof(msg->elements[i].values[0]));
987 msg->elements[i].num_values--;
989 if (msg->elements[i].num_values == 0) {
990 ret = ltdb_delete_noindex(module, dn_key);
992 ret = ltdb_store(module, msg, TDB_REPLACE);
1001 delete the index entries for a record
1002 return -1 on failure
1004 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1006 struct ltdb_private *ltdb = module->private_data;
1011 if (ldb_dn_is_special(msg->dn)) {
1015 dn = ldb_dn_linearize(ltdb, msg->dn);
1020 /* find the list of indexed fields */
1021 if (ltdb->cache->indexlist->num_elements == 0) {
1022 /* no indexed fields */
1026 for (i = 0; i < msg->num_elements; i++) {
1027 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1028 NULL, LTDB_IDXATTR);
1032 for (j = 0; j < msg->elements[i].num_values; j++) {
1033 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1047 traversal function that deletes all @INDEX records
1049 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1051 const char *dn = "DN=" LTDB_INDEX ":";
1052 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1053 return tdb_delete(tdb, key);
1059 traversal function that adds @INDEX records during a re index
1061 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1063 struct ldb_module *module = state;
1064 struct ldb_message *msg;
1069 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1070 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1074 msg = talloc(module, struct ldb_message);
1079 ret = ltdb_unpack_data(module, &data, msg);
1085 /* check if the DN key has changed, perhaps due to the
1086 case insensitivity of an element changing */
1087 key2 = ltdb_key(module, msg->dn);
1088 if (key2.dptr == NULL) {
1089 /* probably a corrupt record ... darn */
1090 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1091 ldb_dn_linearize(msg, msg->dn));
1095 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1096 tdb_delete(tdb, key);
1097 tdb_store(tdb, key2, data, 0);
1099 talloc_free(key2.dptr);
1101 if (msg->dn == NULL) {
1102 dn = (char *)key.dptr + 3;
1104 dn = ldb_dn_linearize(msg->dn, msg->dn);
1107 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1115 force a complete reindex of the database
1117 int ltdb_reindex(struct ldb_module *module)
1119 struct ltdb_private *ltdb = module->private_data;
1122 if (ltdb_cache_reload(module) != 0) {
1126 /* first traverse the database deleting any @INDEX records */
1127 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1132 /* now traverse adding any indexes for normal LDB records */
1133 ret = tdb_traverse(ltdb->tdb, re_index, module);