4 Copyright (C) Andrew Tridgell 2004-2009
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
27 * Component: ldb tdb backend - indexing
29 * Description: indexing routines for ldb tdb backend
31 * Author: Andrew Tridgell
42 struct tdb_context *itdb;
47 /* we put a @IDXVERSION attribute on index entries. This
48 allows us to tell if it was written by an older version */
50 #define LTDB_INDEXING_VERSION 2
52 /* enable the idxptr mode when transactions start */
/*
 * Begin idxptr mode for a transaction: a zero-initialised
 * struct ltdb_idxptr is allocated as a talloc child of the module's
 * ltdb_private and stored in ltdb->idxptr.
 *
 * NOTE(review): the talloc_zero() result is stored without a visible NULL
 * check - confirm that allocation failure is reported to the caller.
 */
53 int ltdb_index_transaction_start(struct ldb_module *module)
55 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56 ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
60 /* compare two DN entries in a dn_list. Take account of possible
61 * differences in string termination */
/*
 * Compare two dn_list values. The first v1->length bytes are compared with
 * strncmp() and a non-zero result is returned as-is. When that prefix is
 * equal and v2 is longer than v1, the byte of v2 immediately after v1's
 * length is tested: a NUL there is treated as string termination rather
 * than as extra data.
 *
 * NOTE(review): the strncmp() bound is v1->length only, so if v1 is the
 * longer value and v2 is not NUL-terminated the comparison can read past
 * v2->length - confirm callers guarantee termination.
 */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
