4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
27 * Component: ldb tdb backend - indexing
29 * Description: indexing routines for ldb tdb backend
31 * Author: Andrew Tridgell
35 #include "ldb/include/includes.h"
37 #include "ldb/ldb_tdb/ldb_tdb.h"
40 find an element in a list, using the given comparison function and
41 assuming that the list is already sorted using comp_fn
43 return -1 if not found, or the index of the first occurrence of needle if found
/*
 * Binary search for needle in base, an array of nmemb slots that are
 * `size` bytes apart and each hold a pointer. comp_fn is called with
 * needle and the pointer stored in the probed slot (see the cast note
 * in the loop).
 * NOTE(review): this listing skips lines (the embedded numbers jump), so
 * the initialisation of min_i/max_i, the declaration of r, the window
 * updates and the return statements are not visible here.
 */
45 static int ldb_list_find(const void *needle,
46 const void *base, size_t nmemb, size_t size,
47 comparison_fn_t comp_fn)
49 const char *base_p = (const char *)base;
50 size_t min_i, max_i, test_i;
59 while (min_i < max_i) {
62 test_i = (min_i + max_i) / 2;
63 /* the following cast looks strange, but is
64 correct. The key to understanding it is that base_p
65 is a pointer to an array of pointers, so we have to
66 dereference it after casting to void **. The strange
67 const in the middle gives us the right type of pointer
68 after the dereference (tridge) */
69 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
71 /* scan back for first element */
73 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
89 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
102 return the dn key to be used for an index
/*
 * Build the DN under which index entries for (attr, value) are stored.
 * The attribute name is case-folded with ldb_attr_casefold() and the value
 * is passed through the attribute handler's canonicalise_fn. The key string
 * is then formatted as "<LTDB_INDEX>:<attr>:<value>", or as
 * "<LTDB_INDEX>:<attr>::<base64>" when ldb_should_b64_encode() reports that
 * the canonical value needs encoding, and handed to ldb_dn_explode().
 * NOTE(review): several lines are missing from this listing (declarations,
 * failure returns, the statements guarded by v.data != value->data), so the
 * exact cleanup and return paths cannot be confirmed from here.
 */
105 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
106 const char *attr, const struct ldb_val *value)
111 const struct ldb_attrib_handler *h;
114 attr_folded = ldb_attr_casefold(ldb, attr);
119 h = ldb_attrib_handler(ldb, attr);
120 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
121 /* canonicalisation can be refused. For example,
122 an attribute that takes wildcards will refuse to canonicalise
123 if the value contains a wildcard */
124 talloc_free(attr_folded);
127 if (ldb_should_b64_encode(&v)) {
128 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
129 if (!vstr) return NULL;
130 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
132 if (v.data != value->data) {
135 talloc_free(attr_folded);
136 if (dn == NULL) return NULL;
140 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
141 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
143 if (v.data != value->data) {
146 talloc_free(attr_folded);
149 ret = ldb_dn_explode(ldb, dn);
155 see if an attribute value is in the list of indexed attributes
/*
 * Look through msg for an element named key (compared with ldb_attr_cmp)
 * that carries a value equal to attr, also compared with ldb_attr_cmp.
 * NOTE(review): the return statements are not visible in this listing.
 * Callers in this file test the result against -1 and use it as an element
 * index; *v_idx presumably receives the matching value's position when
 * v_idx is non-NULL -- confirm against the full source.
 */
157 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
158 unsigned int *v_idx, const char *key)
161 for (i=0;i<msg->num_elements;i++) {
162 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
163 const struct ldb_message_element *el =
165 for (j=0;j<el->num_values;j++) {
166 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
178 /* used in sorting dn lists */
/*
 * Comparator passed to qsort() for arrays of char *: s1 and s2 point at
 * two array slots, and the strings they reference are ordered with strcmp.
 */
179 static int list_cmp(const char **s1, const char **s2)
181 return strcmp(*s1, *s2);
185 return a list of dn's that might match a simple indexed search or
/*
 * Resolve a single equality test through the index.
 * Visible steps: check with ldb_msg_find_idx() that tree->u.equality.attr
 * is listed under LTDB_IDXATTR in index_list; build the index record's DN
 * with ldb_dn_key(); load that record with ltdb_search_dn1(); copy every
 * value of its LTDB_IDX element into list->dn with talloc_strdup(); and
 * qsort the result with list_cmp when it holds more than one entry.
 * NOTE(review): the return statements and error branches are elided in
 * this listing, so the return-code contract is not documented here.
 */
187 static int ltdb_index_dn_simple(struct ldb_module *module,
188 const struct ldb_parse_tree *tree,
189 const struct ldb_message *index_list,
190 struct dn_list *list)
192 struct ldb_context *ldb = module->ldb;
196 struct ldb_message *msg;
201 /* if the attribute isn't in the list of indexed attributes then
202 this node needs a full search */
203 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
207 /* the attribute is indexed. Pull the list of DNs that match the
209 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
212 msg = talloc(list, struct ldb_message);
217 ret = ltdb_search_dn1(module, dn, msg);
219 if (ret == 0 || ret == -1) {
223 for (i=0;i<msg->num_elements;i++) {
224 struct ldb_message_element *el;
226 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
230 el = &msg->elements[i];
232 list->dn = talloc_array(list, char *, el->num_values);
238 for (j=0;j<el->num_values;j++) {
239 list->dn[list->count] =
240 talloc_strdup(list->dn, (char *)el->values[j].data);
241 if (!list->dn[list->count]) {
251 if (list->count > 1) {
252 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
/* forward declaration: list_union() is called by ltdb_index_dn_objectclass() before its definition appears */
259 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
262 return a list of dn's that might match a simple indexed search on
263 the special objectclass attribute
/*
 * Index lookup for an equality test on the objectclass attribute.
 * The requested class itself is looked up with ltdb_index_dn_simple().
 * Then, for every name returned by ldb_subclass_list() for that class, a
 * temporary equality node on LTDB_OBJECTCLASS is built and this function is
 * called recursively into a scratch list2; a recursive result of 1 is
 * folded into list (list_union() on the visible path).
 * NOTE(review): the branch taken when list->count == 0, the error paths and
 * the final return are not visible in this listing.
 */
265 static int ltdb_index_dn_objectclass(struct ldb_module *module,
266 const struct ldb_parse_tree *tree,
267 const struct ldb_message *index_list,
268 struct dn_list *list)
270 struct ldb_context *ldb = module->ldb;
273 const char *target = (const char *)tree->u.equality.value.data;
274 const char **subclasses;
279 ret = ltdb_index_dn_simple(module, tree, index_list, list);
281 subclasses = ldb_subclass_list(module->ldb, target);
283 if (subclasses == NULL) {
287 for (i=0;subclasses[i];i++) {
288 struct ldb_parse_tree tree2;
289 struct dn_list *list2;
290 tree2.operation = LDB_OP_EQUALITY;
291 tree2.u.equality.attr = LTDB_OBJECTCLASS;
292 if (!tree2.u.equality.attr) {
295 tree2.u.equality.value.data =
296 (uint8_t *)talloc_strdup(list, subclasses[i]);
297 if (tree2.u.equality.value.data == NULL) {
300 tree2.u.equality.value.length = strlen(subclasses[i]);
301 list2 = talloc(list, struct dn_list);
303 talloc_free(tree2.u.equality.value.data);
306 if (ltdb_index_dn_objectclass(module, &tree2,
307 index_list, list2) == 1) {
308 if (list->count == 0) {
312 list_union(ldb, list, list2);
316 talloc_free(tree2.u.equality.value.data);
323 return a list of dn's that might match a leaf indexed search
/*
 * Dispatch an equality leaf to the matching index lookup:
 *  - LTDB_OBJECTCLASS goes to ltdb_index_dn_objectclass();
 *  - an attribute for which ldb_attr_dn() returns 0 is answered by placing
 *    a copy of the search value in a one-entry list->dn array;
 *  - anything else goes to ltdb_index_dn_simple().
 * NOTE(review): the statements following the ldb_oom() calls and the end
 * of the dn branch are elided in this listing.
 */
325 static int ltdb_index_dn_leaf(struct ldb_module *module,
326 const struct ldb_parse_tree *tree,
327 const struct ldb_message *index_list,
328 struct dn_list *list)
330 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
331 return ltdb_index_dn_objectclass(module, tree, index_list, list);
333 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
334 list->dn = talloc_array(list, char *, 1);
335 if (list->dn == NULL) {
336 ldb_oom(module->ldb);
339 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
340 if (list->dn[0] == NULL) {
341 ldb_oom(module->ldb);
347 return ltdb_index_dn_simple(module, tree, index_list, list);
354 relies on the lists being sorted
/*
 * Reduce list to the entries that also occur in list2.
 * Each list->dn[i] is looked up in list2->dn with ldb_list_find() and
 * strcmp; entries that are found are moved into a scratch list (list3)
 * sized for list->count entries, after which list->dn is released and list
 * takes over list3's array and count.
 * NOTE(review): the else that presumably guards talloc_free(list->dn[i]),
 * the empty-list and allocation-failure handling, and the return values
 * are not visible in this listing.
 */
356 static int list_intersect(struct ldb_context *ldb,
357 struct dn_list *list, const struct dn_list *list2)
359 struct dn_list *list3;
362 if (list->count == 0 || list2->count == 0) {
367 list3 = talloc(ldb, struct dn_list);
372 list3->dn = talloc_array(list3, char *, list->count);
379 for (i=0;i<list->count;i++) {
380 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
381 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
382 list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
385 talloc_free(list->dn[i]);
389 talloc_free(list->dn);
390 list->dn = talloc_move(list, &list3->dn);
391 list->count = list3->count;
401 relies on the lists being sorted
/*
 * Merge the entries of list2 into list.
 * list->dn is grown to hold list->count + list2->count pointers. Every
 * list2 entry that ldb_list_find() cannot locate among the first `count`
 * (i.e. original) entries of list->dn is duplicated with talloc_strdup()
 * and appended. If anything was appended the array is re-sorted with
 * qsort() and list_cmp.
 * NOTE(review): the both-empty case, the allocation-failure paths, the
 * assignment of the realloc result and the return values are elided in
 * this listing.
 */
403 static int list_union(struct ldb_context *ldb,
404 struct dn_list *list, const struct dn_list *list2)
408 unsigned int count = list->count;
410 if (list->count == 0 && list2->count == 0) {
415 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
421 for (i=0;i<list2->count;i++) {
422 if (ldb_list_find(list2->dn[i], list->dn, count,
423 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
424 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
425 if (!list->dn[list->count]) {
432 if (list->count != count) {
433 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
/* forward declaration: the OR and AND handlers call ltdb_index_dn() on their children before it is defined */
439 static int ltdb_index_dn(struct ldb_module *module,
440 const struct ldb_parse_tree *tree,
441 const struct ldb_message *index_list,
442 struct dn_list *list);
/*
 * Evaluate an OR node through the index.
 * Every child in tree->u.list.elements is resolved with ltdb_index_dn()
 * into a scratch list2 allocated on module; list2's entries then either
 * become list's contents (talloc_move of list2->dn) or are merged in with
 * list_union().
 * NOTE(review): the conditions choosing between those paths, the handling
 * of each child's result v, and the return statements are elided in this
 * listing.
 */
448 static int ltdb_index_dn_or(struct ldb_module *module,
449 const struct ldb_parse_tree *tree,
450 const struct ldb_message *index_list,
451 struct dn_list *list)
453 struct ldb_context *ldb = module->ldb;
461 for (i=0;i<tree->u.list.num_elements;i++) {
462 struct dn_list *list2;
465 list2 = talloc(module, struct dn_list);
470 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
483 talloc_free(list->dn);
490 list->dn = talloc_move(list, &list2->dn);
491 list->count = list2->count;
493 if (list_union(ldb, list, list2) == -1) {
502 if (list->count == 0) {
/*
 * Handler for NOT nodes. As the comment in the body explains, no indexed
 * evaluation is attempted for a negation.
 * NOTE(review): the end of that comment and the statement that returns
 * are not visible in this listing.
 */
513 static int ltdb_index_dn_not(struct ldb_module *module,
514 const struct ldb_parse_tree *tree,
515 const struct ldb_message *index_list,
516 struct dn_list *list)
518 /* the only way to do an indexed not would be if we could
519 negate the not via another not or if we knew the total
520 number of database elements so we could know that the
521 existing expression covered the whole database.
523 instead, we just give up, and rely on a full index scan
524 (unless an outer & manages to reduce the list)
530 AND two index results
/*
 * Evaluate an AND node through the index.
 * Every child in tree->u.list.elements is resolved with ltdb_index_dn()
 * into a scratch list2 allocated on module; list2's entries then either
 * replace list's contents (talloc_move of list2->dn) or are used to narrow
 * list with list_intersect(). list->dn is released when list->count drops
 * to 0.
 * NOTE(review): the conditions choosing between those paths, the handling
 * of each child's result v, and the return statements are elided in this
 * listing.
 */
532 static int ltdb_index_dn_and(struct ldb_module *module,
533 const struct ldb_parse_tree *tree,
534 const struct ldb_message *index_list,
535 struct dn_list *list)
537 struct ldb_context *ldb = module->ldb;
545 for (i=0;i<tree->u.list.num_elements;i++) {
546 struct dn_list *list2;
549 list2 = talloc(module, struct dn_list);
554 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
558 talloc_free(list->dn);
570 talloc_free(list->dn);
571 list->dn = talloc_move(list, &list2->dn);
572 list->count = list2->count;
574 if (list_intersect(ldb, list, list2) == -1) {
582 if (list->count == 0) {
583 talloc_free(list->dn);
592 return a list of dn's that might match an indexed search or
593 -1 if an error. return 0 for no matches, or 1 for matches
/*
 * Top-level dispatcher over a parse-tree node: switches on tree->operation
 * and hands the node to the and / or / not / leaf handler.
 * LDB_OP_SUBSTRING and LDB_OP_EXTENDED appear as cases with no indexed
 * handler in the visible lines.
 * NOTE(review): the case labels in front of the and/or/not calls, any
 * further cases, and the value produced for the non-indexed operations
 * are elided in this listing.
 */
595 static int ltdb_index_dn(struct ldb_module *module,
596 const struct ldb_parse_tree *tree,
597 const struct ldb_message *index_list,
598 struct dn_list *list)
602 switch (tree->operation) {
604 ret = ltdb_index_dn_and(module, tree, index_list, list);
608 ret = ltdb_index_dn_or(module, tree, index_list, list);
612 ret = ltdb_index_dn_not(module, tree, index_list, list);
615 case LDB_OP_EQUALITY:
616 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
619 case LDB_OP_SUBSTRING:
624 case LDB_OP_EXTENDED:
625 /* we can't index with fancy bitops yet */
634 filter a candidate dn_list from an indexed search into a set of results
635 extracting just the given attributes
/*
 * Walk the candidate DNs in dn_list and deliver the records that match.
 * For each DN a fresh ldb_reply with a new message is allocated, the
 * record is loaded with ltdb_search_dn1(), tested against the request's
 * tree/base/scope with ldb_match_msg(), trimmed to ac->attrs by
 * ltdb_filter_attrs(), and passed to ac->callback as an LDB_REPLY_ENTRY.
 * A callback status other than LDB_SUCCESS marks the handle
 * LDB_ASYNC_DONE and is returned immediately.
 * NOTE(review): the conditions guarding the error returns, what happens to
 * vanished or non-matching records, and the final return are elided in
 * this listing.
 */
637 static int ltdb_index_filter(const struct dn_list *dn_list,
638 struct ldb_handle *handle)
640 struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
641 struct ldb_reply *ares = NULL;
645 return LDB_ERR_OPERATIONS_ERROR;
648 for (i = 0; i < dn_list->count; i++) {
652 ares = talloc_zero(ac, struct ldb_reply);
654 handle->status = LDB_ERR_OPERATIONS_ERROR;
655 handle->state = LDB_ASYNC_DONE;
656 return LDB_ERR_OPERATIONS_ERROR;
659 ares->message = ldb_msg_new(ares);
660 if (!ares->message) {
661 handle->status = LDB_ERR_OPERATIONS_ERROR;
662 handle->state = LDB_ASYNC_DONE;
664 return LDB_ERR_OPERATIONS_ERROR;
668 dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
671 return LDB_ERR_OPERATIONS_ERROR;
674 ret = ltdb_search_dn1(ac->module, dn, ares->message);
677 /* the record has disappeared? yes, this can happen */
683 /* an internal error */
685 return LDB_ERR_OPERATIONS_ERROR;
688 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
693 /* filter the attributes that the user wants */
694 ret = ltdb_filter_attrs(ares->message, ac->attrs);
697 handle->status = LDB_ERR_OPERATIONS_ERROR;
698 handle->state = LDB_ASYNC_DONE;
700 return LDB_ERR_OPERATIONS_ERROR;
703 ares->type = LDB_REPLY_ENTRY;
704 handle->state = LDB_ASYNC_PENDING;
705 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
707 if (handle->status != LDB_SUCCESS) {
708 handle->state = LDB_ASYNC_DONE;
709 return handle->status;
717 search the database with a LDAP-like expression using indexes
718 returns -1 if an indexed search is not possible, in which
719 case the caller should call ltdb_search_full()
/*
 * Entry point for an indexed search on the request stored in
 * handle->private_data.
 * With no indexed attributes configured and a scope other than
 * LDB_SCOPE_BASE the index cannot help (the "must do full search" branch).
 * For LDB_SCOPE_BASE the candidate list is the single linearized base DN;
 * the ltdb_index_dn() call (presumably the else branch -- the else itself
 * is elided) derives the candidates from the parse tree. The candidates
 * are passed to ltdb_index_filter(), whose result is stored in
 * handle->status with the state set to LDB_ASYNC_DONE, and dn_list is
 * freed.
 * NOTE(review): parts of the branch structure and all return statements
 * are missing from this listing.
 */
721 int ltdb_search_indexed(struct ldb_handle *handle)
723 struct ltdb_context *ac;
724 struct ltdb_private *ltdb;
725 struct dn_list *dn_list;
728 if (!(ac = talloc_get_type(handle->private_data,
729 struct ltdb_context)) ||
730 !(ltdb = talloc_get_type(ac->module->private_data,
731 struct ltdb_private))) {
735 if (ltdb->cache->indexlist->num_elements == 0 &&
736 ac->scope != LDB_SCOPE_BASE) {
737 /* no index list? must do full search */
741 dn_list = talloc(handle, struct dn_list);
742 if (dn_list == NULL) {
746 if (ac->scope == LDB_SCOPE_BASE) {
747 /* with BASE searches only one DN can match */
748 dn_list->dn = talloc_array(dn_list, char *, 1);
749 if (dn_list->dn == NULL) {
750 ldb_oom(ac->module->ldb);
753 dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
754 if (dn_list->dn[0] == NULL) {
755 ldb_oom(ac->module->ldb);
761 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
765 /* we've got a candidate list - now filter by the full tree
766 and extract the needed attributes */
767 ret = ltdb_index_filter(dn_list, handle);
768 handle->status = ret;
769 handle->state = LDB_ASYNC_DONE;
772 talloc_free(dn_list);
778 add an index element where this is the first indexed DN for this value
/*
 * Append a new LTDB_IDX element to msg holding exactly one value: dn.
 * msg->elements is grown by one slot, the slot's name is a talloc copy of
 * LTDB_IDX, and its single value points directly at dn (const discarded,
 * no copy made) with length strlen(dn).
 * NOTE(review): the tail of the parameter list (where dn is declared), the
 * assignment of the realloc result, the failure returns and the update of
 * msg->num_elements are not visible in this listing.
 */
780 static int ltdb_index_add1_new(struct ldb_context *ldb,
781 struct ldb_message *msg,
782 struct ldb_message_element *el,
785 struct ldb_message_element *el2;
787 /* add another entry */
788 el2 = talloc_realloc(msg, msg->elements,
789 struct ldb_message_element, msg->num_elements+1);
795 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
796 if (!msg->elements[msg->num_elements].name) {
799 msg->elements[msg->num_elements].num_values = 0;
800 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
801 if (!msg->elements[msg->num_elements].values) {
804 msg->elements[msg->num_elements].values[0].length = strlen(dn);
805 msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
806 msg->elements[msg->num_elements].num_values = 1;
814 add an index element where this is not the first indexed DN for this
/*
 * Append dn as one more value of the existing element msg->elements[idx].
 * The current values are first scanned with strcmp for a copy of dn (the
 * in-body comment notes that multi-valued attributes can cause repeats);
 * the values array is then grown by one and the new slot points directly
 * at dn (const discarded, no copy made) with length strlen(dn).
 * NOTE(review): the tail of the parameter list (idx and dn), the action
 * taken on a duplicate, the realloc failure path and the return values are
 * not visible in this listing.
 */
817 static int ltdb_index_add1_add(struct ldb_context *ldb,
818 struct ldb_message *msg,
819 struct ldb_message_element *el,
826 /* for multi-valued attributes we can end up with repeats */
827 for (i=0;i<msg->elements[idx].num_values;i++) {
828 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
833 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
835 msg->elements[idx].num_values+1);
839 msg->elements[idx].values = v2;
841 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
842 msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
843 msg->elements[idx].num_values++;
849 add an index entry for one message element
/*
 * Record dn in the index entry for value number v_idx of element el.
 * The index record's DN comes from ldb_dn_key() and is re-parented onto a
 * scratch message, which is then filled by ltdb_search_dn1(). The message
 * is scanned for an LTDB_IDX element: if there is none
 * ltdb_index_add1_new() creates it, otherwise ltdb_index_add1_add()
 * extends it. The record is written back with ltdb_store(TDB_REPLACE).
 * NOTE(review): the condition under which msg is reset to zero elements,
 * the checks on the intermediate results and the cleanup/return path are
 * elided in this listing.
 */
851 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
852 struct ldb_message_element *el, int v_idx)
854 struct ldb_context *ldb = module->ldb;
855 struct ldb_message *msg;
856 struct ldb_dn *dn_key;
860 msg = talloc(module, struct ldb_message);
866 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
872 talloc_steal(msg, dn_key);
874 ret = ltdb_search_dn1(module, dn_key, msg);
882 msg->num_elements = 0;
883 msg->elements = NULL;
886 for (i=0;i<msg->num_elements;i++) {
887 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
892 if (i == msg->num_elements) {
893 ret = ltdb_index_add1_new(ldb, msg, el, dn);
895 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
899 ret = ltdb_store(module, msg, TDB_REPLACE);
/*
 * Add index entries for dn covering the given array of num_el elements.
 * Nothing needs indexing when the cached index list is empty. Otherwise
 * each element's name is looked up in the index list with
 * ldb_msg_find_idx(), and ltdb_index_add1() is called once per value.
 * NOTE(review): the remaining arguments of the ldb_msg_find_idx() call,
 * the test that skips non-indexed elements, and the return values are
 * elided in this listing.
 */
907 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
908 struct ldb_message_element *elements, int num_el)
910 struct ltdb_private *ltdb =
911 (struct ltdb_private *)module->private_data;
919 if (ltdb->cache->indexlist->num_elements == 0) {
920 /* no indexed fields */
924 for (i = 0; i < num_el; i++) {
925 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
930 for (j = 0; j < elements[i].num_values; j++) {
931 ret = ltdb_index_add1(module, dn, &elements[i], j);
942 add the index entries for a new record
/*
 * Add index entries for every element of msg: msg->dn is linearized
 * (allocated on ltdb) and passed with the message's elements to
 * ltdb_index_add0().
 * NOTE(review): the check on the linearized dn, any early exits, the
 * release of dn and the return are not visible in this listing.
 */
945 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
947 struct ltdb_private *ltdb =
948 (struct ltdb_private *)module->private_data;
952 dn = ldb_dn_linearize(ltdb, msg->dn);
957 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
966 delete an index entry for one message element
/*
 * Remove dn from the index entry for value number v_idx of element el.
 * The index record is located via ldb_dn_key() and loaded into a scratch
 * message allocated on dn_key, so the talloc_free(dn_key) calls release
 * both. ldb_msg_find_idx() finds dn among the LTDB_IDX values; a miss is
 * logged with ldb_debug(). The matching value is removed by shifting the
 * later values down with memmove. If no values remain the index record is
 * deleted with ltdb_delete_noindex(); on the other visible path it is
 * rewritten with ltdb_store(TDB_REPLACE).
 * NOTE(review): the conditions around the early exits, the else between
 * delete and store, and the return values are elided in this listing.
 */
968 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
969 struct ldb_message_element *el, int v_idx)
971 struct ldb_context *ldb = module->ldb;
972 struct ldb_message *msg;
973 struct ldb_dn *dn_key;
981 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
986 msg = talloc(dn_key, struct ldb_message);
992 ret = ltdb_search_dn1(module, dn_key, msg);
999 /* it wasn't indexed. Did we have an earlier error? If we did then
1001 talloc_free(dn_key);
1005 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1007 ldb_debug(ldb, LDB_DEBUG_ERROR,
1008 "ERROR: dn %s not found in %s\n", dn,
1009 ldb_dn_linearize(dn_key, dn_key));
1010 /* it ain't there. hmmm */
1011 talloc_free(dn_key);
1015 if (j != msg->elements[i].num_values - 1) {
1016 memmove(&msg->elements[i].values[j],
1017 &msg->elements[i].values[j+1],
1018 (msg->elements[i].num_values-(j+1)) *
1019 sizeof(msg->elements[i].values[0]));
1021 msg->elements[i].num_values--;
1023 if (msg->elements[i].num_values == 0) {
1024 ret = ltdb_delete_noindex(module, dn_key);
1026 ret = ltdb_store(module, msg, TDB_REPLACE);
1029 talloc_free(dn_key);
1035 delete the index entries for a record
1036 return -1 on failure
/*
 * Remove the index entries that refer to msg.
 * Visible checks: an empty cached index list ("no indexed fields") and a
 * special DN (ldb_dn_is_special) are both tested before any work is done.
 * Otherwise msg->dn is linearized, each element's name is looked up under
 * LTDB_IDXATTR in the index list, and ltdb_index_del_value() is called
 * once per value.
 * NOTE(review): what the early checks return, the test that skips
 * non-indexed elements, error handling and the release of dn are elided
 * in this listing.
 */
1038 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1040 struct ltdb_private *ltdb =
1041 (struct ltdb_private *)module->private_data;
1046 /* find the list of indexed fields */
1047 if (ltdb->cache->indexlist->num_elements == 0) {
1048 /* no indexed fields */
1052 if (ldb_dn_is_special(msg->dn)) {
1056 dn = ldb_dn_linearize(ltdb, msg->dn);
1061 for (i = 0; i < msg->num_elements; i++) {
1062 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1063 NULL, LTDB_IDXATTR);
1067 for (j = 0; j < msg->elements[i].num_values; j++) {
1068 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1082 traversal function that deletes all @INDEX records
/*
 * tdb_traverse() callback: a record whose key begins with
 * "DN=" LTDB_INDEX ":" is removed with tdb_delete() and that call's result
 * is returned. data and state are not used in the visible lines.
 * NOTE(review): the return for keys that do not match is elided in this
 * listing.
 */
1084 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1086 const char *dn = "DN=" LTDB_INDEX ":";
1087 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1088 return tdb_delete(tdb, key);
1094 traversal function that adds @INDEX records during a re index
/*
 * tdb_traverse() callback used while rebuilding the index; state carries
 * the ldb_module.
 * Keys that start with "DN=@" or that do not start with "DN=" are singled
 * out by the first test (presumably skipped -- the body of that branch is
 * elided). Other records are unpacked with ltdb_unpack_data(), and the key
 * is recomputed from msg->dn with ltdb_key(); if it differs from the
 * stored key the record is deleted and stored again under the new key.
 * The DN string is taken from the key (past the "DN=" prefix) when msg->dn
 * is NULL, otherwise from ldb_dn_linearize(), and the record's elements
 * are indexed with ltdb_index_add0().
 * NOTE(review): the results of tdb_delete()/tdb_store() are not checked on
 * the visible lines; failure handling, cleanup of msg and the return
 * values are elided in this listing.
 */
1096 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1098 struct ldb_module *module = (struct ldb_module *)state;
1099 struct ldb_message *msg;
1104 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1105 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1109 msg = talloc(module, struct ldb_message);
1114 ret = ltdb_unpack_data(module, &data, msg);
1120 /* check if the DN key has changed, perhaps due to the
1121 case insensitivity of an element changing */
1122 key2 = ltdb_key(module, msg->dn);
1123 if (key2.dptr == NULL) {
1124 /* probably a corrupt record ... darn */
1125 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1126 ldb_dn_linearize(msg, msg->dn));
1130 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1131 tdb_delete(tdb, key);
1132 tdb_store(tdb, key2, data, 0);
1134 talloc_free(key2.dptr);
1136 if (msg->dn == NULL) {
1137 dn = (char *)key.dptr + 3;
1139 if (!(dn = ldb_dn_linearize(msg->dn, msg->dn))) {
1145 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1153 force a complete reindex of the database
/*
 * Rebuild the whole index: reload the cache with ltdb_cache_reload(),
 * traverse the tdb once with delete_index to drop the existing @INDEX
 * records, then traverse it again with re_index (passing module as the
 * callback state) to add index entries for the normal records.
 * NOTE(review): the checks on the traverse results and the return values
 * are elided in this listing.
 */
1155 int ltdb_reindex(struct ldb_module *module)
1157 struct ltdb_private *ltdb =
1158 (struct ltdb_private *)module->private_data;
1161 if (ltdb_cache_reload(module) != 0) {
1165 /* first traverse the database deleting any @INDEX records */
1166 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1171 /* now traverse adding any indexes for normal LDB records */
1172 ret = tdb_traverse(ltdb->tdb, re_index, module);