4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurrence of needle if found
/*
 * Binary search over an array of nmemb slots, each `size` bytes wide,
 * starting at base. Each slot is read as a pointer, and that pointer (not
 * the slot's own address) is passed to comp_fn together with needle.
 *
 * NOTE(review): the initialisation of min_i/max_i, the branches taken on r
 * and the return statements are not visible in this listing.
 */
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
/* probe the midpoint of the current [min_i, max_i] window */
63 test_i = (min_i + max_i) / 2;
64 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
66 /* scan back for first element */
68 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
/* final comparison against the slot at min_i (presumably once the loop
   has ended - the loop's closing brace is not visible here) */
84 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
97 return the dn key to be used for an index
/*
 * Build the DN under which the index entry for (attr, value) is kept.
 *
 * Visible steps: casefold the attribute name, canonicalise the value with
 * the attribute handler's canonicalise_fn, format a string of the form
 *   LTDB_INDEX ":" attr_folded ":" value
 * (or "::" followed by the base64 text when ldb_should_b64_encode() says
 * the value needs encoding), then parse that string with ldb_dn_explode().
 *
 * Returns NULL on the two visible failure paths (base64 encoding failed,
 * talloc_asprintf failed); other return statements are not visible here.
 */
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101 const char *attr, const struct ldb_val *value)
106 const struct ldb_attrib_handler *h;
109 attr_folded = ldb_casefold(ldb, attr);
/* NOTE(review): h is dereferenced below with no NULL check visible;
   assumes ldb_attrib_handler() always returns a handler - TODO confirm */