64 int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65 if (ret != 0) return ret;
/* equal prefix: v2 differs only if it carries real data past v1's length */
66 if (v2->length > v1->length && v2->data[v1->length] != 0) {
74 find an entry in a dn_list, using a ldb_val. Uses a case sensitive
75 comparison with the dn. Returns -1 if not found
/*
 * Linear search of list->dn[0 .. count-1] for an entry that dn_list_cmp()
 * reports as equal to v. The index of the first such entry is returned.
 * Callers compare the result against -1 to detect "not found".
 */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
80 for (i=0; i<list->count; i++) {
81 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
87 find an entry in a dn_list. Uses a case sensitive comparison with the dn.
88 Returns -1 if not found
/*
 * String flavour of ltdb_dn_list_find_val(): dn is wrapped in a temporary
 * ldb_val (pointer to the caller's string with const discarded, length
 * strlen(dn) so the terminating NUL is not counted) and the search is
 * delegated. dn must be a valid C string because strlen() is applied to it
 * directly.
 */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
93 v.data = discard_const_p(unsigned char, dn);
94 v.length = strlen(dn);
95 return ltdb_dn_list_find_val(list, &v);
/*
 * Decode an in-memory index record. rec is expected to contain exactly one
 * pointer (rec.dsize == sizeof(void *)) to a talloc'ed struct dn_list.
 *
 * Three sanity checks are made, each setting an ldb error string on
 * failure:
 *  - the record size;
 *  - the talloc type of the stored pointer (via talloc_get_type);
 *  - when check_parent is true and list->dn is set, that list->dn is a
 *    talloc child of the list itself.
 *
 * NOTE(review): callers map a failed decode to LDB_ERR_OPERATIONS_ERROR;
 * presumably NULL is returned on failure - confirm.
 */
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
100 struct dn_list *list;
101 if (rec.dsize != sizeof(void *)) {
102 ldb_asprintf_errstring(ldb_module_get_ctx(module),
103 "Bad data size for idxptr %u", (unsigned)rec.dsize);
/* the record body is the pointer value itself; verify its talloc type */
107 list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
109 ldb_asprintf_errstring(ldb_module_get_ctx(module),
110 "Bad type '%s' for idxptr",
111 talloc_get_name(*(struct dn_list **)rec.dptr));
114 if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115 ldb_asprintf_errstring(ldb_module_get_ctx(module),
116 "Bad parent '%s' for idxptr",
117 talloc_get_name(talloc_parent(list->dn)));
124 return the @IDX list in an index entry for a dn as a struct dn_list
/*
 * Load the DN list stored under the index key dn into list.
 *
 * The in-memory index (ltdb->idxptr->itdb) is consulted first, keyed by the
 * linearized dn without its terminating NUL. A hit is decoded with
 * ltdb_index_idxptr() with the parent check enabled. When no in-memory
 * entry applies, the record is read with ltdb_search_dn1() into a message
 * allocated on list, and the values of its LTDB_IDX element are stolen
 * onto list so the strings are not copied.
 *
 * Returns an LDB error code; LDB_ERR_OPERATIONS_ERROR is used for
 * allocation and decode failures.
 *
 * NOTE(review): strlen() is applied directly to the result of
 * ldb_dn_get_linearized(); confirm it cannot be NULL here.
 */
127 static int ltdb_dn_list_load(struct ldb_module *module,
128 struct ldb_dn *dn, struct dn_list *list)
130 struct ldb_message *msg;
132 struct ldb_message_element *el;
133 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
135 struct dn_list *list2;
141 /* see if we have any in-memory index entries */
142 if (ltdb->idxptr == NULL ||
143 ltdb->idxptr->itdb == NULL) {
/* the in-memory key is the linearized DN, length excluding the NUL */
147 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148 key.dsize = strlen((char *)key.dptr);
150 rec = tdb_fetch(ltdb->idxptr->itdb, key);
151 if (rec.dptr == NULL) {
155 /* we've found an in-memory index entry */
156 list2 = ltdb_index_idxptr(module, rec, true);
159 return LDB_ERR_OPERATIONS_ERROR;
/* database path: the message is a talloc child of list */
167 msg = ldb_msg_new(list);
169 return LDB_ERR_OPERATIONS_ERROR;
172 ret = ltdb_search_dn1(module, dn, msg);
173 if (ret != LDB_SUCCESS) {
177 /* TODO: check indexing version number */
179 el = ldb_msg_find_element(msg, LTDB_IDX);
185 /* we avoid copying the strings by stealing the list */
186 list->dn = talloc_steal(list, el->values);
187 list->count = el->num_values;
194 save a dn_list into a full @IDX style record
/*
 * Write a dn_list to the database as a full @IDX style record.
 *
 * An empty list removes the index record with ltdb_delete_noindex();
 * LDB_ERR_NO_SUCH_OBJECT from that call is handled as a special case.
 * Otherwise a message is built containing an LTDB_IDXVERSION attribute
 * (formatted from LTDB_INDEXING_VERSION) and an LTDB_IDX element whose
 * values point at list->dn (the array is referenced, not copied), and the
 * message is stored with TDB_REPLACE.
 *
 * NOTE(review): the message is allocated on module; confirm it is freed on
 * every return path so it does not accumulate on the module context.
 */
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
197 struct dn_list *list)
199 struct ldb_message *msg;
202 if (list->count == 0) {
203 ret = ltdb_delete_noindex(module, dn);
204 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
210 msg = ldb_msg_new(module);
212 ldb_module_oom(module);
213 return LDB_ERR_OPERATIONS_ERROR;
216 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217 if (ret != LDB_SUCCESS) {
219 ldb_module_oom(module);
220 return LDB_ERR_OPERATIONS_ERROR;
224 if (list->count > 0) {
225 struct ldb_message_element *el;
227 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
228 if (ret != LDB_SUCCESS) {
229 ldb_module_oom(module);
231 return LDB_ERR_OPERATIONS_ERROR;
/* alias the caller's array instead of duplicating the values */
233 el->values = list->dn;
234 el->num_values = list->count;
237 ret = ltdb_store(module, msg, TDB_REPLACE);
243 save a dn_list into the database, in either @IDX or internal format
/*
 * Store a dn_list for the index key dn.
 *
 * With no idxptr state (ltdb->idxptr == NULL) the list is written straight
 * to the database via ltdb_dn_list_store_full(). Otherwise it is kept in
 * the internal tdb ltdb->idxptr->itdb, which is opened on first use with
 * TDB_INTERNAL. The internal record value is a pointer to a struct dn_list
 * owned by ltdb->idxptr:
 *  - if an entry already exists for the key, list->dn is stolen onto the
 *    existing dn_list and its count is updated;
 *  - otherwise a new dn_list is allocated, takes over list->dn, and its
 *    address is inserted with TDB_INSERT.
 *
 * In both in-memory cases ownership of list->dn moves away from list.
 *
 * NOTE(review): strlen() is applied directly to the result of
 * ldb_dn_get_linearized(); confirm it cannot be NULL here.
 */
245 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
246 struct dn_list *list)
248 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
251 struct dn_list *list2;
253 if (ltdb->idxptr == NULL) {
254 return ltdb_dn_list_store_full(module, dn, list);
/* create the internal index tdb lazily */
257 if (ltdb->idxptr->itdb == NULL) {
258 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
259 if (ltdb->idxptr->itdb == NULL) {
260 return LDB_ERR_OPERATIONS_ERROR;
264 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
265 key.dsize = strlen((char *)key.dptr);
267 rec = tdb_fetch(ltdb->idxptr->itdb, key);
268 if (rec.dptr != NULL) {
/* existing entry: no parent check, the dn array is about to be replaced */
269 list2 = ltdb_index_idxptr(module, rec, false);
272 return LDB_ERR_OPERATIONS_ERROR;
275 list2->dn = talloc_steal(list2, list->dn);
276 list2->count = list->count;
/* new entry: the dn_list lives on ltdb->idxptr */
280 list2 = talloc(ltdb->idxptr, struct dn_list);
282 return LDB_ERR_OPERATIONS_ERROR;
284 list2->dn = talloc_steal(list2, list->dn);
285 list2->count = list->count;
/* the stored value is the pointer itself, not the data it points to */
287 rec.dptr = (uint8_t *)&list2;
288 rec.dsize = sizeof(void *);
290 ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
292 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
298 traverse function for storing the in-memory index entries on disk
/*
 * tdb_traverse() callback over the in-memory index; state is the
 * ldb_module. For each record the stored dn_list pointer is decoded (with
 * the parent check enabled), the key bytes are converted back into an
 * ldb_dn, and the list is written to the database with
 * ltdb_dn_list_store_full(). The outcome is recorded in
 * ltdb->idxptr->error, which is set to LDB_ERR_OPERATIONS_ERROR when
 * decoding or DN construction fails.
 *
 * NOTE(review): the DN is allocated on module; confirm it is released
 * after the store so it does not accumulate during a large commit.
 */
300 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
302 struct ldb_module *module = state;
303 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
305 struct ldb_context *ldb = ldb_module_get_ctx(module);
307 struct dn_list *list;
309 list = ltdb_index_idxptr(module, data, true);
311 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
316 v.length = key.dsize;
318 dn = ldb_dn_from_ldb_val(module, ldb, &v);
320 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
324 ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
326 if (ltdb->idxptr->error != 0) {
332 /* cleanup the idxptr mode when transaction commits */
/*
 * Flush the in-memory index at transaction commit. If an internal tdb
 * exists, every entry is written out via ltdb_index_traverse_store() and
 * the internal tdb is closed. The result is taken from ltdb->idxptr->error;
 * on failure an ldb error string is set. The idxptr state is then freed.
 *
 * NOTE(review): unlike ltdb_index_transaction_cancel(), ltdb->idxptr is
 * dereferenced here without a visible NULL check, and the return value of
 * tdb_traverse() is not inspected (only idxptr->error is) - confirm both
 * are intentional.
 */
333 int ltdb_index_transaction_commit(struct ldb_module *module)
335 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
338 if (ltdb->idxptr->itdb) {
339 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
340 tdb_close(ltdb->idxptr->itdb);
343 ret = ltdb->idxptr->error;
345 if (ret != LDB_SUCCESS) {
346 struct ldb_context *ldb = ldb_module_get_ctx(module);
347 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
350 talloc_free(ltdb->idxptr);
355 /* cleanup the idxptr mode when transaction cancels */
/*
 * Discard the in-memory index at transaction cancel: the internal tdb is
 * closed if both the idxptr state and its itdb exist, and the idxptr state
 * is freed. Nothing is written to the database.
 */
356 int ltdb_index_transaction_cancel(struct ldb_module *module)
358 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
359 if (ltdb->idxptr && ltdb->idxptr->itdb) {
360 tdb_close(ltdb->idxptr->itdb);
362 talloc_free(ltdb->idxptr);
369 return the dn key to be used for an index
370 the caller is responsible for freeing
/*
 * Build the DN used as the database key of an index record for
 * (attr, value).
 *
 * The attribute name is case-folded and the value is canonicalised with
 * the canonicalise_fn of the attribute's schema syntax. The key has the
 * form "<LTDB_INDEX>:<folded attr>:<value>", or
 * "<LTDB_INDEX>:<folded attr>::<base64 value>" when
 * ldb_should_b64_encode() says the canonical value needs encoding. The DN
 * is allocated on the ldb context.
 *
 * If canonicalisation fails, an ldb error string naming the attribute is
 * set and the folded name is freed.
 *
 * ap: presumably receives the schema attribute that was looked up when it
 *     is non-NULL (callers pass NULL or the address of a local) - confirm.
 */
372 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
373 const char *attr, const struct ldb_val *value,
374 const struct ldb_schema_attribute **ap)
378 const struct ldb_schema_attribute *a;
382 attr_folded = ldb_attr_casefold(ldb, attr);
387 a = ldb_schema_attribute_by_name(ldb, attr);
391 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
392 if (r != LDB_SUCCESS) {
393 const char *errstr = ldb_errstring(ldb);
394 /* canonicalisation can be refused. For example,
395 a attribute that takes wildcards will refuse to canonicalise
396 if the value contains a wildcard */
397 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
398 attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
399 talloc_free(attr_folded);
402 if (ldb_should_b64_encode(ldb, &v)) {
403 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
/* NOTE(review): this early return does not free attr_folded, which the
 * other exits release - looks like a leak onto the ldb context; confirm. */
404 if (!vstr) return NULL;
405 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
408 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
/* a different data pointer means canonicalisation produced a new buffer */
411 if (v.data != value->data) {
414 talloc_free(attr_folded);
420 see if an attribute is in the list of indexed attributes
/*
 * Test whether attr is listed in the LTDB_IDXATTR element of index_list.
 * Each value of that element is compared with attr using ldb_attr_cmp();
 * the scan is linear in the number of indexed attributes.
 */
422 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
425 struct ldb_message_element *el;
427 el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
432 /* TODO: this is too expensive! At least use a binary search */
433 for (i=0; i<el->num_values; i++) {
434 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
442 in the following logic functions, the return value is treated as follows:
445 LDB_SUCCESS: we found some matching index values
447 LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
449 LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
450 we'll need a full search
454 return a list of dn's that might match a simple indexed search (an
455 equality search only)
/*
 * Resolve a single equality test through the attribute index.
 *
 * Returns LDB_ERR_OPERATIONS_ERROR (meaning "index cannot answer") when
 * the attribute is not in index_list or when no index key can be built for
 * the value. Otherwise the DN list stored under the index key is loaded
 * into list with ltdb_dn_list_load().
 *
 * NOTE(review): the key DN comes from ltdb_index_key(), which allocates on
 * the ldb context; confirm it is freed after the load.
 */
457 static int ltdb_index_dn_simple(struct ldb_module *module,
458 const struct ldb_parse_tree *tree,
459 const struct ldb_message *index_list,
460 struct dn_list *list)
462 struct ldb_context *ldb;
466 ldb = ldb_module_get_ctx(module);
471 /* if the attribute isn't in the list of indexed attributes then
472 this node needs a full search */
473 if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
474 return LDB_ERR_OPERATIONS_ERROR;
477 /* the attribute is indexed. Pull the list of DNs that match the
479 dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
480 if (!dn) return LDB_ERR_OPERATIONS_ERROR;
482 ret = ltdb_dn_list_load(module, dn, list);
/* forward declaration; list_union() is defined further down in this file */
488 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
491 return a list of dn's that might match a leaf indexed search
/*
 * Resolve a leaf (equality) node of the parse tree.
 *
 * When ldb_attr_dn() returns 0 for the attribute, no index lookup is
 * needed: list becomes a one-element array holding the equality value
 * itself. The ldb_val is copied by value, so its data pointer still refers
 * to the parse tree's buffer. Any other attribute is handed to
 * ltdb_index_dn_simple().
 */
493 static int ltdb_index_dn_leaf(struct ldb_module *module,
494 const struct ldb_parse_tree *tree,
495 const struct ldb_message *index_list,
496 struct dn_list *list)
498 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
499 list->dn = talloc_array(list, struct ldb_val, 1);
500 if (list->dn == NULL) {
501 ldb_module_oom(module);
502 return LDB_ERR_OPERATIONS_ERROR;
504 list->dn[0] = tree->u.equality.value;
508 return ltdb_index_dn_simple(module, tree, index_list, list);
/*
 * Intersect list with list2, leaving the result in list.
 *
 * Special cases are tested first: either list being empty, and two size
 * shortcuts (one side with fewer than 2 entries while the other has more
 * than 10) where the small list is used as the result without a precise
 * intersection. That is acceptable because candidates are filtered against
 * the full expression afterwards. When list2 is adopted this way its dn
 * array is reparented from list2 to list.
 *
 * The general case allocates a scratch dn_list sized for list->count
 * entries, keeps each entry of list that ltdb_dn_list_find_val() finds in
 * list2, and then moves the scratch array onto list. The cost is
 * proportional to list->count * list2->count.
 *
 * The bool result presumably signals allocation success; callers treat
 * false as LDB_ERR_OPERATIONS_ERROR.
 */
516 static bool list_intersect(struct ldb_context *ldb,
517 struct dn_list *list, const struct dn_list *list2)
519 struct dn_list *list3;
522 if (list->count == 0) {
526 if (list2->count == 0) {
533 /* the indexing code is allowed to return a longer list than
534 what really matches, as all results are filtered by the
535 full expression at the end - this shortcut avoids a lot of
536 work in some cases */
537 if (list->count < 2 && list2->count > 10) {
540 if (list2->count < 2 && list->count > 10) {
541 list->count = list2->count;
542 list->dn = list2->dn;
543 /* note that list2 may not be the parent of list2->dn,
544 as list2->dn may be owned by ltdb->idxptr. In that
545 case we expect this reparent call to fail, which is
547 talloc_reparent(list2, list, list2->dn);
551 list3 = talloc_zero(list, struct dn_list);
556 list3->dn = talloc_array(list3, struct ldb_val, list->count);
563 for (i=0;i<list->count;i++) {
564 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
565 list3->dn[list3->count] = list->dn[i];
570 list->dn = talloc_steal(list, list3->dn);
571 list->count = list3->count;
/*
 * Merge list2 into list.
 *
 * An empty list2 is tested first. If list is empty it adopts list2's array
 * and count, and the array is reparented from list2 to list. Otherwise a
 * new array of list->count + list2->count entries is allocated on list and
 * both inputs are copied into it with memcpy(). Only the ldb_val structs
 * are copied, so the data pointers are shared. Duplicates are kept and
 * removed later.
 *
 * The bool result presumably signals allocation success; callers treat
 * false as LDB_ERR_OPERATIONS_ERROR.
 */
582 static bool list_union(struct ldb_context *ldb,
583 struct dn_list *list, const struct dn_list *list2)
587 if (list2->count == 0) {
592 if (list->count == 0) {
594 list->count = list2->count;
595 list->dn = list2->dn;
596 /* note that list2 may not be the parent of list2->dn,
597 as list2->dn may be owned by ltdb->idxptr. In that
598 case we expect this reparent call to fail, which is
600 talloc_reparent(list2, list, list2->dn);
604 dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
610 /* we allow for duplicates here, and get rid of them later */
611 memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
612 memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
615 list->count += list2->count;
/*
 * forward declaration: the AND and OR handlers below call ltdb_index_dn()
 * for their sub-expressions before it is defined
 */
620 static int ltdb_index_dn(struct ldb_module *module,
621 const struct ldb_parse_tree *tree,
622 const struct ldb_message *index_list,
623 struct dn_list *list);
627 process an OR list (a union)
/*
 * Evaluate an OR node through the index.
 *
 * Each sub-expression is evaluated into a fresh dn_list allocated as a
 * child of list. LDB_ERR_NO_SUCH_OBJECT and other failures are handled as
 * separate cases; successful results are merged into list with
 * list_union(). If the merged list is empty after all sub-expressions,
 * LDB_ERR_NO_SUCH_OBJECT is returned. Allocation and union failures return
 * LDB_ERR_OPERATIONS_ERROR.
 */
629 static int ltdb_index_dn_or(struct ldb_module *module,
630 const struct ldb_parse_tree *tree,
631 const struct ldb_message *index_list,
632 struct dn_list *list)
634 struct ldb_context *ldb;
637 ldb = ldb_module_get_ctx(module);
642 for (i=0; i<tree->u.list.num_elements; i++) {
643 struct dn_list *list2;
646 list2 = talloc_zero(list, struct dn_list);
648 return LDB_ERR_OPERATIONS_ERROR;
651 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
653 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
659 if (ret != LDB_SUCCESS) {
665 if (!list_union(ldb, list, list2)) {
667 return LDB_ERR_OPERATIONS_ERROR;
671 if (list->count == 0) {
672 return LDB_ERR_NO_SUCH_OBJECT;
/*
 * NOT nodes are never answered from the index: this always returns
 * LDB_ERR_OPERATIONS_ERROR and none of the arguments are used.
 */
682 static int ltdb_index_dn_not(struct ldb_module *module,
683 const struct ldb_parse_tree *tree,
684 const struct ldb_message *index_list,
685 struct dn_list *list)
687 /* the only way to do an indexed not would be if we could
688 negate the not via another not or if we knew the total
689 number of database elements so we could know that the
690 existing expression covered the whole database.
692 instead, we just give up, and rely on a full index scan
693 (unless an outer & manages to reduce the list)
695 return LDB_ERR_OPERATIONS_ERROR;
/*
 * Report whether attr's schema entry carries LDB_ATTR_FLAG_UNIQUE_INDEX.
 *
 * NOTE(review): the result of ldb_schema_attribute_by_name() is
 * dereferenced without a NULL check; presumably the lookup always returns
 * an entry - confirm.
 */
699 static bool ltdb_index_unique(struct ldb_context *ldb,
702 const struct ldb_schema_attribute *a;
703 a = ldb_schema_attribute_by_name(ldb, attr);
704 if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
711 process an AND expression (intersection)
/*
 * Evaluate an AND node through the index, in two passes.
 *
 * Pass 1 looks only at sub-expressions that are equality tests on
 * attributes with a unique index. LDB_ERR_NO_SUCH_OBJECT from one of them
 * means the whole AND matches nothing. LDB_SUCCESS from one of them is
 * accepted as the answer for the whole AND, since later filtering removes
 * any extra candidates.
 *
 * Pass 2 evaluates each sub-expression into a fresh dn_list allocated as a
 * child of list. The first usable result is adopted as the running list
 * and later ones are combined with list_intersect(). An empty running list
 * returns LDB_ERR_NO_SUCH_OBJECT, and once fewer than 2 candidates remain
 * the remaining sub-expressions are not loaded. If no sub-expression could
 * be answered by the index, LDB_ERR_OPERATIONS_ERROR is returned.
 */
713 static int ltdb_index_dn_and(struct ldb_module *module,
714 const struct ldb_parse_tree *tree,
715 const struct ldb_message *index_list,
716 struct dn_list *list)
718 struct ldb_context *ldb;
722 ldb = ldb_module_get_ctx(module);
727 /* in the first pass we only look for unique simple
728 equality tests, in the hope of avoiding having to look
730 for (i=0; i<tree->u.list.num_elements; i++) {
731 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
734 if (subtree->operation != LDB_OP_EQUALITY ||
735 !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
739 ret = ltdb_index_dn(module, subtree, index_list, list);
740 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
742 return LDB_ERR_NO_SUCH_OBJECT;
744 if (ret == LDB_SUCCESS) {
745 /* a unique index match means we can
746 * stop. Note that we don't care if we return
747 * a few too many objects, due to later
753 /* now do a full intersection */
756 for (i=0; i<tree->u.list.num_elements; i++) {
757 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
758 struct dn_list *list2;
761 list2 = talloc_zero(list, struct dn_list);
763 ldb_module_oom(module);
764 return LDB_ERR_OPERATIONS_ERROR;
767 ret = ltdb_index_dn(module, subtree, index_list, list2);
769 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
774 return LDB_ERR_NO_SUCH_OBJECT;
777 if (ret != LDB_SUCCESS) {
778 /* this didn't add anything */
/* NOTE(review): the reparent below is applied to the OLD list->dn, before
 * list->dn is replaced by list2->dn. list_intersect() and list_union()
 * reparent list2->dn instead. Confirm whether list2->dn was intended. */
784 talloc_reparent(list2, list, list->dn);
785 list->dn = list2->dn;
786 list->count = list2->count;
788 } else if (!list_intersect(ldb, list, list2)) {
790 return LDB_ERR_OPERATIONS_ERROR;
793 if (list->count == 0) {
795 return LDB_ERR_NO_SUCH_OBJECT;
798 if (list->count < 2) {
799 /* it isn't worth loading the next part of the tree */
805 /* none of the attributes were indexed */
806 return LDB_ERR_OPERATIONS_ERROR;
813 return a list of matching objects using a one-level index
/*
 * Load the one-level index entry for parent_dn into list.
 *
 * The index key is built by ltdb_index_key() from the LTDB_IDXONE
 * pseudo-attribute and the case-folded parent DN. An empty result returns
 * LDB_ERR_NO_SUCH_OBJECT; failure to build the key returns
 * LDB_ERR_OPERATIONS_ERROR.
 *
 * NOTE(review): strlen() is applied to the result of ldb_dn_get_casefold()
 * without a NULL check, whereas ltdb_index_onelevel() tests the same call
 * for NULL - confirm and align.
 */
815 static int ltdb_index_dn_one(struct ldb_module *module,
816 struct ldb_dn *parent_dn,
817 struct dn_list *list)
819 struct ldb_context *ldb;
824 ldb = ldb_module_get_ctx(module);
826 /* work out the index key from the parent DN */
827 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
828 val.length = strlen((char *)val.data);
829 key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
832 return LDB_ERR_OPERATIONS_ERROR;
835 ret = ltdb_dn_list_load(module, key, list);
837 if (ret != LDB_SUCCESS) {
841 if (list->count == 0) {
842 return LDB_ERR_NO_SUCH_OBJECT;
849 return a list of dn's that might match an indexed search or
850 an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
/*
 * Dispatch on the parse-tree operation to the matching index handler:
 * ltdb_index_dn_and(), ltdb_index_dn_or(), ltdb_index_dn_not(), or
 * ltdb_index_dn_leaf() for equality. The operations from substring to
 * extended matches are not indexed and yield LDB_ERR_OPERATIONS_ERROR,
 * which is also the initial value of the result.
 */
852 static int ltdb_index_dn(struct ldb_module *module,
853 const struct ldb_parse_tree *tree,
854 const struct ldb_message *index_list,
855 struct dn_list *list)
857 int ret = LDB_ERR_OPERATIONS_ERROR;
859 switch (tree->operation) {
861 ret = ltdb_index_dn_and(module, tree, index_list, list);
865 ret = ltdb_index_dn_or(module, tree, index_list, list);
869 ret = ltdb_index_dn_not(module, tree, index_list, list);
872 case LDB_OP_EQUALITY:
873 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
876 case LDB_OP_SUBSTRING:
881 case LDB_OP_EXTENDED:
882 /* we can't index with fancy bitops yet */
883 ret = LDB_ERR_OPERATIONS_ERROR;
891 filter a candidate dn_list from an indexed search into a set of results
892 extracting just the given attributes
/*
 * Turn a candidate DN list into search results.
 *
 * For each candidate a message is allocated on ac, the DN is parsed, and
 * the record is fetched with ltdb_search_dn1(). A record that no longer
 * exists is handled as a benign case; any other fetch error returns
 * LDB_ERR_OPERATIONS_ERROR. Records are checked against the full search
 * expression, base and scope with ldb_match_msg(), reduced to the
 * requested attributes with ltdb_filter_attrs(), and sent with
 * ldb_module_send_entry(). If sending fails, ac->request_terminated is
 * set.
 *
 * match_count: presumably incremented per entry sent - confirm.
 */
894 static int ltdb_index_filter(const struct dn_list *dn_list,
895 struct ltdb_context *ac,
896 uint32_t *match_count)
898 struct ldb_context *ldb;
899 struct ldb_message *msg;
902 ldb = ldb_module_get_ctx(ac->module);
904 for (i = 0; i < dn_list->count; i++) {
908 msg = ldb_msg_new(ac);
910 return LDB_ERR_OPERATIONS_ERROR;
/* the DN is a talloc child of msg */
913 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
916 return LDB_ERR_OPERATIONS_ERROR;
919 ret = ltdb_search_dn1(ac->module, dn, msg);
921 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
922 /* the record has disappeared? yes, this can happen */
927 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
928 /* an internal error */
930 return LDB_ERR_OPERATIONS_ERROR;
/* the index may over-report, so re-check against the real expression */
933 if (!ldb_match_msg(ldb, msg,
934 ac->tree, ac->base, ac->scope)) {
939 /* filter the attributes that the user wants */
940 ret = ltdb_filter_attrs(msg, ac->attrs);
944 return LDB_ERR_OPERATIONS_ERROR;
947 ret = ldb_module_send_entry(ac->req, msg, NULL);
948 if (ret != LDB_SUCCESS) {
949 ac->request_terminated = true;
960 remove any duplicated entries in an indexed result
/*
 * Remove duplicate DNs from list in place. Lists with fewer than 2 entries
 * are left alone. Otherwise the array is sorted with qsort() using
 * dn_list_cmp(), then compacted: an entry is kept only if it differs from
 * the last kept entry, and list->count is set to the number kept. The
 * array itself is not reallocated.
 *
 * NOTE(review): dn_list_cmp() bounds its strncmp() by the first argument's
 * length only; confirm it gives qsort() a consistent ordering when values
 * have different lengths.
 */
962 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
966 if (list->count < 2) {
970 qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
973 for (i=1; i<list->count; i++) {
974 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
/* copy only when the kept position differs from the current one */
975 if (new_count != i) {
976 list->dn[new_count] = list->dn[i];
982 list->count = new_count;
986 search the database with an LDAP-like expression using indexes
987 returns LDB_ERR_OPERATIONS_ERROR if an indexed search is not possible, in which
988 case the caller should call ltdb_search_full()
/*
 * Entry point for an indexed search.
 *
 * LDB_ERR_OPERATIONS_ERROR tells the caller to fall back to a full search.
 * It is returned when neither attribute nor one-level indexes are enabled
 * (unless the scope is base), or when the index type needed for the
 * requested scope is not enabled.
 *
 * The candidate list is built per scope:
 *  - LDB_SCOPE_BASE: a single entry pointing at the linearized base DN
 *    (the string is referenced, not copied);
 *  - LDB_SCOPE_ONELEVEL: ltdb_index_dn_one() on the base DN;
 *  - LDB_SCOPE_SUBTREE / LDB_SCOPE_DEFAULT: ltdb_index_dn() on the parse
 *    tree, followed by duplicate removal.
 *
 * The candidates are then passed to ltdb_index_filter(), and the candidate
 * list (allocated on ac) is freed.
 */
990 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
992 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
993 struct dn_list *dn_list;
996 /* see if indexing is enabled */
997 if (!ltdb->cache->attribute_indexes &&
998 !ltdb->cache->one_level_indexes &&
999 ac->scope != LDB_SCOPE_BASE) {
1000 /* fallback to a full search */
1001 return LDB_ERR_OPERATIONS_ERROR;
1004 dn_list = talloc_zero(ac, struct dn_list);
1005 if (dn_list == NULL) {
1006 ldb_module_oom(ac->module);
1007 return LDB_ERR_OPERATIONS_ERROR;
1010 switch (ac->scope) {
1011 case LDB_SCOPE_BASE:
1012 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1013 if (dn_list->dn == NULL) {
1014 ldb_module_oom(ac->module);
1015 talloc_free(dn_list);
1016 return LDB_ERR_OPERATIONS_ERROR;
1018 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1019 if (dn_list->dn[0].data == NULL) {
1020 ldb_module_oom(ac->module);
1021 talloc_free(dn_list);
1022 return LDB_ERR_OPERATIONS_ERROR;
1024 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1028 case LDB_SCOPE_ONELEVEL:
1029 if (!ltdb->cache->one_level_indexes) {
1030 talloc_free(dn_list);
1031 return LDB_ERR_OPERATIONS_ERROR;
1033 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1034 if (ret != LDB_SUCCESS) {
1035 talloc_free(dn_list);
1040 case LDB_SCOPE_SUBTREE:
1041 case LDB_SCOPE_DEFAULT:
1042 if (!ltdb->cache->attribute_indexes) {
1043 talloc_free(dn_list);
1044 return LDB_ERR_OPERATIONS_ERROR;
1046 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1047 if (ret != LDB_SUCCESS) {
1048 talloc_free(dn_list);
/* OR handling can leave the same DN in the list more than once */
1051 ltdb_dn_list_remove_duplicates(dn_list);
1055 ret = ltdb_index_filter(dn_list, ac, match_count);
1056 talloc_free(dn_list);
1061 add an index entry for one message element
/*
 * Add dn to the index entry for value v_idx of element el.
 *
 * A scratch dn_list is allocated on module and the index key (built by
 * ltdb_index_key(), which also yields the schema attribute) is stolen onto
 * it. The existing DN list for the key is loaded; a missing record is not
 * an error. If dn is already present, that is handled as a separate case.
 * If the list is non-empty and the attribute is flagged
 * LDB_ATTR_FLAG_UNIQUE_INDEX, LDB_ERR_ENTRY_ALREADY_EXISTS is returned.
 * Otherwise the array is grown, with its capacity rounded up to a multiple
 * of 8 entries, a copy of dn is appended, and the list is stored with
 * ltdb_dn_list_store().
 *
 * NOTE(review): the talloc_strdup() result is stored without a visible
 * NULL check, so an allocation failure would put a NULL data pointer with
 * a non-zero length into the index - confirm.
 */
1063 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1064 struct ldb_message_element *el, int v_idx)
1066 struct ldb_context *ldb;
1067 struct ldb_dn *dn_key;
1069 const struct ldb_schema_attribute *a;
1070 struct dn_list *list;
1073 ldb = ldb_module_get_ctx(module);
1075 list = talloc_zero(module, struct dn_list);
1077 return LDB_ERR_OPERATIONS_ERROR;
1080 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1083 return LDB_ERR_OPERATIONS_ERROR;
/* tie the key's lifetime to the scratch list */
1085 talloc_steal(list, dn_key);
1087 ret = ltdb_dn_list_load(module, dn_key, list);
1088 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1093 if (ltdb_dn_list_find_str(list, dn) != -1) {
1098 if (list->count > 0 &&
1099 a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1101 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1104 /* overallocate the list a bit, to reduce the number of
1105 * realloc triggered copies */
1106 alloc_len = ((list->count+1)+7) & ~7;
1107 list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1108 if (list->dn == NULL) {
1110 return LDB_ERR_OPERATIONS_ERROR;
1112 list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1113 list->dn[list->count].length = strlen(dn);
1116 ret = ltdb_dn_list_store(module, dn_key, list);
1124 add index entries for one element in a message
/*
 * Index every value of el for the record dn by calling ltdb_index_add1()
 * once per value, checking each result against LDB_SUCCESS.
 */
1126 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1127 struct ldb_message_element *el)
1130 for (i = 0; i < el->num_values; i++) {
1131 int ret = ltdb_index_add1(module, dn, el, i);
1132 if (ret != LDB_SUCCESS) {
1141 add index entries for all elements in a message
/*
 * Index all indexed elements of a record. Nothing is done when the cached
 * index list has no elements. Otherwise each of the num_el entries in
 * elements whose name passes ltdb_is_indexed() is handed to
 * ltdb_index_add_el(), and each result is checked against LDB_SUCCESS.
 */
1143 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1144 struct ldb_message_element *elements, int num_el)
1146 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1153 if (ltdb->cache->indexlist->num_elements == 0) {
1154 /* no indexed fields */
1158 for (i = 0; i < num_el; i++) {
1160 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1163 ret = ltdb_index_add_el(module, dn, &elements[i]);
1164 if (ret != LDB_SUCCESS) {
1174 insert a one level index for a message
/*
 * Maintain the one-level index entry for msg. Only active when
 * ltdb->cache->one_level_indexes is set.
 *
 * A synthetic element named LTDB_IDXONE is built whose value is the
 * case-folded parent DN of msg->dn. The record's linearized DN is then
 * added to that index entry with ltdb_index_add1(), or removed from it
 * with ltdb_index_del_value(); the comment on the else branch marks that
 * branch as the delete case. Failure to obtain the parent DN, the
 * linearized DN or the case-folded parent returns
 * LDB_ERR_OPERATIONS_ERROR.
 *
 * The parent DN is allocated on module; presumably it is freed before each
 * return - confirm.
 */
1176 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1178 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1179 struct ldb_message_element el;
1185 /* We index for ONE Level only if requested */
1186 if (!ltdb->cache->one_level_indexes) {
1190 pdn = ldb_dn_get_parent(module, msg->dn);
1192 return LDB_ERR_OPERATIONS_ERROR;
1195 dn = ldb_dn_get_linearized(msg->dn);
1198 return LDB_ERR_OPERATIONS_ERROR;
1201 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1202 if (val.data == NULL) {
1204 return LDB_ERR_OPERATIONS_ERROR;
1207 val.length = strlen((char *)val.data);
1208 el.name = LTDB_IDXONE;
1213 ret = ltdb_index_add1(module, dn, &el, 0);
1214 } else { /* delete */
1215 ret = ltdb_index_del_value(module, dn, &el, 0);
1224 add the index entries for a new element in a record
1225 The caller guarantees that these element values are not yet indexed
/*
 * Index the values of a newly added element el of record dn. Special DNs
 * and elements whose attribute is not in the cached index list are tested
 * for first; everything else is indexed through ltdb_index_add_el() using
 * the linearized DN.
 */
1227 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1228 struct ldb_message_element *el)
1230 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1231 if (ldb_dn_is_special(dn)) {
1234 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1237 return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1241 add the index entries for a new record
/*
 * Create all index entries for a new record: attribute indexes for every
 * element via ltdb_index_add_all(), then the one-level index entry via
 * ltdb_index_onelevel() in add mode. Special DNs are tested for first. A
 * DN that cannot be linearized returns LDB_ERR_OPERATIONS_ERROR.
 */
1243 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1248 if (ldb_dn_is_special(msg->dn)) {
1252 dn = ldb_dn_get_linearized(msg->dn);
1254 return LDB_ERR_OPERATIONS_ERROR;
1257 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1258 if (ret != LDB_SUCCESS) {
1262 return ltdb_index_onelevel(module, msg, 1);
1267 delete an index entry for one message element
/*
 * Remove dn from the index entry for value v_idx of element el.
 *
 * The index key is built with ltdb_index_key() and a scratch dn_list is
 * allocated as its talloc child, so freeing the key releases both. The
 * existing list is loaded; a missing index record and a DN that is not in
 * the list are each handled as separate cases. Otherwise the entry is
 * removed by shifting the tail of the array down with memmove(), the array
 * is shrunk with talloc_realloc(), and the list is stored again with
 * ltdb_dn_list_store().
 */
1269 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1270 struct ldb_message_element *el, int v_idx)
1272 struct ldb_context *ldb;
1273 struct ldb_dn *dn_key;
1275 struct dn_list *list;
1277 ldb = ldb_module_get_ctx(module);
1283 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1285 return LDB_ERR_OPERATIONS_ERROR;
1288 list = talloc_zero(dn_key, struct dn_list);
1290 talloc_free(dn_key);
1291 return LDB_ERR_OPERATIONS_ERROR;
1294 ret = ltdb_dn_list_load(module, dn_key, list);
1295 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1296 /* it wasn't indexed. Did we have an earlier error? If we did then
1298 talloc_free(dn_key);
1302 if (ret != LDB_SUCCESS) {
1303 talloc_free(dn_key);
1307 i = ltdb_dn_list_find_str(list, dn);
1309 /* nothing to delete */
1310 talloc_free(dn_key);
/* the last slot needs no shifting */
1314 if (i != list->count-1) {
1315 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1318 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1320 ret = ltdb_dn_list_store(module, dn_key, list);
1322 talloc_free(dn_key);
1328 delete the index entries for an element
1329 returns an LDB error code on failure
/*
 * Remove the index entries for every value of el on record dn by calling
 * ltdb_index_del_value() once per value and checking each result against
 * LDB_SUCCESS. Disabled attribute indexing and attributes missing from the
 * cached index list are tested for first.
 */
1331 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1333 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1337 if (!ltdb->cache->attribute_indexes) {
1338 /* no indexed fields */
1346 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1349 for (i = 0; i < el->num_values; i++) {
1350 ret = ltdb_index_del_value(module, dn, el, i);
1351 if (ret != LDB_SUCCESS) {
1360 delete the index entries for a record
1361 returns an LDB error code on failure
/*
 * Remove all index entries for a record. The one-level entry is removed
 * first through ltdb_index_onelevel() in delete mode. Then, if attribute
 * indexing is enabled, each element is passed to ltdb_index_del_element().
 * Special DNs are tested for first. A DN that cannot be linearized returns
 * LDB_ERR_OPERATIONS_ERROR.
 */
1363 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1365 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1370 if (ldb_dn_is_special(msg->dn)) {
1374 ret = ltdb_index_onelevel(module, msg, 0);
1375 if (ret != LDB_SUCCESS) {
1379 if (!ltdb->cache->attribute_indexes) {
1380 /* no indexed fields */
1384 dn = ldb_dn_get_linearized(msg->dn);
1386 return LDB_ERR_OPERATIONS_ERROR;
1389 for (i = 0; i < msg->num_elements; i++) {
1390 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1391 if (ret != LDB_SUCCESS) {
1401 traversal function that deletes all @INDEX records
/*
 * tdb_traverse() callback: deletes every record whose key starts with
 * "DN=" LTDB_INDEX ":". The data and state arguments are not used in the
 * comparison or the delete.
 *
 * NOTE(review): the prefix comparison is bounded by the prefix length, not
 * by key.dsize; confirm keys are always at least that long or
 * NUL-terminated.
 */
1403 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1405 const char *dn = "DN=" LTDB_INDEX ":";
1406 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1407 return tdb_delete(tdb, key);
1413 traversal function that adds @INDEX records during a re index
/*
 * tdb_traverse() callback used during a reindex; state is the ldb_module.
 *
 * Keys that start with "DN=@" or that do not start with "DN=" are tested
 * for first. The record is unpacked into a message and its key is
 * recomputed with ltdb_key(). If the key has changed, the record is moved
 * by deleting the old key and storing the data under the new one. The
 * one-level index and then the attribute indexes are rebuilt for the
 * record. A non-success result from the rebuild makes the callback return
 * -1.
 *
 * NOTE(review): the results of tdb_delete() and tdb_store() when moving a
 * record are ignored, so a failed store after a successful delete would
 * lose the record - confirm.
 * NOTE(review): the "Invalid data" message logs msg->dn right after the
 * unpack failed, when it may not be set - confirm.
 */
1415 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1417 struct ldb_context *ldb;
1418 struct ldb_module *module = (struct ldb_module *)state;
1419 struct ldb_message *msg;
1420 const char *dn = NULL;
1424 ldb = ldb_module_get_ctx(module);
1426 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1427 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1431 msg = ldb_msg_new(module);
1436 ret = ltdb_unpack_data(module, &data, msg);
1438 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1439 ldb_dn_get_linearized(msg->dn));
1444 /* check if the DN key has changed, perhaps due to the
1445 case insensitivity of an element changing */
1446 key2 = ltdb_key(module, msg->dn);
1447 if (key2.dptr == NULL) {
1448 /* probably a corrupt record ... darn */
1449 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1450 ldb_dn_get_linearized(msg->dn));
1454 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1455 tdb_delete(tdb, key);
1456 tdb_store(tdb, key2, data, 0);
1458 talloc_free(key2.dptr);
/* without a parsed DN, fall back to the key text after the "DN=" prefix */
1460 if (msg->dn == NULL) {
1461 dn = (char *)key.dptr + 3;
1463 dn = ldb_dn_get_linearized(msg->dn);
1466 ret = ltdb_index_onelevel(module, msg, 1);
1467 if (ret != LDB_SUCCESS) {
1468 ldb_debug(ldb, LDB_DEBUG_ERROR,
1469 "Adding special ONE LEVEL index failed (%s)!",
1470 ldb_dn_get_linearized(msg->dn));
1475 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1479 if (ret != LDB_SUCCESS) return -1;
1485 force a complete reindex of the database
/*
 * Rebuild every index in the database.
 *
 * The cache is reloaded first. One traversal then deletes the existing
 * @INDEX records via delete_index(). If the cached index list is
 * non-empty, a second traversal rebuilds the index entries for normal
 * records via re_index(). A cache reload failure returns
 * LDB_ERR_OPERATIONS_ERROR, as do the error paths that follow each
 * traversal. Finally the repack flag is set on the idxptr state.
 *
 * NOTE(review): confirm ltdb->idxptr is non-NULL (i.e. a transaction is
 * active) before the repack flag is set.
 */
1487 int ltdb_reindex(struct ldb_module *module)
1489 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1492 if (ltdb_cache_reload(module) != 0) {
1493 return LDB_ERR_OPERATIONS_ERROR;
1496 /* first traverse the database deleting any @INDEX records */
1497 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1499 return LDB_ERR_OPERATIONS_ERROR;
1502 /* if we don't have indexes we have nothing to do */
1503 if (ltdb->cache->indexlist->num_elements == 0) {
1507 /* now traverse adding any indexes for normal LDB records */
1508 ret = tdb_traverse(ltdb->tdb, re_index, module);
1510 return LDB_ERR_OPERATIONS_ERROR;
1514 ltdb->idxptr->repack = true;