4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurance of needle if found
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
66 /* scan back for first element */
68 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
84 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
97 return the dn key to be used for an index
100 static char *ldb_dn_key(struct ldb_context *ldb,
101 const char *attr, const struct ldb_val *value)
105 if (ldb_should_b64_encode(value)) {
106 char *vstr = ldb_base64_encode(ldb, value->data, value->length);
107 if (!vstr) return NULL;
108 ret = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr, vstr);
113 return talloc_asprintf(ldb, "%s:%s:%.*s",
114 LTDB_INDEX, attr, value->length, (char *)value->data);
118 see if a attribute value is in the list of indexed attributes
120 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
121 unsigned int *v_idx, const char *key)
124 for (i=0;i<msg->num_elements;i++) {
125 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
126 const struct ldb_message_element *el =
128 for (j=0;j<el->num_values;j++) {
129 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
141 /* used in sorting dn lists */
142 static int list_cmp(const char **s1, const char **s2)
144 return strcmp(*s1, *s2);
148 return a list of dn's that might match a simple indexed search or
150 static int ltdb_index_dn_simple(struct ldb_module *module,
151 struct ldb_parse_tree *tree,
152 const struct ldb_message *index_list,
153 struct dn_list *list)
155 struct ldb_context *ldb = module->ldb;
159 struct ldb_message *msg;
165 if the value is a wildcard then we can't do a match via indexing
167 if (ltdb_has_wildcard(module, tree->u.simple.attr, &tree->u.simple.value)) {
171 /* if the attribute isn't in the list of indexed attributes then
172 this node needs a full search */
173 if (ldb_msg_find_idx(index_list, tree->u.simple.attr, NULL, LTDB_IDXATTR) == -1) {
177 /* the attribute is indexed. Pull the list of DNs that match the
179 dn = ldb_dn_key(ldb, tree->u.simple.attr, &tree->u.simple.value);
182 msg = talloc(list, struct ldb_message);
187 ret = ltdb_search_dn1(module, dn, msg);
189 if (ret == 0 || ret == -1) {
193 for (i=0;i<msg->num_elements;i++) {
194 struct ldb_message_element *el;
196 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
200 el = &msg->elements[i];
202 list->dn = talloc_array(list, char *, el->num_values);
207 for (j=0;j<el->num_values;j++) {
208 list->dn[list->count] =
209 talloc_strdup(list->dn, (char *)el->values[j].data);
210 if (!list->dn[list->count]) {
219 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
225 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
228 return a list of dn's that might match a simple indexed search on
229 the special objectclass attribute
231 static int ltdb_index_dn_objectclass(struct ldb_module *module,
232 struct ldb_parse_tree *tree,
233 const struct ldb_message *index_list,
234 struct dn_list *list)
236 struct ldb_context *ldb = module->ldb;
237 struct ltdb_private *ltdb = module->private_data;
240 const char *target = tree->u.simple.value.data;
245 ret = ltdb_index_dn_simple(module, tree, index_list, list);
247 for (i=0;i<ltdb->cache->subclasses->num_elements;i++) {
248 struct ldb_message_element *el = <db->cache->subclasses->elements[i];
249 if (ldb_attr_cmp(el->name, target) == 0) {
251 for (j=0;j<el->num_values;j++) {
252 struct ldb_parse_tree tree2;
253 struct dn_list *list2;
254 tree2.operation = LDB_OP_SIMPLE;
255 tree2.u.simple.attr = talloc_strdup(list, LTDB_OBJECTCLASS);
256 if (!tree2.u.simple.attr) {
259 tree2.u.simple.value = el->values[j];
260 list2 = talloc(list, struct dn_list);
264 if (ltdb_index_dn_objectclass(module, &tree2,
265 index_list, list2) == 1) {
266 if (list->count == 0) {
270 list_union(ldb, list, list2);
274 talloc_free(tree2.u.simple.attr);
283 return a list of dn's that might match a leaf indexed search
285 static int ltdb_index_dn_leaf(struct ldb_module *module,
286 struct ldb_parse_tree *tree,
287 const struct ldb_message *index_list,
288 struct dn_list *list)
290 if (ldb_attr_cmp(tree->u.simple.attr, LTDB_OBJECTCLASS) == 0) {
291 return ltdb_index_dn_objectclass(module, tree, index_list, list);
293 return ltdb_index_dn_simple(module, tree, index_list, list);
300 relies on the lists being sorted
302 static int list_intersect(struct ldb_context *ldb,
303 struct dn_list *list, const struct dn_list *list2)
305 struct dn_list *list3;
308 if (list->count == 0 || list2->count == 0) {
313 list3 = talloc(ldb, struct dn_list);
318 list3->dn = talloc_array(list3, char *, list->count);
325 for (i=0;i<list->count;i++) {
326 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
327 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
328 list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
331 talloc_free(list->dn[i]);
335 talloc_free(list->dn);
336 list->dn = talloc_steal(list, list3->dn);
337 list->count = list3->count;
347 relies on the lists being sorted
349 static int list_union(struct ldb_context *ldb,
350 struct dn_list *list, const struct dn_list *list2)
354 unsigned int count = list->count;
356 if (list->count == 0 && list2->count == 0) {
361 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
367 for (i=0;i<list2->count;i++) {
368 if (ldb_list_find(list2->dn[i], list->dn, count,
369 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
370 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
371 if (!list->dn[list->count]) {
378 if (list->count != count) {
379 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
385 static int ltdb_index_dn(struct ldb_module *module,
386 struct ldb_parse_tree *tree,
387 const struct ldb_message *index_list,
388 struct dn_list *list);
394 static int ltdb_index_dn_or(struct ldb_module *module,
395 struct ldb_parse_tree *tree,
396 const struct ldb_message *index_list,
397 struct dn_list *list)
399 struct ldb_context *ldb = module->ldb;
407 for (i=0;i<tree->u.list.num_elements;i++) {
408 struct dn_list *list2;
411 list2 = talloc(module, struct dn_list);
416 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
429 talloc_free(list->dn);
436 list->dn = talloc_steal(list, list2->dn);
437 list->count = list2->count;
439 if (list_union(ldb, list, list2) == -1) {
448 if (list->count == 0) {
459 static int ltdb_index_dn_not(struct ldb_module *module,
460 struct ldb_parse_tree *tree,
461 const struct ldb_message *index_list,
462 struct dn_list *list)
464 /* the only way to do an indexed not would be if we could
465 negate the not via another not or if we knew the total
466 number of database elements so we could know that the
467 existing expression covered the whole database.
469 instead, we just give up, and rely on a full index scan
470 (unless an outer & manages to reduce the list)
476 AND two index results
478 static int ltdb_index_dn_and(struct ldb_module *module,
479 struct ldb_parse_tree *tree,
480 const struct ldb_message *index_list,
481 struct dn_list *list)
483 struct ldb_context *ldb = module->ldb;
491 for (i=0;i<tree->u.list.num_elements;i++) {
492 struct dn_list *list2;
495 list2 = talloc(module, struct dn_list);
500 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
504 talloc_free(list->dn);
516 talloc_free(list->dn);
517 list->dn = talloc_steal(list, list2->dn);
518 list->count = list2->count;
520 if (list_intersect(ldb, list, list2) == -1) {
528 if (list->count == 0) {
529 talloc_free(list->dn);
538 return a list of dn's that might match a indexed search or
539 -1 if an error. return 0 for no matches, or 1 for matches
541 static int ltdb_index_dn(struct ldb_module *module,
542 struct ldb_parse_tree *tree,
543 const struct ldb_message *index_list,
544 struct dn_list *list)
548 switch (tree->operation) {
550 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
553 case LDB_OP_EXTENDED:
554 /* we can't index with fancy bitops yet */
559 ret = ltdb_index_dn_and(module, tree, index_list, list);
563 ret = ltdb_index_dn_or(module, tree, index_list, list);
567 ret = ltdb_index_dn_not(module, tree, index_list, list);
575 filter a candidate dn_list from an indexed search into a set of results
576 extracting just the given attributes
578 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
580 enum ldb_scope scope,
581 const struct dn_list *dn_list,
582 const char * const attrs[], struct ldb_message ***res)
587 for (i=0;i<dn_list->count;i++) {
588 struct ldb_message *msg;
591 msg = talloc(module, struct ldb_message);
596 ret = ltdb_search_dn1(module, dn_list->dn[i], msg);
598 /* the record has disappeared? yes, this can happen */
604 /* an internal error */
610 if (ltdb_message_match(module, msg, tree, base, scope) == 1) {
611 ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
623 search the database with a LDAP-like expression using indexes
624 returns -1 if an indexed search is not possible, in which
625 case the caller should call ltdb_search_full()
627 int ltdb_search_indexed(struct ldb_module *module,
629 enum ldb_scope scope,
630 struct ldb_parse_tree *tree,
631 const char * const attrs[], struct ldb_message ***res)
633 struct ltdb_private *ltdb = module->private_data;
634 struct dn_list *dn_list;
637 if (ltdb->cache->indexlist->num_elements == 0) {
638 /* no index list? must do full search */
642 dn_list = talloc(module, struct dn_list);
643 if (dn_list == NULL) {
647 ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
650 /* we've got a candidate list - now filter by the full tree
651 and extract the needed attributes */
652 ret = ldb_index_filter(module, tree, base, scope, dn_list,
656 talloc_free(dn_list);
662 add a index element where this is the first indexed DN for this value
664 static int ltdb_index_add1_new(struct ldb_context *ldb,
665 struct ldb_message *msg,
666 struct ldb_message_element *el,
669 struct ldb_message_element *el2;
671 /* add another entry */
672 el2 = talloc_realloc(msg, msg->elements,
673 struct ldb_message_element, msg->num_elements+1);
679 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
680 if (!msg->elements[msg->num_elements].name) {
683 msg->elements[msg->num_elements].num_values = 0;
684 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
685 if (!msg->elements[msg->num_elements].values) {
688 msg->elements[msg->num_elements].values[0].length = strlen(dn);
689 msg->elements[msg->num_elements].values[0].data = dn;
690 msg->elements[msg->num_elements].num_values = 1;
698 add a index element where this is not the first indexed DN for this
701 static int ltdb_index_add1_add(struct ldb_context *ldb,
702 struct ldb_message *msg,
703 struct ldb_message_element *el,
710 /* for multi-valued attributes we can end up with repeats */
711 for (i=0;i<msg->elements[idx].num_values;i++) {
712 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
717 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
719 msg->elements[idx].num_values+1);
723 msg->elements[idx].values = v2;
725 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
726 msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
727 msg->elements[idx].num_values++;
733 add an index entry for one message element
735 static int ltdb_index_add1(struct ldb_module *module, char *dn,
736 struct ldb_message_element *el, int v_idx)
738 struct ldb_context *ldb = module->ldb;
739 struct ldb_message *msg;
744 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
749 msg = talloc(dn_key, struct ldb_message);
754 ret = ltdb_search_dn1(module, dn_key, msg);
761 msg->dn = talloc_strdup(msg, dn_key);
767 msg->num_elements = 0;
768 msg->elements = NULL;
771 for (i=0;i<msg->num_elements;i++) {
772 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
777 if (i == msg->num_elements) {
778 ret = ltdb_index_add1_new(ldb, msg, el, dn);
780 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
784 ret = ltdb_store(module, msg, TDB_REPLACE);
793 add the index entries for a new record
796 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
798 struct ltdb_private *ltdb = module->private_data;
802 if (ltdb->cache->indexlist->num_elements == 0) {
803 /* no indexed fields */
807 for (i=0;i<msg->num_elements;i++) {
808 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
813 for (j=0;j<msg->elements[i].num_values;j++) {
814 ret = ltdb_index_add1(module, msg->dn, &msg->elements[i], j);
826 delete an index entry for one message element
828 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
829 struct ldb_message_element *el, int v_idx)
831 struct ldb_context *ldb = module->ldb;
832 struct ldb_message *msg;
837 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
842 msg = talloc(dn_key, struct ldb_message);
848 ret = ltdb_search_dn1(module, dn_key, msg);
855 /* it wasn't indexed. Did we have an earlier error? If we did then
861 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
863 ldb_debug(ldb, LDB_DEBUG_ERROR, "ERROR: dn %s not found in %s\n", dn, dn_key);
864 /* it ain't there. hmmm */
869 if (j != msg->elements[i].num_values - 1) {
870 memmove(&msg->elements[i].values[j],
871 &msg->elements[i].values[j+1],
872 (msg->elements[i].num_values-(j+1)) *
873 sizeof(msg->elements[i].values[0]));
875 msg->elements[i].num_values--;
877 if (msg->elements[i].num_values == 0) {
878 ret = ltdb_delete_noindex(module, dn_key);
880 ret = ltdb_store(module, msg, TDB_REPLACE);
889 delete the index entries for a record
892 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
894 struct ltdb_private *ltdb = module->private_data;
898 /* find the list of indexed fields */
899 if (ltdb->cache->indexlist->num_elements == 0) {
900 /* no indexed fields */
904 for (i=0;i<msg->num_elements;i++) {
905 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
910 for (j=0;j<msg->elements[i].num_values;j++) {
911 ret = ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
923 traversal function that deletes all @INDEX records
925 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
927 const char *dn = "DN=" LTDB_INDEX ":";
928 if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
929 return tdb_delete(tdb, key);
935 traversal function that adds @INDEX records during a re index
937 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
939 struct ldb_module *module = state;
940 struct ldb_message *msg;
944 if (strncmp(key.dptr, "DN=@", 4) == 0 ||
945 strncmp(key.dptr, "DN=", 3) != 0) {
949 msg = talloc(module, struct ldb_message);
954 ret = ltdb_unpack_data(module, &data, msg);
960 /* check if the DN key has changed, perhaps due to the
961 case insensitivity of an element changing */
962 key2 = ltdb_key(module, msg->dn);
963 if (strcmp(key2.dptr, key.dptr) != 0) {
964 tdb_delete(tdb, key);
965 tdb_store(tdb, key2, data, 0);
967 talloc_free(key2.dptr);
970 msg->dn = key.dptr+3;
973 ret = ltdb_index_add(module, msg);
981 force a complete reindex of the database
983 int ltdb_reindex(struct ldb_module *module)
985 struct ltdb_private *ltdb = module->private_data;
988 if (ltdb_cache_reload(module) != 0) {
992 /* first traverse the database deleting any @INDEX records */
993 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
999 /* now traverse adding any indexes for normal LDB records */
1000 ret = tdb_traverse(ltdb->tdb, re_index, module);