4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/includes.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurrence of needle if found
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 /* the following cast looks strange, but is
65 correct. The key to understanding it is that base_p
66 is a pointer to an array of pointers, so we have to
67 dereference it after casting to void **. The strange
68 const in the middle gives us the right type of pointer
69 after the dereference (tridge) */
70 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
72 /* scan back for first element */
74 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
90 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
103 return the dn key to be used for an index
106 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
107 const char *attr, const struct ldb_val *value)
112 const struct ldb_attrib_handler *h;
115 attr_folded = ldb_attr_casefold(ldb, attr);
120 h = ldb_attrib_handler(ldb, attr);
121 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
122 /* canonicalisation can be refused. For example,
123 an attribute that takes wildcards will refuse to canonicalise
124 if the value contains a wildcard */
125 talloc_free(attr_folded);
128 if (ldb_should_b64_encode(&v)) {
129 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
130 if (!vstr) return NULL;
131 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
133 if (v.data != value->data) {
136 talloc_free(attr_folded);
137 if (dn == NULL) return NULL;
141 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
142 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
144 if (v.data != value->data) {
147 talloc_free(attr_folded);
150 ret = ldb_dn_explode(ldb, dn);
156 see if an attribute value is in the list of indexed attributes
158 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
159 unsigned int *v_idx, const char *key)
162 for (i=0;i<msg->num_elements;i++) {
163 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
164 const struct ldb_message_element *el =
166 for (j=0;j<el->num_values;j++) {
167 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
179 /* used in sorting dn lists */
180 static int list_cmp(const char **s1, const char **s2)
182 return strcmp(*s1, *s2);
186 return a list of dn's that might match a simple indexed search or
188 static int ltdb_index_dn_simple(struct ldb_module *module,
189 const struct ldb_parse_tree *tree,
190 const struct ldb_message *index_list,
191 struct dn_list *list)
193 struct ldb_context *ldb = module->ldb;
197 struct ldb_message *msg;
202 /* if the attribute isn't in the list of indexed attributes then
203 this node needs a full search */
204 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
208 /* the attribute is indexed. Pull the list of DNs that match the
210 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
213 msg = talloc(list, struct ldb_message);
218 ret = ltdb_search_dn1(module, dn, msg);
220 if (ret == 0 || ret == -1) {
224 for (i=0;i<msg->num_elements;i++) {
225 struct ldb_message_element *el;
227 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
231 el = &msg->elements[i];
233 list->dn = talloc_array(list, char *, el->num_values);
239 for (j=0;j<el->num_values;j++) {
240 list->dn[list->count] =
241 talloc_strdup(list->dn, (char *)el->values[j].data);
242 if (!list->dn[list->count]) {
252 if (list->count > 1) {
253 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
260 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
263 return a list of dn's that might match a simple indexed search on
264 the special objectclass attribute
266 static int ltdb_index_dn_objectclass(struct ldb_module *module,
267 const struct ldb_parse_tree *tree,
268 const struct ldb_message *index_list,
269 struct dn_list *list)
271 struct ldb_context *ldb = module->ldb;
274 const char *target = (const char *)tree->u.equality.value.data;
275 const char **subclasses;
280 ret = ltdb_index_dn_simple(module, tree, index_list, list);
282 subclasses = ldb_subclass_list(module->ldb, target);
284 if (subclasses == NULL) {
288 for (i=0;subclasses[i];i++) {
289 struct ldb_parse_tree tree2;
290 struct dn_list *list2;
291 tree2.operation = LDB_OP_EQUALITY;
292 tree2.u.equality.attr = LTDB_OBJECTCLASS;
293 if (!tree2.u.equality.attr) {
296 tree2.u.equality.value.data =
297 (uint8_t *)talloc_strdup(list, subclasses[i]);
298 if (tree2.u.equality.value.data == NULL) {
301 tree2.u.equality.value.length = strlen(subclasses[i]);
302 list2 = talloc(list, struct dn_list);
304 talloc_free(tree2.u.equality.value.data);
307 if (ltdb_index_dn_objectclass(module, &tree2,
308 index_list, list2) == 1) {
309 if (list->count == 0) {
313 list_union(ldb, list, list2);
317 talloc_free(tree2.u.equality.value.data);
324 return a list of dn's that might match a leaf indexed search
326 static int ltdb_index_dn_leaf(struct ldb_module *module,
327 const struct ldb_parse_tree *tree,
328 const struct ldb_message *index_list,
329 struct dn_list *list)
331 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
332 return ltdb_index_dn_objectclass(module, tree, index_list, list);
334 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
335 list->dn = talloc_array(list, char *, 1);
336 if (list->dn == NULL) {
337 ldb_oom(module->ldb);
340 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
341 if (list->dn[0] == NULL) {
342 ldb_oom(module->ldb);
348 return ltdb_index_dn_simple(module, tree, index_list, list);
355 relies on the lists being sorted
357 static int list_intersect(struct ldb_context *ldb,
358 struct dn_list *list, const struct dn_list *list2)
360 struct dn_list *list3;
363 if (list->count == 0 || list2->count == 0) {
368 list3 = talloc(ldb, struct dn_list);
373 list3->dn = talloc_array(list3, char *, list->count);
380 for (i=0;i<list->count;i++) {
381 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
382 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
383 list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
386 talloc_free(list->dn[i]);
390 talloc_free(list->dn);
391 list->dn = talloc_move(list, &list3->dn);
392 list->count = list3->count;
402 relies on the lists being sorted
404 static int list_union(struct ldb_context *ldb,
405 struct dn_list *list, const struct dn_list *list2)
409 unsigned int count = list->count;
411 if (list->count == 0 && list2->count == 0) {
416 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
422 for (i=0;i<list2->count;i++) {
423 if (ldb_list_find(list2->dn[i], list->dn, count,
424 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
425 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
426 if (!list->dn[list->count]) {
433 if (list->count != count) {
434 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
440 static int ltdb_index_dn(struct ldb_module *module,
441 const struct ldb_parse_tree *tree,
442 const struct ldb_message *index_list,
443 struct dn_list *list);
449 static int ltdb_index_dn_or(struct ldb_module *module,
450 const struct ldb_parse_tree *tree,
451 const struct ldb_message *index_list,
452 struct dn_list *list)
454 struct ldb_context *ldb = module->ldb;
462 for (i=0;i<tree->u.list.num_elements;i++) {
463 struct dn_list *list2;
466 list2 = talloc(module, struct dn_list);
471 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
484 talloc_free(list->dn);
491 list->dn = talloc_move(list, &list2->dn);
492 list->count = list2->count;
494 if (list_union(ldb, list, list2) == -1) {
503 if (list->count == 0) {
514 static int ltdb_index_dn_not(struct ldb_module *module,
515 const struct ldb_parse_tree *tree,
516 const struct ldb_message *index_list,
517 struct dn_list *list)
519 /* the only way to do an indexed not would be if we could
520 negate the not via another not or if we knew the total
521 number of database elements so we could know that the
522 existing expression covered the whole database.
524 instead, we just give up, and rely on a full index scan
525 (unless an outer & manages to reduce the list)
531 AND two index results
533 static int ltdb_index_dn_and(struct ldb_module *module,
534 const struct ldb_parse_tree *tree,
535 const struct ldb_message *index_list,
536 struct dn_list *list)
538 struct ldb_context *ldb = module->ldb;
546 for (i=0;i<tree->u.list.num_elements;i++) {
547 struct dn_list *list2;
550 list2 = talloc(module, struct dn_list);
555 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
559 talloc_free(list->dn);
571 talloc_free(list->dn);
572 list->dn = talloc_move(list, &list2->dn);
573 list->count = list2->count;
575 if (list_intersect(ldb, list, list2) == -1) {
583 if (list->count == 0) {
584 talloc_free(list->dn);
593 return a list of dn's that might match an indexed search or
594 -1 if an error. return 0 for no matches, or 1 for matches
596 static int ltdb_index_dn(struct ldb_module *module,
597 const struct ldb_parse_tree *tree,
598 const struct ldb_message *index_list,
599 struct dn_list *list)
603 switch (tree->operation) {
605 ret = ltdb_index_dn_and(module, tree, index_list, list);
609 ret = ltdb_index_dn_or(module, tree, index_list, list);
613 ret = ltdb_index_dn_not(module, tree, index_list, list);
616 case LDB_OP_EQUALITY:
617 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
620 case LDB_OP_SUBSTRING:
625 case LDB_OP_EXTENDED:
626 /* we can't index with fancy bitops yet */
635 filter a candidate dn_list from an indexed search into a set of results
636 extracting just the given attributes
638 static int ltdb_index_filter(const struct dn_list *dn_list,
639 struct ldb_handle *handle)
641 struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
642 struct ldb_reply *ares = NULL;
645 for (i = 0; i < dn_list->count; i++) {
649 ares = talloc_zero(ac, struct ldb_reply);
651 handle->status = LDB_ERR_OPERATIONS_ERROR;
652 handle->state = LDB_ASYNC_DONE;
653 return LDB_ERR_OPERATIONS_ERROR;
656 ares->message = ldb_msg_new(ares);
657 if (!ares->message) {
658 handle->status = LDB_ERR_OPERATIONS_ERROR;
659 handle->state = LDB_ASYNC_DONE;
661 return LDB_ERR_OPERATIONS_ERROR;
665 dn = ldb_dn_explode(ares->message, dn_list->dn[i]);
668 return LDB_ERR_OPERATIONS_ERROR;
671 ret = ltdb_search_dn1(ac->module, dn, ares->message);
674 /* the record has disappeared? yes, this can happen */
680 /* an internal error */
682 return LDB_ERR_OPERATIONS_ERROR;
685 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
690 /* filter the attributes that the user wants */
691 ret = ltdb_filter_attrs(ares->message, ac->attrs);
694 handle->status = LDB_ERR_OPERATIONS_ERROR;
695 handle->state = LDB_ASYNC_DONE;
697 return LDB_ERR_OPERATIONS_ERROR;
700 ares->type = LDB_REPLY_ENTRY;
701 handle->state = LDB_ASYNC_PENDING;
702 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
704 if (handle->status != LDB_SUCCESS) {
705 handle->state = LDB_ASYNC_DONE;
706 return handle->status;
714 search the database with a LDAP-like expression using indexes
715 returns -1 if an indexed search is not possible, in which
716 case the caller should call ltdb_search_full()
718 int ltdb_search_indexed(struct ldb_handle *handle)
720 struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
721 struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
722 struct dn_list *dn_list;
725 if (ltdb->cache->indexlist->num_elements == 0 &&
726 ac->scope != LDB_SCOPE_BASE) {
727 /* no index list? must do full search */
731 dn_list = talloc(handle, struct dn_list);
732 if (dn_list == NULL) {
736 if (ac->scope == LDB_SCOPE_BASE) {
737 /* with BASE searches only one DN can match */
738 dn_list->dn = talloc_array(dn_list, char *, 1);
739 if (dn_list->dn == NULL) {
740 ldb_oom(ac->module->ldb);
743 dn_list->dn[0] = ldb_dn_linearize(dn_list, ac->base);
744 if (dn_list->dn[0] == NULL) {
745 ldb_oom(ac->module->ldb);
751 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
755 /* we've got a candidate list - now filter by the full tree
756 and extract the needed attributes */
757 ret = ltdb_index_filter(dn_list, handle);
758 handle->status = ret;
759 handle->state = LDB_ASYNC_DONE;
762 talloc_free(dn_list);
768 add an index element where this is the first indexed DN for this value
770 static int ltdb_index_add1_new(struct ldb_context *ldb,
771 struct ldb_message *msg,
772 struct ldb_message_element *el,
775 struct ldb_message_element *el2;
777 /* add another entry */
778 el2 = talloc_realloc(msg, msg->elements,
779 struct ldb_message_element, msg->num_elements+1);
785 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
786 if (!msg->elements[msg->num_elements].name) {
789 msg->elements[msg->num_elements].num_values = 0;
790 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
791 if (!msg->elements[msg->num_elements].values) {
794 msg->elements[msg->num_elements].values[0].length = strlen(dn);
795 msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
796 msg->elements[msg->num_elements].num_values = 1;
804 add an index element where this is not the first indexed DN for this
807 static int ltdb_index_add1_add(struct ldb_context *ldb,
808 struct ldb_message *msg,
809 struct ldb_message_element *el,
816 /* for multi-valued attributes we can end up with repeats */
817 for (i=0;i<msg->elements[idx].num_values;i++) {
818 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
823 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
825 msg->elements[idx].num_values+1);
829 msg->elements[idx].values = v2;
831 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
832 msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
833 msg->elements[idx].num_values++;
839 add an index entry for one message element
841 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
842 struct ldb_message_element *el, int v_idx)
844 struct ldb_context *ldb = module->ldb;
845 struct ldb_message *msg;
846 struct ldb_dn *dn_key;
850 msg = talloc(module, struct ldb_message);
856 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
862 talloc_steal(msg, dn_key);
864 ret = ltdb_search_dn1(module, dn_key, msg);
872 msg->num_elements = 0;
873 msg->elements = NULL;
876 for (i=0;i<msg->num_elements;i++) {
877 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
882 if (i == msg->num_elements) {
883 ret = ltdb_index_add1_new(ldb, msg, el, dn);
885 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
889 ret = ltdb_store(module, msg, TDB_REPLACE);
897 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
898 struct ldb_message_element *elements, int num_el)
900 struct ltdb_private *ltdb = module->private_data;
908 if (ltdb->cache->indexlist->num_elements == 0) {
909 /* no indexed fields */
913 for (i = 0; i < num_el; i++) {
914 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
919 for (j = 0; j < elements[i].num_values; j++) {
920 ret = ltdb_index_add1(module, dn, &elements[i], j);
931 add the index entries for a new record
934 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
936 struct ltdb_private *ltdb = module->private_data;
940 dn = ldb_dn_linearize(ltdb, msg->dn);
945 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
954 delete an index entry for one message element
956 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
957 struct ldb_message_element *el, int v_idx)
959 struct ldb_context *ldb = module->ldb;
960 struct ldb_message *msg;
961 struct ldb_dn *dn_key;
969 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
974 msg = talloc(dn_key, struct ldb_message);
980 ret = ltdb_search_dn1(module, dn_key, msg);
987 /* it wasn't indexed. Did we have an earlier error? If we did then
993 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
995 ldb_debug(ldb, LDB_DEBUG_ERROR,
996 "ERROR: dn %s not found in %s\n", dn,
997 ldb_dn_linearize(dn_key, dn_key));
998 /* it ain't there. hmmm */
1003 if (j != msg->elements[i].num_values - 1) {
1004 memmove(&msg->elements[i].values[j],
1005 &msg->elements[i].values[j+1],
1006 (msg->elements[i].num_values-(j+1)) *
1007 sizeof(msg->elements[i].values[0]));
1009 msg->elements[i].num_values--;
1011 if (msg->elements[i].num_values == 0) {
1012 ret = ltdb_delete_noindex(module, dn_key);
1014 ret = ltdb_store(module, msg, TDB_REPLACE);
1017 talloc_free(dn_key);
1023 delete the index entries for a record
1024 return -1 on failure
1026 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1028 struct ltdb_private *ltdb = module->private_data;
1033 if (ldb_dn_is_special(msg->dn)) {
1037 dn = ldb_dn_linearize(ltdb, msg->dn);
1042 /* find the list of indexed fields */
1043 if (ltdb->cache->indexlist->num_elements == 0) {
1044 /* no indexed fields */
1048 for (i = 0; i < msg->num_elements; i++) {
1049 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1050 NULL, LTDB_IDXATTR);
1054 for (j = 0; j < msg->elements[i].num_values; j++) {
1055 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1069 traversal function that deletes all @INDEX records
1071 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1073 const char *dn = "DN=" LTDB_INDEX ":";
1074 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1075 return tdb_delete(tdb, key);
1081 traversal function that adds @INDEX records during a reindex
1083 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1085 struct ldb_module *module = state;
1086 struct ldb_message *msg;
1091 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1092 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1096 msg = talloc(module, struct ldb_message);
1101 ret = ltdb_unpack_data(module, &data, msg);
1107 /* check if the DN key has changed, perhaps due to the
1108 case insensitivity of an element changing */
1109 key2 = ltdb_key(module, msg->dn);
1110 if (key2.dptr == NULL) {
1111 /* probably a corrupt record ... darn */
1112 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1113 ldb_dn_linearize(msg, msg->dn));
1117 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1118 tdb_delete(tdb, key);
1119 tdb_store(tdb, key2, data, 0);
1121 talloc_free(key2.dptr);
1123 if (msg->dn == NULL) {
1124 dn = (char *)key.dptr + 3;
1126 dn = ldb_dn_linearize(msg->dn, msg->dn);
1129 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1137 force a complete reindex of the database
1139 int ltdb_reindex(struct ldb_module *module)
1141 struct ltdb_private *ltdb = module->private_data;
1144 if (ltdb_cache_reload(module) != 0) {
1148 /* first traverse the database deleting any @INDEX records */
1149 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1154 /* now traverse adding any indexes for normal LDB records */
1155 ret = tdb_traverse(ltdb->tdb, re_index, module);