114 h = ldb_attrib_handler(ldb, attr);
115 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116 /* canonicalisation can be refused. For example,
117 an attribute that takes wildcards will refuse to canonicalise
118 if the value contains a wildcard */
119 talloc_free(attr_folded);
122 if (ldb_should_b64_encode(&v)) {
123 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
/* NOTE(review): this early return performs no cleanup, so attr_folded
   (and v.data, if canonicalisation allocated it) stay allocated on ldb */
124 if (!vstr) return NULL;
125 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
/* v.data differs from value->data only when canonicalisation produced a
   new buffer; the statement guarded by this test is not visible here */
127 if (v.data != value->data) {
130 talloc_free(attr_folded);
131 if (dn == NULL) return NULL;
/* plain (non-base64) form: the value is printed with an explicit length,
   so it does not need to be NUL-terminated */
135 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
136 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
138 if (v.data != value->data) {
141 talloc_free(attr_folded);
144 ret = ldb_dn_explode(ldb, dn);
150 see if an attribute value is in the list of indexed attributes
/*
 * Search msg for an element whose name matches key, and within that
 * element for a value that matches attr. Both comparisons use
 * ldb_attr_cmp(); value data is cast to char * for the comparison
 * (assumes the stored values are NUL-terminated strings - TODO confirm).
 *
 * NOTE(review): the return statements and any store through v_idx are not
 * visible in this listing. Callers in this file compare the result with -1
 * and pass either NULL or the address of an index variable as v_idx.
 */
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153 unsigned int *v_idx, const char *key)
156 for (i=0;i<msg->num_elements;i++) {
157 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158 const struct ldb_message_element *el =
160 for (j=0;j<el->num_values;j++) {
161 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
173 /* used in sorting dn lists */
/*
 * qsort()-style comparator for arrays of strings: each argument points at
 * an array slot holding a char pointer, and the two strings are ordered
 * with strcmp().
 */
174 static int list_cmp(const char **s1, const char **s2)
176 return strcmp(*s1, *s2);
180 return a list of dn's that might match a simple indexed search or
/*
 * Resolve one equality node (tree->u.equality) through the index.
 *
 * Visible steps, in order:
 *  - ldb_msg_find_idx() checks that the attribute is listed under
 *    LTDB_IDXATTR in index_list; a result of -1 means "not indexed";
 *  - ldb_dn_key() builds the DN of the index record for attr/value;
 *  - a message is allocated as a talloc child of list and filled by
 *    ltdb_search_dn1(); results 0 and -1 are handled together;
 *  - for every element named LTDB_IDX, list->dn is allocated with room for
 *    el->num_values entries and each value is copied with talloc_strdup();
 *  - the resulting array is sorted with qsort()/list_cmp.
 *
 * NOTE(review): the return values, the increment of list->count and the
 * error handling bodies are not visible in this listing.
 */
182 static int ltdb_index_dn_simple(struct ldb_module *module,
183 struct ldb_parse_tree *tree,
184 const struct ldb_message *index_list,
185 struct dn_list *list)
187 struct ldb_context *ldb = module->ldb;
191 struct ldb_message *msg;
196 /* if the attribute isn't in the list of indexed attributes then
197 this node needs a full search */
198 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
202 /* the attribute is indexed. Pull the list of DNs that match the
204 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
207 msg = talloc(list, struct ldb_message);
212 ret = ltdb_search_dn1(module, dn, msg);
214 if (ret == 0 || ret == -1) {
218 for (i=0;i<msg->num_elements;i++) {
219 struct ldb_message_element *el;
221 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
225 el = &msg->elements[i];
227 list->dn = talloc_array(list, char *, el->num_values);
232 for (j=0;j<el->num_values;j++) {
233 list->dn[list->count] =
234 talloc_strdup(list->dn, (char *)el->values[j].data);
235 if (!list->dn[list->count]) {
244 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
/* forward declaration: list_union() is defined further down in this file
   but is already called by ltdb_index_dn_objectclass() */
250 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
253 return a list of dn's that might match a simple indexed search on
254 the special objectclass attribute
/*
 * Index lookup for an equality match on the objectclass attribute.
 *
 * The class named in the node is looked up with ltdb_index_dn_simple().
 * Then, for every entry of the NULL-terminated array returned by
 * ldb_subclass_list(), a temporary equality node on LTDB_OBJECTCLASS is
 * built and this function calls itself on it; a result of 1 leads to the
 * subclass candidates being merged into list.
 *
 * NOTE(review): return statements and most error-branch bodies are not
 * visible in this listing.
 */
256 static int ltdb_index_dn_objectclass(struct ldb_module *module,
257 struct ldb_parse_tree *tree,
258 const struct ldb_message *index_list,
259 struct dn_list *list)
261 struct ldb_context *ldb = module->ldb;
264 const char *target = (const char *)tree->u.equality.value.data;
265 const char **subclasses;
/* candidates for the requested class itself */
270 ret = ltdb_index_dn_simple(module, tree, index_list, list);
272 subclasses = ldb_subclass_list(module->ldb, target);
274 if (subclasses == NULL) {
278 for (i=0;subclasses[i];i++) {
/* tree2 lives on the stack; only its value buffer is heap allocated */
279 struct ldb_parse_tree tree2;
280 struct dn_list *list2;
281 tree2.operation = LDB_OP_EQUALITY;
282 tree2.u.equality.attr = LTDB_OBJECTCLASS;
/* NOTE(review): tests a pointer that was just assigned from the
   LTDB_OBJECTCLASS constant - presumably never NULL; confirm */
283 if (!tree2.u.equality.attr) {
/* private copy of the subclass name, owned by list */
286 tree2.u.equality.value.data =
287 (uint8_t *)talloc_strdup(list, subclasses[i]);
288 if (tree2.u.equality.value.data == NULL) {
291 tree2.u.equality.value.length = strlen(subclasses[i]);
292 list2 = talloc(list, struct dn_list);
/* presumably the list2 allocation-failure path (its test is not visible) */
294 talloc_free(tree2.u.equality.value.data);
/* recurse so that subclasses of the subclass are covered as well */
297 if (ltdb_index_dn_objectclass(module, &tree2,
298 index_list, list2) == 1) {
299 if (list->count == 0) {
/* NOTE(review): the result of list_union() is not checked here */
303 list_union(ldb, list, list2);
307 talloc_free(tree2.u.equality.value.data);
314 return a list of dn's that might match a leaf indexed search
/*
 * Dispatch an equality leaf to the right lookup, based on the attribute
 * name (compared with ldb_attr_cmp):
 *  - LTDB_OBJECTCLASS        -> ltdb_index_dn_objectclass()
 *  - "distinguishedName"/"dn" -> no index lookup; list becomes a single
 *                                candidate holding a copy of the value
 *  - anything else           -> ltdb_index_dn_simple()
 *
 * NOTE(review): the return values of the DN branch and the setting of
 * list->count are not visible in this listing.
 */
316 static int ltdb_index_dn_leaf(struct ldb_module *module,
317 struct ldb_parse_tree *tree,
318 const struct ldb_message *index_list,
319 struct dn_list *list)
321 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
322 return ltdb_index_dn_objectclass(module, tree, index_list, list);
324 if (ldb_attr_cmp(tree->u.equality.attr, "distinguishedName") == 0 ||
325 ldb_attr_cmp(tree->u.equality.attr, "dn") == 0) {
/* one-slot candidate array */
326 list->dn = talloc_array(list, char *, 1);
327 if (list->dn == NULL) {
328 ldb_oom(module->ldb);
/* the copy is parented on list itself, not on list->dn */
331 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
332 if (list->dn[0] == NULL) {
333 ldb_oom(module->ldb);
339 return ltdb_index_dn_simple(module, tree, index_list, list);
346 relies on the lists being sorted
/*
 * Reduce list to the entries that also occur in list2.
 *
 * A scratch dn_list (list3, allocated on ldb) is given an array of
 * list->count slots. Each string of list is searched for in list2 with
 * ldb_list_find()/strcmp; matches are moved into list3->dn with
 * talloc_steal(). Afterwards the old list->dn array is freed and replaced
 * by list3's array and count.
 *
 * NOTE(review): return values, the increment of list3->count and the
 * release of list3 itself are not visible in this listing.
 */
348 static int list_intersect(struct ldb_context *ldb,
349 struct dn_list *list, const struct dn_list *list2)
351 struct dn_list *list3;
354 if (list->count == 0 || list2->count == 0) {
359 list3 = talloc(ldb, struct dn_list);
/* the intersection can never be larger than list itself */
364 list3->dn = talloc_array(list3, char *, list->count);
371 for (i=0;i<list->count;i++) {
372 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
373 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
/* keep: re-parent the string onto the new array */
374 list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
/* presumably the "not in list2" branch: the string is dropped */
377 talloc_free(list->dn[i]);
/* swap the filtered array into list */
381 talloc_free(list->dn);
382 list->dn = talloc_steal(list, list3->dn);
383 list->count = list3->count;
393 relies on the lists being sorted
/*
 * Merge the entries of list2 into list, skipping strings list already has.
 *
 * The array of list is resized to list->count + list2->count slots. Each
 * string of list2 is looked up with ldb_list_find()/strcmp in the first
 * `count` entries of list->dn, where count is list->count as it was on
 * entry; strings that are not found are appended as talloc_strdup()
 * copies. If anything was appended the array is sorted again with
 * qsort()/list_cmp.
 *
 * NOTE(review): return values, the assignment of the reallocated array
 * back to list->dn and the increment of list->count are not visible in
 * this listing.
 */
395 static int list_union(struct ldb_context *ldb,
396 struct dn_list *list, const struct dn_list *list2)
/* size of list on entry; bounds every lookup below */
400 unsigned int count = list->count;
402 if (list->count == 0 && list2->count == 0) {
407 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
413 for (i=0;i<list2->count;i++) {
414 if (ldb_list_find(list2->dn[i], list->dn, count,
415 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
416 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
417 if (!list->dn[list->count]) {
/* re-sort only when new entries were appended */
424 if (list->count != count) {
425 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
/* forward declaration: ltdb_index_dn_or() and ltdb_index_dn_and() below
   call ltdb_index_dn() on their sub-trees before it is defined */
431 static int ltdb_index_dn(struct ldb_module *module,
432 struct ldb_parse_tree *tree,
433 const struct ldb_message *index_list,
434 struct dn_list *list);
/*
 * Index lookup for a list node whose sub-results are merged with
 * list_union() (the OR case, going by the function name).
 *
 * For each sub-tree in tree->u.list a fresh dn_list is allocated on module
 * and filled by ltdb_index_dn(). The visible handling of the sub-result v
 * consists of: freeing list->dn, taking over list2's array and count
 * wholesale, or merging with list_union() (a result of -1 is treated
 * specially).
 *
 * NOTE(review): the conditions selecting between those actions, the
 * release of list2 and all return statements are not visible in this
 * listing.
 */
440 static int ltdb_index_dn_or(struct ldb_module *module,
441 struct ldb_parse_tree *tree,
442 const struct ldb_message *index_list,
443 struct dn_list *list)
445 struct ldb_context *ldb = module->ldb;
453 for (i=0;i<tree->u.list.num_elements;i++) {
454 struct dn_list *list2;
457 list2 = talloc(module, struct dn_list);
462 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
475 talloc_free(list->dn);
/* adopt the sub-result as the running result */
482 list->dn = talloc_steal(list, list2->dn);
483 list->count = list2->count;
485 if (list_union(ldb, list, list2) == -1) {
494 if (list->count == 0) {
/*
 * Index lookup for a NOT node. The comment in the body explains that no
 * indexed evaluation is attempted for this operator.
 *
 * NOTE(review): no executable statement of this function - in particular
 * its return value - is visible in this listing.
 */
505 static int ltdb_index_dn_not(struct ldb_module *module,
506 struct ldb_parse_tree *tree,
507 const struct ldb_message *index_list,
508 struct dn_list *list)
510 /* the only way to do an indexed not would be if we could
511 negate the not via another not or if we knew the total
512 number of database elements so we could know that the
513 existing expression covered the whole database.
515 instead, we just give up, and rely on a full index scan
516 (unless an outer & manages to reduce the list)
522 AND two index results
/*
 * Index lookup for a list node whose sub-results are combined with
 * list_intersect() (the AND case).
 *
 * For each sub-tree in tree->u.list a fresh dn_list is allocated on module
 * and filled by ltdb_index_dn(). The visible handling of the sub-result v
 * consists of: freeing list->dn, replacing list's array and count with
 * list2's, or intersecting with list_intersect() (a result of -1 is
 * treated specially). An empty running result leads to list->dn being
 * freed.
 *
 * NOTE(review): the conditions selecting between those actions, the
 * release of list2 and all return statements are not visible in this
 * listing.
 */
524 static int ltdb_index_dn_and(struct ldb_module *module,
525 struct ldb_parse_tree *tree,
526 const struct ldb_message *index_list,
527 struct dn_list *list)
529 struct ldb_context *ldb = module->ldb;
537 for (i=0;i<tree->u.list.num_elements;i++) {
538 struct dn_list *list2;
541 list2 = talloc(module, struct dn_list);
546 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
550 talloc_free(list->dn);
/* replace the running result with this sub-result */
562 talloc_free(list->dn);
563 list->dn = talloc_steal(list, list2->dn);
564 list->count = list2->count;
566 if (list_intersect(ldb, list, list2) == -1) {
574 if (list->count == 0) {
575 talloc_free(list->dn);
584 return a list of dn's that might match an indexed search or
585 -1 if an error. return 0 for no matches, or 1 for matches
/*
 * Dispatch on tree->operation to the matching index routine:
 * ltdb_index_dn_and(), ltdb_index_dn_or(), ltdb_index_dn_not() and, for
 * LDB_OP_EQUALITY, ltdb_index_dn_leaf(). LDB_OP_SUBSTRING and
 * LDB_OP_EXTENDED have case labels but, per the existing comment, are not
 * answered from the index.
 *
 * NOTE(review): the case labels for the and/or/not calls, the value
 * assigned for the non-indexable operators and the final return are not
 * visible in this listing.
 */
587 static int ltdb_index_dn(struct ldb_module *module,
588 struct ldb_parse_tree *tree,
589 const struct ldb_message *index_list,
590 struct dn_list *list)
594 switch (tree->operation) {
596 ret = ltdb_index_dn_and(module, tree, index_list, list);
600 ret = ltdb_index_dn_or(module, tree, index_list, list);
604 ret = ltdb_index_dn_not(module, tree, index_list, list);
607 case LDB_OP_EQUALITY:
608 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
611 case LDB_OP_SUBSTRING:
616 case LDB_OP_EXTENDED:
617 /* we can't index with fancy bitops yet */
626 filter a candidate dn_list from an indexed search into a set of results
627 extracting just the given attributes
/*
 * Turn a candidate DN list into search results.
 *
 * For every string in dn_list: allocate a message on module, parse the
 * string into a DN with ldb_dn_explode(), load the record with
 * ltdb_search_dn1(), and - if ldb_match_msg() returns 1 for the record
 * against tree/base/scope - hand it to ltdb_add_attr_results() together
 * with attrs, a running count and res.
 *
 * NOTE(review): the tests on ret around the two existing comments, the
 * release of msg and the return statements are not visible in this
 * listing.
 */
629 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
630 const struct ldb_dn *base,
631 enum ldb_scope scope,
632 const struct dn_list *dn_list,
633 const char * const attrs[], struct ldb_message ***res)
638 for (i = 0; i < dn_list->count; i++) {
639 struct ldb_message *msg;
643 msg = talloc(module, struct ldb_message);
/* the parsed DN is a talloc child of msg */
648 dn = ldb_dn_explode(msg, dn_list->dn[i]);
654 ret = ltdb_search_dn1(module, dn, msg);
657 /* the record has disappeared? yes, this can happen */
663 /* an internal error */
/* the index only yields candidates; the full filter is applied here */
669 if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
670 ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
682 search the database with a LDAP-like expression using indexes
683 returns -1 if an indexed search is not possible, in which
684 case the caller should call ltdb_search_full()
/*
 * Entry point for an indexed search.
 *
 * Visible flow:
 *  - if the cached index list has no elements and the scope is not
 *    LDB_SCOPE_BASE, the existing comment says a full search is required;
 *  - a dn_list is allocated on module;
 *  - for LDB_SCOPE_BASE the candidate list is a single entry holding the
 *    linearized base DN;
 *  - ltdb_index_dn() is called on the parse tree (presumably in the
 *    non-BASE branch - the else is not visible);
 *  - candidates are filtered with ldb_index_filter();
 *  - dn_list is freed.
 *
 * NOTE(review): return statements and the tests on ret are not visible in
 * this listing.
 */
686 int ltdb_search_indexed(struct ldb_module *module,
687 const struct ldb_dn *base,
688 enum ldb_scope scope,
689 struct ldb_parse_tree *tree,
690 const char * const attrs[], struct ldb_message ***res)
692 struct ltdb_private *ltdb = module->private_data;
693 struct dn_list *dn_list;
696 if (ltdb->cache->indexlist->num_elements == 0 &&
697 scope != LDB_SCOPE_BASE) {
698 /* no index list? must do full search */
702 dn_list = talloc(module, struct dn_list);
703 if (dn_list == NULL) {
707 if (scope == LDB_SCOPE_BASE) {
708 /* with BASE searches only one DN can match */
709 dn_list->dn = talloc_array(dn_list, char *, 1);
710 if (dn_list->dn == NULL) {
711 ldb_oom(module->ldb);
714 dn_list->dn[0] = ldb_dn_linearize(dn_list, base);
715 if (dn_list->dn[0] == NULL) {
716 ldb_oom(module->ldb);
722 ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
726 /* we've got a candidate list - now filter by the full tree
727 and extract the needed attributes */
728 ret = ldb_index_filter(module, tree, base, scope, dn_list,
/* frees the list together with everything allocated beneath it */
732 talloc_free(dn_list);
738 add an index element where this is the first indexed DN for this value
/*
 * Append a new element named LTDB_IDX to msg, carrying dn as its only
 * value. The elements array is grown by one with talloc_realloc(); the
 * element name is a talloc_strdup() copy, while the value's data pointer
 * refers to dn directly (the string is not duplicated).
 *
 * NOTE(review): the declaration of the dn parameter, the assignment of the
 * reallocated array back to msg->elements, the increment of
 * msg->num_elements and the return statements are not visible in this
 * listing.
 */
740 static int ltdb_index_add1_new(struct ldb_context *ldb,
741 struct ldb_message *msg,
742 struct ldb_message_element *el,
745 struct ldb_message_element *el2;
747 /* add another entry */
748 el2 = talloc_realloc(msg, msg->elements,
749 struct ldb_message_element, msg->num_elements+1);
755 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
756 if (!msg->elements[msg->num_elements].name) {
759 msg->elements[msg->num_elements].num_values = 0;
/* room for exactly one value */
760 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
761 if (!msg->elements[msg->num_elements].values) {
/* the value borrows dn; the caller's string must stay valid while msg is used */
764 msg->elements[msg->num_elements].values[0].length = strlen(dn);
765 msg->elements[msg->num_elements].values[0].data = (uint8_t *)dn;
766 msg->elements[msg->num_elements].num_values = 1;
774 add an index element where this is not the first indexed DN for this
/*
 * Append dn as one more value of the existing element msg->elements[idx].
 * The current values are first scanned with strcmp() for an identical
 * string; then the values array is grown by one and the new slot is set to
 * point at dn (the string is not copied).
 *
 * NOTE(review): the declarations of the idx and dn parameters, the action
 * taken when a duplicate is found and the return statements are not
 * visible in this listing.
 */
777 static int ltdb_index_add1_add(struct ldb_context *ldb,
778 struct ldb_message *msg,
779 struct ldb_message_element *el,
786 /* for multi-valued attributes we can end up with repeats */
787 for (i=0;i<msg->elements[idx].num_values;i++) {
788 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
793 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
795 msg->elements[idx].num_values+1);
/* the original pointer is replaced only after the realloc result exists */
799 msg->elements[idx].values = v2;
801 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
802 msg->elements[idx].values[msg->elements[idx].num_values].data = (uint8_t *)dn;
803 msg->elements[idx].num_values++;
809 add an index entry for one message element
/*
 * Record dn in the index entry for value number v_idx of element el.
 *
 * Visible flow: allocate a scratch message on module; compute the index
 * record's DN with ldb_dn_key(); load the existing index record with
 * ltdb_search_dn1(); look for its LTDB_IDX element; create that element
 * (ltdb_index_add1_new) when none exists, otherwise extend it
 * (ltdb_index_add1_add); finally write the record back with ltdb_store()
 * using TDB_REPLACE.
 *
 * NOTE(review): the tests on ret, the release of msg and the return
 * statements are not visible in this listing.
 */
811 static int ltdb_index_add1(struct ldb_module *module, char *dn,
812 struct ldb_message_element *el, int v_idx)
814 struct ldb_context *ldb = module->ldb;
815 struct ldb_message *msg;
816 struct ldb_dn *dn_key;
820 msg = talloc(module, struct ldb_message);
826 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
/* re-parent the key onto msg so both share one talloc lifetime */
832 talloc_steal(msg, dn_key);
834 ret = ltdb_search_dn1(module, dn_key, msg);
/* start from an empty message (presumably when no index record exists
   yet - the guarding test is not visible) */
842 msg->num_elements = 0;
843 msg->elements = NULL;
846 for (i=0;i<msg->num_elements;i++) {
847 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
/* i == num_elements means the scan above found no LTDB_IDX element */
852 if (i == msg->num_elements) {
853 ret = ltdb_index_add1_new(ldb, msg, el, dn);
855 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
859 ret = ltdb_store(module, msg, TDB_REPLACE);
/*
 * Add index entries for dn for every value of every element in
 * elements[0..num_el). Each element name is looked up in the cached index
 * list with ldb_msg_find_idx(); ltdb_index_add1() is then called once per
 * value.
 *
 * NOTE(review): the remaining arguments of the ldb_msg_find_idx() call,
 * the test that skips non-indexed elements and the return statements are
 * not visible in this listing.
 */
867 static int ltdb_index_add0(struct ldb_module *module, char *dn,
868 struct ldb_message_element *elements, int num_el)
870 struct ltdb_private *ltdb = module->private_data;
878 if (ltdb->cache->indexlist->num_elements == 0) {
879 /* no indexed fields */
883 for (i = 0; i < num_el; i++) {
884 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
889 for (j = 0; j < elements[i].num_values; j++) {
890 ret = ltdb_index_add1(module, dn, &elements[i], j);
902 add the index entries for a new record
/*
 * Public entry point: add index entries for a record. The record's DN is
 * linearized into a string allocated on the module's private ltdb context
 * and passed, with the record's elements, to ltdb_index_add0().
 *
 * NOTE(review): the NULL check on dn, the release of dn and the return
 * statement are not visible in this listing.
 */
905 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
907 struct ltdb_private *ltdb = module->private_data;
911 dn = ldb_dn_linearize(ltdb, msg->dn);
916 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
925 delete an index entry for one message element
/*
 * Remove dn from the index entry for value number v_idx of element el.
 *
 * Visible flow: compute the index record's DN with ldb_dn_key(); allocate
 * a message as a talloc child of that key; load the index record; locate
 * dn among the values of its LTDB_IDX element with ldb_msg_find_idx()
 * (element index in i, value index in j); close the gap in the values
 * array; then either delete the whole index record when no values remain
 * or store the shortened record with TDB_REPLACE.
 *
 * NOTE(review): the tests on ret and on i, the cleanup and the return
 * statements are not visible in this listing.
 */
927 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
928 struct ldb_message_element *el, int v_idx)
930 struct ldb_context *ldb = module->ldb;
931 struct ldb_message *msg;
932 struct ldb_dn *dn_key;
940 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
945 msg = talloc(dn_key, struct ldb_message);
951 ret = ltdb_search_dn1(module, dn_key, msg);
958 /* it wasn't indexed. Did we have an earlier error? If we did then
964 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
966 ldb_debug(ldb, LDB_DEBUG_ERROR,
967 "ERROR: dn %s not found in %s\n", dn,
968 ldb_dn_linearize(dn_key, dn_key));
969 /* it ain't there. hmmm */
/* unless the removed value is the last one, shift the tail down by one */
974 if (j != msg->elements[i].num_values - 1) {
975 memmove(&msg->elements[i].values[j],
976 &msg->elements[i].values[j+1],
977 (msg->elements[i].num_values-(j+1)) *
978 sizeof(msg->elements[i].values[0]));
980 msg->elements[i].num_values--;
/* an index record with no values left is removed rather than stored */
982 if (msg->elements[i].num_values == 0) {
983 ret = ltdb_delete_noindex(module, dn_key);
985 ret = ltdb_store(module, msg, TDB_REPLACE);
994 delete the index entries for a record
/*
 * Public entry point: remove the index entries of a record. Records whose
 * DN is "special" (ldb_dn_is_special) get separate handling on a line not
 * visible here. Otherwise the DN is linearized and, for every element that
 * ldb_msg_find_idx() is asked about under LTDB_IDXATTR,
 * ltdb_index_del_value() is called once per value.
 *
 * NOTE(review): the test on the lookup result, the release of dn and the
 * return statements are not visible in this listing.
 */
997 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
999 struct ltdb_private *ltdb = module->private_data;
1004 if (ldb_dn_is_special(msg->dn)) {
1008 dn = ldb_dn_linearize(ltdb, msg->dn);
1013 /* find the list of indexed fields */
1014 if (ltdb->cache->indexlist->num_elements == 0) {
1015 /* no indexed fields */
1019 for (i = 0; i < msg->num_elements; i++) {
1020 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1021 NULL, LTDB_IDXATTR);
1025 for (j = 0; j < msg->elements[i].num_values; j++) {
1026 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1040 traversal function that deletes all @INDEX records
/*
 * tdb traversal callback: deletes the record when its key starts with
 * "DN=" LTDB_INDEX ":" and returns the result of tdb_delete(). The data
 * and state arguments are not used on the visible lines.
 *
 * NOTE(review): the return value for non-matching keys is not visible in
 * this listing.
 */
1042 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1044 const char *dn = "DN=" LTDB_INDEX ":";
/* prefix match only; the remainder of the key is not inspected */
1045 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1046 return tdb_delete(tdb, key);
1052 traversal function that adds @INDEX records during a re index
/*
 * tdb traversal callback used while re-indexing; state is the ldb_module.
 *
 * Visible flow: keys that start with "DN=@" or do not start with "DN="
 * are singled out by the first test; the record is unpacked into a message
 * allocated on module; the key the record should have is recomputed with
 * ltdb_key() and, when it differs from the stored key, the record is
 * deleted and stored again under the new key; finally the record's
 * elements are passed to ltdb_index_add0().
 *
 * NOTE(review): the action taken for the singled-out keys, the tests on
 * ret, the release of msg and the return statements are not visible in
 * this listing.
 */
1054 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1056 struct ldb_module *module = state;
1057 struct ldb_message *msg;
1062 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1063 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1067 msg = talloc(module, struct ldb_message);
1072 ret = ltdb_unpack_data(module, &data, msg);
1078 /* check if the DN key has changed, perhaps due to the
1079 case insensitivity of an element changing */
/* NOTE(review): msg->dn is handed to ltdb_key() here, but is only tested
   for NULL further down - confirm ltdb_key() accepts a NULL dn */
1080 key2 = ltdb_key(module, msg->dn);
1081 if (key2.dptr == NULL) {
1082 /* probably a corrupt record ... darn */
1083 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1084 ldb_dn_linearize(msg, msg->dn));
1088 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
/* NOTE(review): the results of tdb_delete()/tdb_store() are not checked */
1089 tdb_delete(tdb, key);
1090 tdb_store(tdb, key2, data, 0);
1092 talloc_free(key2.dptr);
/* without a parsed DN, fall back to the key text after the "DN=" prefix */
1094 if (msg->dn == NULL) {
1095 dn = (char *)key.dptr + 3;
1097 dn = ldb_dn_linearize(msg->dn, msg->dn);
1100 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1108 force a complete reindex of the database
1110 int ltdb_reindex(struct ldb_module *module)
1112 struct ltdb_private *ltdb = module->private_data;
1115 if (ltdb_cache_reload(module) != 0) {
1119 /* first traverse the database deleting any @INDEX records */
1120 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1125 /* now traverse adding any indexes for normal LDB records */
1126 ret = tdb_traverse(ltdb->tdb, re_index, module);