4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/includes.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurance of needle if found
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
66 /* scan back for first element */
68 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
84 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
97 return the dn key to be used for an index
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101 const char *attr, const struct ldb_val *value)
106 const struct ldb_attrib_handler *h;
109 attr_folded = ldb_attr_casefold(ldb, attr);
114 h = ldb_attrib_handler(ldb, attr);
115 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116 /* canonicalisation can be refused. For example,
117 a attribute that takes wildcards will refuse to canonicalise
118 if the value contains a wildcard */
119 talloc_free(attr_folded);
122 if (ldb_should_b64_encode(&v)) {
123 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
124 if (!vstr) return NULL;
125 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
127 if (v.data != value->data) {
130 talloc_free(attr_folded);
131 if (dn == NULL) return NULL;
135 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
136 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
138 if (v.data != value->data) {
141 talloc_free(attr_folded);
144 ret = ldb_dn_explode(ldb, dn);
150 see if a attribute value is in the list of indexed attributes
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153 unsigned int *v_idx, const char *key)
156 for (i=0;i<msg->num_elements;i++) {
157 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158 const struct ldb_message_element *el =
160 for (j=0;j<el->num_values;j++) {
161 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
176 return strcmp(*s1, *s2);
180 return a list of dn's that might match a simple indexed search or
182 static int ltdb_index_dn_simple(struct ldb_module *module,
183 struct ldb_parse_tree *tree,
184 const struct ldb_message *index_list,
185 struct dn_list *list)
187 struct ldb_context *ldb = module->ldb;
191 struct ldb_message *msg;
196 /* if the attribute isn't in the list of indexed attributes then
197 this node needs a full search */
198 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
202 /* the attribute is indexed. Pull the list of DNs that match the
204 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
207 msg = talloc(list, struct ldb_message);
212 ret = ltdb_search_dn1(module, dn, msg);
214 if (ret == 0 || ret == -1) {
218 for (i=0;i<msg->num_elements;i++) {
219 struct ldb_message_element *el;
221 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
225 el = &msg->elements[i];
227 list->dn = talloc_array(list, char *, el->num_values);
232 for (j=0;j<el->num_values;j++) {
233 list->dn[list->count] =
234 talloc_strdup(list->dn, (char *)el->values[j].data);
235 if (!list->dn[list->count]) {
244 if (list->count > 1) {
245 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
252 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
255 return a list of dn's that might match a simple indexed search on
256 the special objectclass attribute
258 static int ltdb_index_dn_objectclass(struct ldb_module *module,
259 struct ldb_parse_tree *tree,
260 const struct ldb_message *index_list,
261 struct dn_list *list)
263 struct ldb_context *ldb = module->ldb;
266 const char *target = (const char *)tree->u.equality.value.data;
267 const char **subclasses;
272 ret = ltdb_index_dn_simple(module, tree, index_list, list);
274 subclasses = ldb_subclass_list(module->ldb, target);
276 if (subclasses == NULL) {
280 for (i=0;subclasses[i];i++) {
281 struct ldb_parse_tree tree2;
282 struct dn_list *list2;
283 tree2.operation = LDB_OP_EQUALITY;
284 tree2.u.equality.attr = LTDB_OBJECTCLASS;
285 if (!tree2.u.equality.attr) {
288 tree2.u.equality.value.data =
289 (uint8_t *)talloc_strdup(list, subclasses[i]);
290 if (tree2.u.equality.value.data == NULL) {
293 tree2.u.equality.value.length = strlen(subclasses[i]);
294 list2 = talloc(list, struct dn_list);
296 talloc_free(tree2.u.equality.value.data);
299 if (ltdb_index_dn_objectclass(module, &tree2,
300 index_list, list2) == 1) {
301 if (list->count == 0) {
305 list_union(ldb, list, list2);
309 talloc_free(tree2.u.equality.value.data);
316 return a list of dn's that might match a leaf indexed search
318 static int ltdb_index_dn_leaf(struct ldb_module *module,
319 struct ldb_parse_tree *tree,
320 const struct ldb_message *index_list,
321 struct dn_list *list)
323 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
324 return ltdb_index_dn_objectclass(module, tree, index_list, list);
326 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
327 list->dn = talloc_array(list, char *, 1);
328 if (list->dn == NULL) {
329 ldb_oom(module->ldb);
332 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
333 if (list->dn[0] == NULL) {
334 ldb_oom(module->ldb);
340 return ltdb_index_dn_simple(module, tree, index_list, list);
347 relies on the lists being sorted
349 static int list_intersect(struct ldb_context *ldb,
350 struct dn_list *list, const struct dn_list *list2)
352 struct dn_list *list3;
355 if (list->count == 0 || list2->count == 0) {
360 list3 = talloc(ldb, struct dn_list);
365 list3->dn = talloc_array(list3, char *, list->count);
372 for (i=0;i<list->count;i++) {
373 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
374 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
375 list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
378 talloc_free(list->dn[i]);
382 talloc_free(list->dn);
383 list->dn = talloc_steal(list, list3->dn);
384 list->count = list3->count;
394 relies on the lists being sorted
396 static int list_union(struct ldb_context *ldb,
397 struct dn_list *list, const struct dn_list *list2)
401 unsigned int count = list->count;
403 if (list->count == 0 && list2->count == 0) {
408 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
414 for (i=0;i<list2->count;i++) {
415 if (ldb_list_find(list2->dn[i], list->dn, count,
416 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
417 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
418 if (!list->dn[list->count]) {
425 if (list->count != count) {
426 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
432 static int ltdb_index_dn(struct ldb_module *module,
433 struct ldb_parse_tree *tree,
434 const struct ldb_message *index_list,
435 struct dn_list *list);
441 static int ltdb_index_dn_or(struct ldb_module *module,
442 struct ldb_parse_tree *tree,
443 const struct ldb_message *index_list,
444 struct dn_list *list)
446 struct ldb_context *ldb = module->ldb;
454 for (i=0;i<tree->u.list.num_elements;i++) {
455 struct dn_list *list2;
458 list2 = talloc(module, struct dn_list);
463 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
476 talloc_free(list->dn);
483 list->dn = talloc_steal(list, list2->dn);
484 list->count = list2->count;
486 if (list_union(ldb, list, list2) == -1) {
495 if (list->count == 0) {
506 static int ltdb_index_dn_not(struct ldb_module *module,
507 struct ldb_parse_tree *tree,
508 const struct ldb_message *index_list,
509 struct dn_list *list)
511 /* the only way to do an indexed not would be if we could
512 negate the not via another not or if we knew the total
513 number of database elements so we could know that the
514 existing expression covered the whole database.
516 instead, we just give up, and rely on a full index scan
517 (unless an outer & manages to reduce the list)
523 AND two index results
525 static int ltdb_index_dn_and(struct ldb_module *module,
526 struct ldb_parse_tree *tree,
527 const struct ldb_message *index_list,
528 struct dn_list *list)
530 struct ldb_context *ldb = module->ldb;
538 for (i=0;i<tree->u.list.num_elements;i++) {
539 struct dn_list *list2;
542 list2 = talloc(module, struct dn_list);
547 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
551 talloc_free(list->dn);
563 talloc_free(list->dn);
564 list->dn = talloc_steal(list, list2->dn);
565 list->count = list2->count;
567 if (list_intersect(ldb, list, list2) == -1) {
575 if (list->count == 0) {
576 talloc_free(list->dn);
585 return a list of dn's that might match a indexed search or
586 -1 if an error. return 0 for no matches, or 1 for matches
588 static int ltdb_index_dn(struct ldb_module *module,
589 struct ldb_parse_tree *tree,
590 const struct ldb_message *index_list,
591 struct dn_list *list)
595 switch (tree->operation) {
597 ret = ltdb_index_dn_and(module, tree, index_list, list);
601 ret = ltdb_index_dn_or(module, tree, index_list, list);
605 ret = ltdb_index_dn_not(module, tree, index_list, list);
608 case LDB_OP_EQUALITY:
609 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
612 case LDB_OP_SUBSTRING:
617 case LDB_OP_EXTENDED:
618 /* we can't index with fancy bitops yet */
627 filter a candidate dn_list from an indexed search into a set of results
628 extracting just the given attributes
630 static int ltdb_index_filter(const struct dn_list *dn_list,
631 struct ldb_async_handle *handle)
633 struct ltdb_async_context *ac = talloc_get_type(handle->private_data, struct ltdb_async_context);
634 struct ldb_async_result *ares = NULL;
637 for (i = 0; i < dn_list->count; i++) {
641 ares = talloc_zero(ac, struct ldb_async_result);
643 handle->status = LDB_ERR_OPERATIONS_ERROR;
644 handle->state = LDB_ASYNC_DONE;
645 return LDB_ERR_OPERATIONS_ERROR;
648 ares->message = ldb_msg_new(ares);
649 if (!ares->message) {
650 handle->status = LDB_ERR_OPERATIONS_ERROR;
651 handle->state = LDB_ASYNC_DONE;
653 return LDB_ERR_OPERATIONS_ERROR;
657 dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
660 return LDB_ERR_OPERATIONS_ERROR;
663 ret = ltdb_search_dn1(ac->module, dn, ares->message);
666 /* the record has disappeared? yes, this can happen */
672 /* an internal error */
674 return LDB_ERR_OPERATIONS_ERROR;
677 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
682 /* filter the attributes that the user wants */
683 ret = ltdb_filter_attrs(ares->message, ac->attrs);
686 handle->status = LDB_ERR_OPERATIONS_ERROR;
687 handle->state = LDB_ASYNC_DONE;
689 return LDB_ERR_OPERATIONS_ERROR;
692 ares->type = LDB_REPLY_ENTRY;
693 handle->state = LDB_ASYNC_PENDING;
694 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
696 if (handle->status != LDB_SUCCESS) {
697 handle->state = LDB_ASYNC_DONE;
698 return handle->status;
706 search the database with a LDAP-like expression using indexes
707 returns -1 if an indexed search is not possible, in which
708 case the caller should call ltdb_search_full()
710 int ltdb_search_indexed(struct ldb_async_handle *handle)
712 struct ltdb_async_context *ac = talloc_get_type(handle->private_data, struct ltdb_async_context);
713 struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
714 struct dn_list *dn_list;
717 if (ltdb->cache->indexlist->num_elements == 0 &&
718 ac->scope != LDB_SCOPE_BASE) {
719 /* no index list? must do full search */
723 dn_list = talloc(handle, struct dn_list);
724 if (dn_list == NULL) {
728 if (ac->scope == LDB_SCOPE_BASE) {
729 /* with BASE searches only one DN can match */
730 dn_list->dn = talloc_array(dn_list, char *, 1);
731 if (dn_list->dn == NULL) {
732 ldb_oom(ac->module->ldb);
735 dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
736 if (dn_list->dn[0] == NULL) {
737 ldb_oom(ac->module->ldb);
743 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
747 /* we've got a candidate list - now filter by the full tree
748 and extract the needed attributes */
749 ret = ltdb_index_filter(dn_list, handle);
750 handle->status = ret;
751 handle->state = LDB_ASYNC_DONE;
754 talloc_free(dn_list);
760 add a index element where this is the first indexed DN for this value
762 static int ltdb_index_add1_new(struct ldb_context *ldb,
763 struct ldb_message *msg,
764 struct ldb_message_element *el,
767 struct ldb_message_element *el2;
769 /* add another entry */
770 el2 = talloc_realloc(msg, msg->elements,
771 struct ldb_message_element, msg->num_elements+1);
777 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
778 if (!msg->elements[msg->num_elements].name) {
781 msg->elements[msg->num_elements].num_values = 0;
782 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
783 if (!msg->elements[msg->num_elements].values) {
786 msg->elements[msg->num_elements].values[0].length = strlen(dn);
787 msg->elements[msg->num_elements].values[0].data = (uint8_t *)dn;
788 msg->elements[msg->num_elements].num_values = 1;
796 add a index element where this is not the first indexed DN for this
799 static int ltdb_index_add1_add(struct ldb_context *ldb,
800 struct ldb_message *msg,
801 struct ldb_message_element *el,
808 /* for multi-valued attributes we can end up with repeats */
809 for (i=0;i<msg->elements[idx].num_values;i++) {
810 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
815 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
817 msg->elements[idx].num_values+1);
821 msg->elements[idx].values = v2;
823 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
824 msg->elements[idx].values[msg->elements[idx].num_values].data = (uint8_t *)dn;
825 msg->elements[idx].num_values++;
831 add an index entry for one message element
833 static int ltdb_index_add1(struct ldb_module *module, char *dn,
834 struct ldb_message_element *el, int v_idx)
836 struct ldb_context *ldb = module->ldb;
837 struct ldb_message *msg;
838 struct ldb_dn *dn_key;
842 msg = talloc(module, struct ldb_message);
848 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
854 talloc_steal(msg, dn_key);
856 ret = ltdb_search_dn1(module, dn_key, msg);
864 msg->num_elements = 0;
865 msg->elements = NULL;
868 for (i=0;i<msg->num_elements;i++) {
869 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
874 if (i == msg->num_elements) {
875 ret = ltdb_index_add1_new(ldb, msg, el, dn);
877 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
881 ret = ltdb_store(module, msg, TDB_REPLACE);
889 static int ltdb_index_add0(struct ldb_module *module, char *dn,
890 struct ldb_message_element *elements, int num_el)
892 struct ltdb_private *ltdb = module->private_data;
900 if (ltdb->cache->indexlist->num_elements == 0) {
901 /* no indexed fields */
905 for (i = 0; i < num_el; i++) {
906 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
911 for (j = 0; j < elements[i].num_values; j++) {
912 ret = ltdb_index_add1(module, dn, &elements[i], j);
924 add the index entries for a new record
927 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
929 struct ltdb_private *ltdb = module->private_data;
933 dn = ldb_dn_linearize(ltdb, msg->dn);
938 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
947 delete an index entry for one message element
949 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
950 struct ldb_message_element *el, int v_idx)
952 struct ldb_context *ldb = module->ldb;
953 struct ldb_message *msg;
954 struct ldb_dn *dn_key;
962 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
967 msg = talloc(dn_key, struct ldb_message);
973 ret = ltdb_search_dn1(module, dn_key, msg);
980 /* it wasn't indexed. Did we have an earlier error? If we did then
986 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
988 ldb_debug(ldb, LDB_DEBUG_ERROR,
989 "ERROR: dn %s not found in %s\n", dn,
990 ldb_dn_linearize(dn_key, dn_key));
991 /* it ain't there. hmmm */
996 if (j != msg->elements[i].num_values - 1) {
997 memmove(&msg->elements[i].values[j],
998 &msg->elements[i].values[j+1],
999 (msg->elements[i].num_values-(j+1)) *
1000 sizeof(msg->elements[i].values[0]));
1002 msg->elements[i].num_values--;
1004 if (msg->elements[i].num_values == 0) {
1005 ret = ltdb_delete_noindex(module, dn_key);
1007 ret = ltdb_store(module, msg, TDB_REPLACE);
1010 talloc_free(dn_key);
1016 delete the index entries for a record
1017 return -1 on failure
1019 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1021 struct ltdb_private *ltdb = module->private_data;
1026 if (ldb_dn_is_special(msg->dn)) {
1030 dn = ldb_dn_linearize(ltdb, msg->dn);
1035 /* find the list of indexed fields */
1036 if (ltdb->cache->indexlist->num_elements == 0) {
1037 /* no indexed fields */
1041 for (i = 0; i < msg->num_elements; i++) {
1042 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1043 NULL, LTDB_IDXATTR);
1047 for (j = 0; j < msg->elements[i].num_values; j++) {
1048 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1062 traversal function that deletes all @INDEX records
1064 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1066 const char *dn = "DN=" LTDB_INDEX ":";
1067 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1068 return tdb_delete(tdb, key);
1074 traversal function that adds @INDEX records during a re index
1076 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1078 struct ldb_module *module = state;
1079 struct ldb_message *msg;
1084 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1085 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1089 msg = talloc(module, struct ldb_message);
1094 ret = ltdb_unpack_data(module, &data, msg);
1100 /* check if the DN key has changed, perhaps due to the
1101 case insensitivity of an element changing */
1102 key2 = ltdb_key(module, msg->dn);
1103 if (key2.dptr == NULL) {
1104 /* probably a corrupt record ... darn */
1105 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1106 ldb_dn_linearize(msg, msg->dn));
1110 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1111 tdb_delete(tdb, key);
1112 tdb_store(tdb, key2, data, 0);
1114 talloc_free(key2.dptr);
1116 if (msg->dn == NULL) {
1117 dn = (char *)key.dptr + 3;
1119 dn = ldb_dn_linearize(msg->dn, msg->dn);
1122 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1130 force a complete reindex of the database
1132 int ltdb_reindex(struct ldb_module *module)
1134 struct ltdb_private *ltdb = module->private_data;
1137 if (ltdb_cache_reload(module) != 0) {
1141 /* first traverse the database deleting any @INDEX records */
1142 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1147 /* now traverse adding any indexes for normal LDB records */
1148 ret = tdb_traverse(ltdb->tdb, re_index, module);