4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/includes.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurrence of needle if found
/*
  Binary-search a sorted array of pointers for needle.

  base addresses nmemb slots of `size` bytes each. Every slot is read as a
  pointer, and that pointer (not the slot address) is passed to comp_fn
  together with needle. After a hit the visible code walks backwards while
  the preceding slot also compares equal.

  NOTE(review): the initialisation of min_i/max_i, the branch bodies and
  the return statements are not visible in this listing. Confirm that the
  (test_i-1) access is guarded against test_i == 0 and that nmemb == 0 is
  rejected before the final comparison at min_i.
*/
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 /* the following cast looks strange, but is
65 correct. The key to understanding it is that base_p
66 is a pointer to an array of pointers, so we have to
67 dereference it after casting to void **. The strange
68 const in the middle gives us the right type of pointer
69 after the dereference (tridge) */
70 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
72 /* scan back for first element */
74 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
90 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
103 return the dn key to be used for an index
/*
  Build the DN under which the index record for (attr, value) is stored.

  The attribute name is case-folded with ldb_attr_casefold() and the value
  is canonicalised through the attribute's handler. The DN is formatted as
  LTDB_INDEX:<folded attr>:<value>, or LTDB_INDEX:<folded attr>::<base64>
  when ldb_should_b64_encode() reports that the canonical value needs
  encoding. If canonicalisation fails, an error string naming the attribute
  is set on ldb and attr_folded is released.

  NOTE(review): the "if (!vstr) return NULL;" path returns without
  releasing attr_folded, unlike the other exits visible here. This looks
  like a leak when base64 encoding fails; confirm and free first.
  NOTE(review): the result of ldb_attrib_handler() is dereferenced with no
  visible NULL check. Confirm it cannot return NULL.
*/
106 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
107 const char *attr, const struct ldb_val *value)
111 const struct ldb_attrib_handler *h;
115 attr_folded = ldb_attr_casefold(ldb, attr);
120 h = ldb_attrib_handler(ldb, attr);
121 r = h->canonicalise_fn(ldb, ldb, value, &v);
122 if (r != LDB_SUCCESS) {
123 const char *errstr = ldb_errstring(ldb);
124 /* canonicalisation can be refused. For example,
125 an attribute that takes wildcards will refuse to canonicalise
126 if the value contains a wildcard */
127 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
128 attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
129 talloc_free(attr_folded);
132 if (ldb_should_b64_encode(&v)) {
133 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
134 if (!vstr) return NULL;
135 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
138 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
141 if (v.data != value->data) {
144 talloc_free(attr_folded);
150 see if an attribute value is in the list of indexed attributes
/*
  Search msg for elements named key (compared with ldb_attr_cmp) and,
  inside each one, for a value that matches attr (also via ldb_attr_cmp).

  Callers in this file compare the result against -1 for "not found".
  ltdb_index_del_value() uses the result as an element index and reads the
  value position back through v_idx. Other callers pass NULL for v_idx.

  NOTE(review): values[j].data is compared as a C string, so values are
  assumed to be NUL-terminated. Confirm.
*/
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153 unsigned int *v_idx, const char *key)
156 for (i=0;i<msg->num_elements;i++) {
157 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158 const struct ldb_message_element *el =
160 for (j=0;j<el->num_values;j++) {
161 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
173 /* used in sorting dn lists */
/*
  qsort() comparator for arrays of char * DN strings. Each argument is the
  address of an array slot, so it is dereferenced once before strcmp().
*/
174 static int list_cmp(const char **s1, const char **s2)
176 return strcmp(*s1, *s2);
180 return a list of dn's that might match a simple indexed search or
/*
  Resolve a single equality test through the index.

  If the attribute is not listed under LTDB_IDXATTR in index_list, the node
  cannot be answered from the index. Otherwise the index record for
  (attr, value) is loaded with ltdb_search_dn1() and the values of its
  LTDB_IDX elements are duplicated into list->dn with talloc_strdup().
  A result with more than one entry is sorted with qsort()/list_cmp; the
  list_intersect()/list_union() helpers in this file rely on sorted lists.

  NOTE(review): list->dn is allocated inside the loop over msg->elements.
  If a record ever carried more than one LTDB_IDX element, the earlier
  array would be replaced. Confirm a record has at most one.
*/
182 static int ltdb_index_dn_simple(struct ldb_module *module,
183 const struct ldb_parse_tree *tree,
184 const struct ldb_message *index_list,
185 struct dn_list *list)
187 struct ldb_context *ldb = module->ldb;
191 struct ldb_message *msg;
196 /* if the attribute isn't in the list of indexed attributes then
197 this node needs a full search */
198 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
202 /* the attribute is indexed. Pull the list of DNs that match the
204 dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
207 msg = talloc(list, struct ldb_message);
212 ret = ltdb_search_dn1(module, dn, msg);
214 if (ret == 0 || ret == -1) {
218 for (i=0;i<msg->num_elements;i++) {
219 struct ldb_message_element *el;
221 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
225 el = &msg->elements[i];
227 list->dn = talloc_array(list, char *, el->num_values);
233 for (j=0;j<el->num_values;j++) {
234 list->dn[list->count] =
235 talloc_strdup(list->dn, (char *)el->values[j].data);
236 if (!list->dn[list->count]) {
246 if (list->count > 1) {
247 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
254 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
257 return a list of dn's that might match a simple indexed search on
258 the special objectclass attribute
/*
  Index lookup for an equality test on the objectclass attribute.

  The plain lookup for the requested class runs first, then
  ldb_subclass_list() is asked for its subclasses. For every subclass a
  temporary equality tree on LTDB_OBJECTCLASS is built and this function
  calls itself on it, so subclasses of subclasses are visited as well.
  Results that come back as 1 are merged into list.

  NOTE(review): target is taken straight from value.data and used as a C
  string. This assumes the value is NUL-terminated; confirm.
  NOTE(review): the return value of list_union() is not checked here,
  whereas ltdb_index_dn_or() tests it against -1.
  NOTE(review): if LTDB_OBJECTCLASS is a string constant, the test
  "if (!tree2.u.equality.attr)" right after the assignment can never fire.
*/
260 static int ltdb_index_dn_objectclass(struct ldb_module *module,
261 const struct ldb_parse_tree *tree,
262 const struct ldb_message *index_list,
263 struct dn_list *list)
265 struct ldb_context *ldb = module->ldb;
268 const char *target = (const char *)tree->u.equality.value.data;
269 const char **subclasses;
274 ret = ltdb_index_dn_simple(module, tree, index_list, list);
276 subclasses = ldb_subclass_list(module->ldb, target);
278 if (subclasses == NULL) {
282 for (i=0;subclasses[i];i++) {
283 struct ldb_parse_tree tree2;
284 struct dn_list *list2;
285 tree2.operation = LDB_OP_EQUALITY;
286 tree2.u.equality.attr = LTDB_OBJECTCLASS;
287 if (!tree2.u.equality.attr) {
290 tree2.u.equality.value.data =
291 (uint8_t *)talloc_strdup(list, subclasses[i]);
292 if (tree2.u.equality.value.data == NULL) {
295 tree2.u.equality.value.length = strlen(subclasses[i]);
296 list2 = talloc(list, struct dn_list);
298 talloc_free(tree2.u.equality.value.data);
301 if (ltdb_index_dn_objectclass(module, &tree2,
302 index_list, list2) == 1) {
303 if (list->count == 0) {
307 list_union(ldb, list, list2);
311 talloc_free(tree2.u.equality.value.data);
318 return a list of dn's that might match a leaf indexed search
/*
  Dispatch an equality leaf to the appropriate lookup.

  objectclass tests go to ltdb_index_dn_objectclass(). When ldb_attr_dn()
  returns 0 for the attribute, the search value itself becomes the only
  candidate: a one-slot list->dn array is allocated and the value is
  duplicated into it. Every other attribute goes to ltdb_index_dn_simple().
  Allocation failures are reported through ldb_oom().

  NOTE(review): dn[0] is duplicated with list as its talloc parent, while
  ltdb_index_dn_simple() parents entries on list->dn. Confirm the
  difference is intentional.
*/
320 static int ltdb_index_dn_leaf(struct ldb_module *module,
321 const struct ldb_parse_tree *tree,
322 const struct ldb_message *index_list,
323 struct dn_list *list)
325 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
326 return ltdb_index_dn_objectclass(module, tree, index_list, list);
328 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
329 list->dn = talloc_array(list, char *, 1);
330 if (list->dn == NULL) {
331 ldb_oom(module->ldb);
334 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
335 if (list->dn[0] == NULL) {
336 ldb_oom(module->ldb);
342 return ltdb_index_dn_simple(module, tree, index_list, list);
349 relies on the lists being sorted
/*
  Reduce list to the entries that also appear in list2.

  A scratch dn_list (list3) is allocated with room for list->count
  pointers. Each entry of list is looked up in list2 with ldb_list_find()
  using strcmp. Entries that are found are moved into list3 with
  talloc_move(); talloc_free() is applied to list->dn[i] on what is
  presumably the not-found path. Finally list->dn is released and replaced
  by list3's array and count.

  list2 has to be sorted, because ldb_list_find() is a binary search.

  NOTE(review): list3 is parented on the ldb context rather than on list,
  and its release is not visible in this listing. Confirm it is freed on
  every exit path.
*/
351 static int list_intersect(struct ldb_context *ldb,
352 struct dn_list *list, const struct dn_list *list2)
354 struct dn_list *list3;
357 if (list->count == 0 || list2->count == 0) {
362 list3 = talloc(ldb, struct dn_list);
367 list3->dn = talloc_array(list3, char *, list->count);
374 for (i=0;i<list->count;i++) {
375 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
376 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
377 list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
380 talloc_free(list->dn[i]);
384 talloc_free(list->dn);
385 list->dn = talloc_move(list, &list3->dn);
386 list->count = list3->count;
396 relies on the lists being sorted
/*
  Add to list every entry of list2 that it does not already contain.

  list->dn is grown with talloc_realloc() to hold list->count +
  list2->count pointers. Membership is tested with ldb_list_find() against
  the first `count` entries only, i.e. the sorted part of list that was
  present on entry. Missing entries are duplicated with talloc_strdup().
  If list->count changed, the whole array is re-sorted with
  qsort()/list_cmp.
*/
398 static int list_union(struct ldb_context *ldb,
399 struct dn_list *list, const struct dn_list *list2)
403 unsigned int count = list->count;
405 if (list->count == 0 && list2->count == 0) {
410 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
416 for (i=0;i<list2->count;i++) {
417 if (ldb_list_find(list2->dn[i], list->dn, count,
418 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
419 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
420 if (!list->dn[list->count]) {
427 if (list->count != count) {
428 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
434 static int ltdb_index_dn(struct ldb_module *module,
435 const struct ldb_parse_tree *tree,
436 const struct ldb_message *index_list,
437 struct dn_list *list);
/*
  OR together the index results of every sub-expression of tree.

  Each child is evaluated with ltdb_index_dn() into a scratch list2
  allocated on module. The visible code either takes over list2's array
  with talloc_move() or merges it into list with list_union(), treating a
  -1 from list_union() as a failure.

  NOTE(review): the branches that interpret the child's return value v are
  not visible in this listing. Confirm list2 is released on every path,
  since it is parented on module rather than on list.
*/
443 static int ltdb_index_dn_or(struct ldb_module *module,
444 const struct ldb_parse_tree *tree,
445 const struct ldb_message *index_list,
446 struct dn_list *list)
448 struct ldb_context *ldb = module->ldb;
456 for (i=0;i<tree->u.list.num_elements;i++) {
457 struct dn_list *list2;
460 list2 = talloc(module, struct dn_list);
465 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
478 talloc_free(list->dn);
485 list->dn = talloc_move(list, &list2->dn);
486 list->count = list2->count;
488 if (list_union(ldb, list, list2) == -1) {
497 if (list->count == 0) {
/*
  Index handling for a NOT node. As the comment in the body explains, no
  indexed evaluation is attempted for negations.
*/
508 static int ltdb_index_dn_not(struct ldb_module *module,
509 const struct ldb_parse_tree *tree,
510 const struct ldb_message *index_list,
511 struct dn_list *list)
513 /* the only way to do an indexed not would be if we could
514 negate the not via another not or if we knew the total
515 number of database elements so we could know that the
516 existing expression covered the whole database.
518 instead, we just give up, and rely on a full index scan
519 (unless an outer & manages to reduce the list)
525 AND two index results
/*
  AND together the index results of the sub-expressions of tree.

  Each child is evaluated with ltdb_index_dn() into a scratch list2
  allocated on module. The visible code adopts list2's array with
  talloc_move() on one path and narrows list with list_intersect() on
  another, treating -1 from list_intersect() as a failure. list->dn is
  released when the running result ends up empty.

  NOTE(review): the branches that interpret the child's return value v are
  not visible in this listing. Confirm list2 is released on every path,
  since it is parented on module rather than on list.
*/
527 static int ltdb_index_dn_and(struct ldb_module *module,
528 const struct ldb_parse_tree *tree,
529 const struct ldb_message *index_list,
530 struct dn_list *list)
532 struct ldb_context *ldb = module->ldb;
540 for (i=0;i<tree->u.list.num_elements;i++) {
541 struct dn_list *list2;
544 list2 = talloc(module, struct dn_list);
549 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
553 talloc_free(list->dn);
565 talloc_free(list->dn);
566 list->dn = talloc_move(list, &list2->dn);
567 list->count = list2->count;
569 if (list_intersect(ldb, list, list2) == -1) {
577 if (list->count == 0) {
578 talloc_free(list->dn);
587 return a list of dn's that might match an indexed search or
588 -1 if an error. return 0 for no matches, or 1 for matches
/*
  Evaluate tree against the index by dispatching on tree->operation.

  Compound nodes go to the AND, OR and NOT helpers and LDB_OP_EQUALITY goes
  to ltdb_index_dn_leaf(). LDB_OP_SUBSTRING and LDB_OP_EXTENDED appear to
  share the "cannot be indexed" path; the case bodies are not visible in
  this listing.
*/
590 static int ltdb_index_dn(struct ldb_module *module,
591 const struct ldb_parse_tree *tree,
592 const struct ldb_message *index_list,
593 struct dn_list *list)
597 switch (tree->operation) {
599 ret = ltdb_index_dn_and(module, tree, index_list, list);
603 ret = ltdb_index_dn_or(module, tree, index_list, list);
607 ret = ltdb_index_dn_not(module, tree, index_list, list);
610 case LDB_OP_EQUALITY:
611 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
614 case LDB_OP_SUBSTRING:
619 case LDB_OP_EXTENDED:
620 /* we can't index with fancy bitops yet */
629 filter a candidate dn_list from an indexed search into a set of results
630 extracting just the given attributes
/*
  Turn a candidate DN list into search replies.

  For every DN in dn_list a fresh ldb_reply with an empty message is
  allocated on the request context. The record is loaded with
  ltdb_search_dn1() and re-checked against the full search (tree, base and
  scope) with ldb_match_msg(). Matching messages are trimmed to the
  requested attributes with ltdb_filter_attrs() and delivered through
  ac->callback as LDB_REPLY_ENTRY.

  The failure paths visible here set handle->status to
  LDB_ERR_OPERATIONS_ERROR, mark the handle LDB_ASYNC_DONE and return that
  code. A non-success status from the callback is returned unchanged.
*/
632 static int ltdb_index_filter(const struct dn_list *dn_list,
633 struct ldb_handle *handle)
635 struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
636 struct ldb_reply *ares = NULL;
639 for (i = 0; i < dn_list->count; i++) {
643 ares = talloc_zero(ac, struct ldb_reply);
645 handle->status = LDB_ERR_OPERATIONS_ERROR;
646 handle->state = LDB_ASYNC_DONE;
647 return LDB_ERR_OPERATIONS_ERROR;
650 ares->message = ldb_msg_new(ares);
651 if (!ares->message) {
652 handle->status = LDB_ERR_OPERATIONS_ERROR;
653 handle->state = LDB_ASYNC_DONE;
655 return LDB_ERR_OPERATIONS_ERROR;
659 dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
662 return LDB_ERR_OPERATIONS_ERROR;
665 ret = ltdb_search_dn1(ac->module, dn, ares->message);
668 /* the record has disappeared? yes, this can happen */
674 /* an internal error */
676 return LDB_ERR_OPERATIONS_ERROR;
679 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
684 /* filter the attributes that the user wants */
685 ret = ltdb_filter_attrs(ares->message, ac->attrs);
688 handle->status = LDB_ERR_OPERATIONS_ERROR;
689 handle->state = LDB_ASYNC_DONE;
691 return LDB_ERR_OPERATIONS_ERROR;
694 ares->type = LDB_REPLY_ENTRY;
695 handle->state = LDB_ASYNC_PENDING;
696 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
698 if (handle->status != LDB_SUCCESS) {
699 handle->state = LDB_ASYNC_DONE;
700 return handle->status;
708 search the database with a LDAP-like expression using indexes
709 returns -1 if an indexed search is not possible, in which
710 case the caller should call ltdb_search_full()
/*
  Run the search described by handle's ltdb_context through the index.

  When the cached index list is empty and the scope is not LDB_SCOPE_BASE,
  the index cannot help (see the "must do full search" comment). For
  LDB_SCOPE_BASE the candidate list is just the linearized base DN;
  otherwise ltdb_index_dn() builds it from the parse tree. The candidates
  are passed to ltdb_index_filter(), whose result becomes handle->status
  with the handle marked LDB_ASYNC_DONE. dn_list is then freed.
*/
712 int ltdb_search_indexed(struct ldb_handle *handle)
714 struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
715 struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
716 struct dn_list *dn_list;
719 if (ltdb->cache->indexlist->num_elements == 0 &&
720 ac->scope != LDB_SCOPE_BASE) {
721 /* no index list? must do full search */
725 dn_list = talloc(handle, struct dn_list);
726 if (dn_list == NULL) {
730 if (ac->scope == LDB_SCOPE_BASE) {
731 /* with BASE searches only one DN can match */
732 dn_list->dn = talloc_array(dn_list, char *, 1);
733 if (dn_list->dn == NULL) {
734 ldb_oom(ac->module->ldb);
737 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
738 if (dn_list->dn[0] == NULL) {
739 ldb_oom(ac->module->ldb);
745 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
749 /* we've got a candidate list - now filter by the full tree
750 and extract the needed attributes */
751 ret = ltdb_index_filter(dn_list, handle);
752 handle->status = ret;
753 handle->state = LDB_ASYNC_DONE;
756 talloc_free(dn_list);
762 add an index element where this is the first indexed DN for this value
/*
  Append a new LTDB_IDX element to msg holding dn as its only value.

  msg->elements is grown by one slot with talloc_realloc(). The new
  element's name is a talloc_strdup() of LTDB_IDX and its value array has
  room for a single ldb_val.

  The value's data pointer is dn itself with const cast away, not a copy,
  so dn has to stay valid for as long as msg is used.
*/
764 static int ltdb_index_add1_new(struct ldb_context *ldb,
765 struct ldb_message *msg,
768 struct ldb_message_element *el;
770 /* add another entry */
771 el = talloc_realloc(msg, msg->elements,
772 struct ldb_message_element, msg->num_elements+1);
778 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
779 if (!msg->elements[msg->num_elements].name) {
782 msg->elements[msg->num_elements].num_values = 0;
783 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
784 if (!msg->elements[msg->num_elements].values) {
787 msg->elements[msg->num_elements].values[0].length = strlen(dn);
788 msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
789 msg->elements[msg->num_elements].num_values = 1;
797 add an index element where this is not the first indexed DN for this
/*
  Add dn to the existing index element msg->elements[idx].

  The loop first checks with strcmp() whether dn is already one of the
  element's values, since multi-valued attributes can produce repeats.
  The value array is then grown by one with talloc_realloc() and dn is
  appended by pointer (const cast away), not copied, so dn has to outlive
  msg.
*/
800 static int ltdb_index_add1_add(struct ldb_context *ldb,
801 struct ldb_message *msg,
808 /* for multi-valued attributes we can end up with repeats */
809 for (i=0;i<msg->elements[idx].num_values;i++) {
810 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
815 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
817 msg->elements[idx].num_values+1);
821 msg->elements[idx].values = v2;
823 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
824 msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
825 msg->elements[idx].num_values++;
831 add an index entry for one message element
/*
  Add dn to the index record for value v_idx of element el.

  The index key DN is built with ltdb_index_key() and reparented onto a
  scratch message, which ltdb_search_dn1() fills from the database. msg is
  reset to zero elements on one of the result paths (presumably when no
  record exists yet). If msg has no LTDB_IDX element, one is created with
  ltdb_index_add1_new(); otherwise dn is added to the existing element
  with ltdb_index_add1_add(). The record is written back with
  ltdb_store(..., TDB_REPLACE).
*/
833 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
834 struct ldb_message_element *el, int v_idx)
836 struct ldb_context *ldb = module->ldb;
837 struct ldb_message *msg;
838 struct ldb_dn *dn_key;
842 msg = talloc(module, struct ldb_message);
848 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
853 talloc_steal(msg, dn_key);
855 ret = ltdb_search_dn1(module, dn_key, msg);
863 msg->num_elements = 0;
864 msg->elements = NULL;
867 for (i=0;i<msg->num_elements;i++) {
868 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
873 if (i == msg->num_elements) {
874 ret = ltdb_index_add1_new(ldb, msg, dn);
876 ret = ltdb_index_add1_add(ldb, msg, i, dn);
880 ret = ltdb_store(module, msg, TDB_REPLACE);
/*
  Index the values of the indexed elements among elements[0..num_el-1]
  for the record named dn.

  Nothing is done when the cached index list is empty. Each element name
  is checked against the index list with ldb_msg_find_idx(), and
  ltdb_index_add1() is called once per value.
*/
888 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
889 struct ldb_message_element *elements, int num_el)
891 struct ltdb_private *ltdb = module->private_data;
899 if (ltdb->cache->indexlist->num_elements == 0) {
900 /* no indexed fields */
904 for (i = 0; i < num_el; i++) {
905 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
910 for (j = 0; j < elements[i].num_values; j++) {
911 ret = ltdb_index_add1(module, dn, &elements[i], j);
922 add the index entries for a new record
/*
  Non-static entry point: index the elements of msg under its linearized
  DN by handing them to ltdb_index_add0().
*/
925 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
930 dn = ldb_dn_get_linearized(msg->dn);
935 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
942 delete an index entry for one message element
/*
  Remove dn from the index record for value v_idx of element el.

  The index record is loaded via ltdb_index_key()/ltdb_search_dn1() and dn
  is located among its LTDB_IDX values with ldb_msg_find_idx(). A miss is
  logged at LDB_DEBUG_ERROR. The matching value is removed by shifting the
  following values down with memmove(). If that leaves the element with no
  values, the whole index record is deleted with ltdb_delete_noindex();
  otherwise it is stored back with TDB_REPLACE. msg is parented on dn_key,
  so talloc_free(dn_key) releases both.
*/
944 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
945 struct ldb_message_element *el, int v_idx)
947 struct ldb_context *ldb = module->ldb;
948 struct ldb_message *msg;
949 struct ldb_dn *dn_key;
957 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
962 msg = talloc(dn_key, struct ldb_message);
968 ret = ltdb_search_dn1(module, dn_key, msg);
975 /* it wasn't indexed. Did we have an earlier error? If we did then
981 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
983 ldb_debug(ldb, LDB_DEBUG_ERROR,
984 "ERROR: dn %s not found in %s\n", dn,
985 ldb_dn_get_linearized(dn_key));
986 /* it ain't there. hmmm */
991 if (j != msg->elements[i].num_values - 1) {
992 memmove(&msg->elements[i].values[j],
993 &msg->elements[i].values[j+1],
994 (msg->elements[i].num_values-(j+1)) *
995 sizeof(msg->elements[i].values[0]));
997 msg->elements[i].num_values--;
999 if (msg->elements[i].num_values == 0) {
1000 ret = ltdb_delete_noindex(module, dn_key);
1002 ret = ltdb_store(module, msg, TDB_REPLACE);
1005 talloc_free(dn_key);
1011 delete the index entries for a record
1012 return -1 on failure
/*
  Remove the index entries for the indexed values of msg.

  Nothing is done when there are no indexed fields. There is also a
  separate test for special DNs (ldb_dn_is_special). Each element name is
  looked up under LTDB_IDXATTR in the cached index list, and
  ltdb_index_del_value() is called once per value.
*/
1014 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1016 struct ltdb_private *ltdb = module->private_data;
1021 /* find the list of indexed fields */
1022 if (ltdb->cache->indexlist->num_elements == 0) {
1023 /* no indexed fields */
1027 if (ldb_dn_is_special(msg->dn)) {
1031 dn = ldb_dn_get_linearized(msg->dn);
1036 for (i = 0; i < msg->num_elements; i++) {
1037 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1038 NULL, LTDB_IDXATTR);
1042 for (j = 0; j < msg->elements[i].num_values; j++) {
1043 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1055 traversal function that deletes all @INDEX records
/*
  tdb_traverse() callback: delete any record whose key starts with
  "DN=" LTDB_INDEX ":". data and state are not used in the visible code.

  NOTE(review): strncmp() is run on key.dptr without consulting
  key.dsize. This assumes keys are NUL-terminated strings; confirm.
*/
1057 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1059 const char *dn = "DN=" LTDB_INDEX ":";
1060 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1061 return tdb_delete(tdb, key);
1067 traversal function that adds @INDEX records during a re index
/*
  tdb_traverse() callback used while re-indexing; state is the ldb_module.

  Keys that start with "DN=@" or that do not start with "DN=" are singled
  out by the first test (presumably skipped; the branch body is not
  visible here). The record is unpacked and its key recomputed with
  ltdb_key(). If that differs from the stored key, the record is deleted
  and stored again under the new key. Index entries are then added with
  ltdb_index_add0(), using the text after "DN=" in the key when the
  unpacked message carries no DN.

  NOTE(review): the results of tdb_delete() and tdb_store() are ignored,
  so a failed re-key would pass silently.
  NOTE(review): msg->dn is handed to ltdb_key() and, on failure, to
  ldb_dn_get_linearized() before the msg->dn == NULL test further down.
  Confirm both accept NULL.
*/
1069 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1071 struct ldb_module *module = state;
1072 struct ldb_message *msg;
1073 const char *dn = NULL;
1077 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1078 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1082 msg = talloc(module, struct ldb_message);
1087 ret = ltdb_unpack_data(module, &data, msg);
1093 /* check if the DN key has changed, perhaps due to the
1094 case insensitivity of an element changing */
1095 key2 = ltdb_key(module, msg->dn);
1096 if (key2.dptr == NULL) {
1097 /* probably a corrupt record ... darn */
1098 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1099 ldb_dn_get_linearized(msg->dn));
1103 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1104 tdb_delete(tdb, key);
1105 tdb_store(tdb, key2, data, 0);
1107 talloc_free(key2.dptr);
1109 if (msg->dn == NULL) {
1110 dn = (char *)key.dptr + 3;
1112 dn = ldb_dn_get_linearized(msg->dn);
1115 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1123 force a complete reindex of the database
/*
  Rebuild all index records: reload the cache, traverse the database with
  delete_index to drop existing @INDEX records, then traverse it again
  with re_index to recreate them.
*/
1125 int ltdb_reindex(struct ldb_module *module)
1127 struct ltdb_private *ltdb = module->private_data;
1130 if (ltdb_cache_reload(module) != 0) {
1134 /* first traverse the database deleting any @INDEX records */
1135 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1140 /* now traverse adding any indexes for normal LDB records */
1141 ret = tdb_traverse(ltdb->tdb, re_index, module);