4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/includes.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurance of needle if found
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 /* the following cast looks strange, but is
65 correct. The key to understanding it is that base_p
66 is a pointer to an array of pointers, so we have to
67 dereference it after casting to void **. The strange
68 const in the middle gives us the right type of pointer
69 after the dereference (tridge) */
70 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
72 /* scan back for first element */
74 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
90 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
103 return the dn key to be used for an index
106 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
107 const char *attr, const struct ldb_val *value)
112 const struct ldb_attrib_handler *h;
115 attr_folded = ldb_attr_casefold(ldb, attr);
120 h = ldb_attrib_handler(ldb, attr);
121 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
122 /* canonicalisation can be refused. For example,
123 a attribute that takes wildcards will refuse to canonicalise
124 if the value contains a wildcard */
125 talloc_free(attr_folded);
128 if (ldb_should_b64_encode(&v)) {
129 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
130 if (!vstr) return NULL;
131 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
133 if (v.data != value->data) {
136 talloc_free(attr_folded);
137 if (dn == NULL) return NULL;
141 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
142 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
144 if (v.data != value->data) {
147 talloc_free(attr_folded);
150 ret = ldb_dn_explode(ldb, dn);
156 see if a attribute value is in the list of indexed attributes
158 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
159 unsigned int *v_idx, const char *key)
162 for (i=0;i<msg->num_elements;i++) {
163 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
164 const struct ldb_message_element *el =
166 for (j=0;j<el->num_values;j++) {
167 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
179 /* used in sorting dn lists */
180 static int list_cmp(const char **s1, const char **s2)
182 return strcmp(*s1, *s2);
186 return a list of dn's that might match a simple indexed search or
188 static int ltdb_index_dn_simple(struct ldb_module *module,
189 const struct ldb_parse_tree *tree,
190 const struct ldb_message *index_list,
191 struct dn_list *list)
193 struct ldb_context *ldb = module->ldb;
197 struct ldb_message *msg;
202 /* if the attribute isn't in the list of indexed attributes then
203 this node needs a full search */
204 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
208 /* the attribute is indexed. Pull the list of DNs that match the
210 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
213 msg = talloc(list, struct ldb_message);
218 ret = ltdb_search_dn1(module, dn, msg);
220 if (ret == 0 || ret == -1) {
224 for (i=0;i<msg->num_elements;i++) {
225 struct ldb_message_element *el;
227 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
231 el = &msg->elements[i];
233 list->dn = talloc_array(list, char *, el->num_values);
238 for (j=0;j<el->num_values;j++) {
239 list->dn[list->count] =
240 talloc_strdup(list->dn, (char *)el->values[j].data);
241 if (!list->dn[list->count]) {
250 if (list->count > 1) {
251 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
258 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
261 return a list of dn's that might match a simple indexed search on
262 the special objectclass attribute
264 static int ltdb_index_dn_objectclass(struct ldb_module *module,
265 const struct ldb_parse_tree *tree,
266 const struct ldb_message *index_list,
267 struct dn_list *list)
269 struct ldb_context *ldb = module->ldb;
272 const char *target = (const char *)tree->u.equality.value.data;
273 const char **subclasses;
278 ret = ltdb_index_dn_simple(module, tree, index_list, list);
280 subclasses = ldb_subclass_list(module->ldb, target);
282 if (subclasses == NULL) {
286 for (i=0;subclasses[i];i++) {
287 struct ldb_parse_tree tree2;
288 struct dn_list *list2;
289 tree2.operation = LDB_OP_EQUALITY;
290 tree2.u.equality.attr = LTDB_OBJECTCLASS;
291 if (!tree2.u.equality.attr) {
294 tree2.u.equality.value.data =
295 (uint8_t *)talloc_strdup(list, subclasses[i]);
296 if (tree2.u.equality.value.data == NULL) {
299 tree2.u.equality.value.length = strlen(subclasses[i]);
300 list2 = talloc(list, struct dn_list);
302 talloc_free(tree2.u.equality.value.data);
305 if (ltdb_index_dn_objectclass(module, &tree2,
306 index_list, list2) == 1) {
307 if (list->count == 0) {
311 list_union(ldb, list, list2);
315 talloc_free(tree2.u.equality.value.data);
322 return a list of dn's that might match a leaf indexed search
324 static int ltdb_index_dn_leaf(struct ldb_module *module,
325 const struct ldb_parse_tree *tree,
326 const struct ldb_message *index_list,
327 struct dn_list *list)
329 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
330 return ltdb_index_dn_objectclass(module, tree, index_list, list);
332 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
333 list->dn = talloc_array(list, char *, 1);
334 if (list->dn == NULL) {
335 ldb_oom(module->ldb);
338 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
339 if (list->dn[0] == NULL) {
340 ldb_oom(module->ldb);
346 return ltdb_index_dn_simple(module, tree, index_list, list);
353 relies on the lists being sorted
355 static int list_intersect(struct ldb_context *ldb,
356 struct dn_list *list, const struct dn_list *list2)
358 struct dn_list *list3;
361 if (list->count == 0 || list2->count == 0) {
366 list3 = talloc(ldb, struct dn_list);
371 list3->dn = talloc_array(list3, char *, list->count);
378 for (i=0;i<list->count;i++) {
379 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
380 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
381 list3->dn[list3->count] = talloc_move(list3->dn, list->dn[i]);
384 talloc_free(list->dn[i]);
388 talloc_free(list->dn);
389 list->dn = talloc_move(list, list3->dn);
390 list->count = list3->count;
400 relies on the lists being sorted
402 static int list_union(struct ldb_context *ldb,
403 struct dn_list *list, const struct dn_list *list2)
407 unsigned int count = list->count;
409 if (list->count == 0 && list2->count == 0) {
414 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
420 for (i=0;i<list2->count;i++) {
421 if (ldb_list_find(list2->dn[i], list->dn, count,
422 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
423 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
424 if (!list->dn[list->count]) {
431 if (list->count != count) {
432 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
438 static int ltdb_index_dn(struct ldb_module *module,
439 const struct ldb_parse_tree *tree,
440 const struct ldb_message *index_list,
441 struct dn_list *list);
447 static int ltdb_index_dn_or(struct ldb_module *module,
448 const struct ldb_parse_tree *tree,
449 const struct ldb_message *index_list,
450 struct dn_list *list)
452 struct ldb_context *ldb = module->ldb;
460 for (i=0;i<tree->u.list.num_elements;i++) {
461 struct dn_list *list2;
464 list2 = talloc(module, struct dn_list);
469 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
482 talloc_free(list->dn);
489 list->dn = talloc_move(list, list2->dn);
490 list->count = list2->count;
492 if (list_union(ldb, list, list2) == -1) {
501 if (list->count == 0) {
512 static int ltdb_index_dn_not(struct ldb_module *module,
513 const struct ldb_parse_tree *tree,
514 const struct ldb_message *index_list,
515 struct dn_list *list)
517 /* the only way to do an indexed not would be if we could
518 negate the not via another not or if we knew the total
519 number of database elements so we could know that the
520 existing expression covered the whole database.
522 instead, we just give up, and rely on a full index scan
523 (unless an outer & manages to reduce the list)
529 AND two index results
531 static int ltdb_index_dn_and(struct ldb_module *module,
532 const struct ldb_parse_tree *tree,
533 const struct ldb_message *index_list,
534 struct dn_list *list)
536 struct ldb_context *ldb = module->ldb;
544 for (i=0;i<tree->u.list.num_elements;i++) {
545 struct dn_list *list2;
548 list2 = talloc(module, struct dn_list);
553 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
557 talloc_free(list->dn);
569 talloc_free(list->dn);
570 list->dn = talloc_move(list, list2->dn);
571 list->count = list2->count;
573 if (list_intersect(ldb, list, list2) == -1) {
581 if (list->count == 0) {
582 talloc_free(list->dn);
591 return a list of dn's that might match a indexed search or
592 -1 if an error. return 0 for no matches, or 1 for matches
594 static int ltdb_index_dn(struct ldb_module *module,
595 const struct ldb_parse_tree *tree,
596 const struct ldb_message *index_list,
597 struct dn_list *list)
601 switch (tree->operation) {
603 ret = ltdb_index_dn_and(module, tree, index_list, list);
607 ret = ltdb_index_dn_or(module, tree, index_list, list);
611 ret = ltdb_index_dn_not(module, tree, index_list, list);
614 case LDB_OP_EQUALITY:
615 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
618 case LDB_OP_SUBSTRING:
623 case LDB_OP_EXTENDED:
624 /* we can't index with fancy bitops yet */
633 filter a candidate dn_list from an indexed search into a set of results
634 extracting just the given attributes
636 static int ltdb_index_filter(const struct dn_list *dn_list,
637 struct ldb_handle *handle)
639 struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
640 struct ldb_reply *ares = NULL;
643 for (i = 0; i < dn_list->count; i++) {
647 ares = talloc_zero(ac, struct ldb_reply);
649 handle->status = LDB_ERR_OPERATIONS_ERROR;
650 handle->state = LDB_ASYNC_DONE;
651 return LDB_ERR_OPERATIONS_ERROR;
654 ares->message = ldb_msg_new(ares);
655 if (!ares->message) {
656 handle->status = LDB_ERR_OPERATIONS_ERROR;
657 handle->state = LDB_ASYNC_DONE;
659 return LDB_ERR_OPERATIONS_ERROR;
663 dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
666 return LDB_ERR_OPERATIONS_ERROR;
669 ret = ltdb_search_dn1(ac->module, dn, ares->message);
672 /* the record has disappeared? yes, this can happen */
678 /* an internal error */
680 return LDB_ERR_OPERATIONS_ERROR;
683 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
688 /* filter the attributes that the user wants */
689 ret = ltdb_filter_attrs(ares->message, ac->attrs);
692 handle->status = LDB_ERR_OPERATIONS_ERROR;
693 handle->state = LDB_ASYNC_DONE;
695 return LDB_ERR_OPERATIONS_ERROR;
698 ares->type = LDB_REPLY_ENTRY;
699 handle->state = LDB_ASYNC_PENDING;
700 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
702 if (handle->status != LDB_SUCCESS) {
703 handle->state = LDB_ASYNC_DONE;
704 return handle->status;
712 search the database with a LDAP-like expression using indexes
713 returns -1 if an indexed search is not possible, in which
714 case the caller should call ltdb_search_full()
716 int ltdb_search_indexed(struct ldb_handle *handle)
718 struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
719 struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
720 struct dn_list *dn_list;
723 if (ltdb->cache->indexlist->num_elements == 0 &&
724 ac->scope != LDB_SCOPE_BASE) {
725 /* no index list? must do full search */
729 dn_list = talloc(handle, struct dn_list);
730 if (dn_list == NULL) {
734 if (ac->scope == LDB_SCOPE_BASE) {
735 /* with BASE searches only one DN can match */
736 dn_list->dn = talloc_array(dn_list, char *, 1);
737 if (dn_list->dn == NULL) {
738 ldb_oom(ac->module->ldb);
741 dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
742 if (dn_list->dn[0] == NULL) {
743 ldb_oom(ac->module->ldb);
749 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
753 /* we've got a candidate list - now filter by the full tree
754 and extract the needed attributes */
755 ret = ltdb_index_filter(dn_list, handle);
756 handle->status = ret;
757 handle->state = LDB_ASYNC_DONE;
760 talloc_free(dn_list);
766 add a index element where this is the first indexed DN for this value
768 static int ltdb_index_add1_new(struct ldb_context *ldb,
769 struct ldb_message *msg,
770 struct ldb_message_element *el,
773 struct ldb_message_element *el2;
775 /* add another entry */
776 el2 = talloc_realloc(msg, msg->elements,
777 struct ldb_message_element, msg->num_elements+1);
783 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
784 if (!msg->elements[msg->num_elements].name) {
787 msg->elements[msg->num_elements].num_values = 0;
788 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
789 if (!msg->elements[msg->num_elements].values) {
792 msg->elements[msg->num_elements].values[0].length = strlen(dn);
793 msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
794 msg->elements[msg->num_elements].num_values = 1;
802 add a index element where this is not the first indexed DN for this
805 static int ltdb_index_add1_add(struct ldb_context *ldb,
806 struct ldb_message *msg,
807 struct ldb_message_element *el,
814 /* for multi-valued attributes we can end up with repeats */
815 for (i=0;i<msg->elements[idx].num_values;i++) {
816 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
821 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
823 msg->elements[idx].num_values+1);
827 msg->elements[idx].values = v2;
829 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
830 msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
831 msg->elements[idx].num_values++;
837 add an index entry for one message element
839 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
840 struct ldb_message_element *el, int v_idx)
842 struct ldb_context *ldb = module->ldb;
843 struct ldb_message *msg;
844 struct ldb_dn *dn_key;
848 msg = talloc(module, struct ldb_message);
854 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
860 talloc_steal(msg, dn_key);
862 ret = ltdb_search_dn1(module, dn_key, msg);
870 msg->num_elements = 0;
871 msg->elements = NULL;
874 for (i=0;i<msg->num_elements;i++) {
875 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
880 if (i == msg->num_elements) {
881 ret = ltdb_index_add1_new(ldb, msg, el, dn);
883 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
887 ret = ltdb_store(module, msg, TDB_REPLACE);
895 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
896 struct ldb_message_element *elements, int num_el)
898 struct ltdb_private *ltdb = module->private_data;
906 if (ltdb->cache->indexlist->num_elements == 0) {
907 /* no indexed fields */
911 for (i = 0; i < num_el; i++) {
912 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
917 for (j = 0; j < elements[i].num_values; j++) {
918 ret = ltdb_index_add1(module, dn, &elements[i], j);
929 add the index entries for a new record
932 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
934 struct ltdb_private *ltdb = module->private_data;
938 dn = ldb_dn_linearize(ltdb, msg->dn);
943 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
952 delete an index entry for one message element
954 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
955 struct ldb_message_element *el, int v_idx)
957 struct ldb_context *ldb = module->ldb;
958 struct ldb_message *msg;
959 struct ldb_dn *dn_key;
967 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
972 msg = talloc(dn_key, struct ldb_message);
978 ret = ltdb_search_dn1(module, dn_key, msg);
985 /* it wasn't indexed. Did we have an earlier error? If we did then
991 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
993 ldb_debug(ldb, LDB_DEBUG_ERROR,
994 "ERROR: dn %s not found in %s\n", dn,
995 ldb_dn_linearize(dn_key, dn_key));
996 /* it ain't there. hmmm */
1001 if (j != msg->elements[i].num_values - 1) {
1002 memmove(&msg->elements[i].values[j],
1003 &msg->elements[i].values[j+1],
1004 (msg->elements[i].num_values-(j+1)) *
1005 sizeof(msg->elements[i].values[0]));
1007 msg->elements[i].num_values--;
1009 if (msg->elements[i].num_values == 0) {
1010 ret = ltdb_delete_noindex(module, dn_key);
1012 ret = ltdb_store(module, msg, TDB_REPLACE);
1015 talloc_free(dn_key);
1021 delete the index entries for a record
1022 return -1 on failure
1024 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1026 struct ltdb_private *ltdb = module->private_data;
1031 if (ldb_dn_is_special(msg->dn)) {
1035 dn = ldb_dn_linearize(ltdb, msg->dn);
1040 /* find the list of indexed fields */
1041 if (ltdb->cache->indexlist->num_elements == 0) {
1042 /* no indexed fields */
1046 for (i = 0; i < msg->num_elements; i++) {
1047 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1048 NULL, LTDB_IDXATTR);
1052 for (j = 0; j < msg->elements[i].num_values; j++) {
1053 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1067 traversal function that deletes all @INDEX records
1069 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1071 const char *dn = "DN=" LTDB_INDEX ":";
1072 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1073 return tdb_delete(tdb, key);
1079 traversal function that adds @INDEX records during a re index
1081 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1083 struct ldb_module *module = state;
1084 struct ldb_message *msg;
1089 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1090 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1094 msg = talloc(module, struct ldb_message);
1099 ret = ltdb_unpack_data(module, &data, msg);
1105 /* check if the DN key has changed, perhaps due to the
1106 case insensitivity of an element changing */
1107 key2 = ltdb_key(module, msg->dn);
1108 if (key2.dptr == NULL) {
1109 /* probably a corrupt record ... darn */
1110 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1111 ldb_dn_linearize(msg, msg->dn));
1115 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1116 tdb_delete(tdb, key);
1117 tdb_store(tdb, key2, data, 0);
1119 talloc_free(key2.dptr);
1121 if (msg->dn == NULL) {
1122 dn = (char *)key.dptr + 3;
1124 dn = ldb_dn_linearize(msg->dn, msg->dn);
1127 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1135 force a complete reindex of the database
1137 int ltdb_reindex(struct ldb_module *module)
1139 struct ltdb_private *ltdb = module->private_data;
1142 if (ltdb_cache_reload(module) != 0) {
1146 /* first traverse the database deleting any @INDEX records */
1147 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1152 /* now traverse adding any indexes for normal LDB records */
1153 ret = tdb_traverse(ltdb->tdb, re_index, module);