ldb: <= and >= indexed searching
authorAaron Haslett <aaronhaslett@catalyst.net.nz>
Mon, 4 Mar 2019 06:06:31 +0000 (19:06 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 8 Apr 2019 02:07:22 +0000 (02:07 +0000)
Full implementation of <= and >= indexed searching using iterate_range
backend operation.  Adds index_format_fn to ldb_schema_syntax so
requires an ABI version bump.  The function must be provided for any
type for which <= and >= indexing is required, and must return a
lexicographically ordered canonicalization of a value.  This causes
index entries to be written in correct order to the database, so
iterate_range on the index DNs can be used.

ldb_kv_index_key is modified to return an index DN with attribute name
but without value if an empty value is provided.  This is needed for
constructing keys that match the beginning or end of an index DN range.

Pair-programmed-with: Garming Sam <garming@catalyst.net.nz>

Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
lib/ldb/include/ldb.h
lib/ldb/ldb_key_value/ldb_kv_index.c

index 81bee934da587bf6a02689cfbffa868f747478b4..267b7af5a88880b5b44c94d771cc7d49df5bbad0 100644 (file)
@@ -376,7 +376,10 @@ typedef int (*ldb_attr_operator_t)(struct ldb_context *, enum ldb_parse_op opera
   ldif_read_fn         -> convert from ldif to binary format
   ldif_write_fn                -> convert from binary to ldif format
   canonicalise_fn      -> canonicalise a value, for use by indexing and dn construction
+  index_form_fn                -> get lexicographically sorted format for index
   comparison_fn                -> compare two values
+  operator_fn          -> override function for optimizing out unnecessary
+                               calls to canonicalise_fn and comparison_fn
 */
 
 struct ldb_schema_syntax {
@@ -384,6 +387,7 @@ struct ldb_schema_syntax {
        ldb_attr_handler_t ldif_read_fn;
        ldb_attr_handler_t ldif_write_fn;
        ldb_attr_handler_t canonicalise_fn;
+       ldb_attr_handler_t index_format_fn;
        ldb_attr_comparison_t comparison_fn;
        ldb_attr_operator_t operator_fn;
 };
index 4101d4b986d6fd87c9b3f23891c1dfc3781c6b4e..cad8fdc90bfe800299ada1134bd4c7046f3e729c 100644 (file)
@@ -884,6 +884,19 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
        const size_t min_data = 1;
        const size_t min_key_length = additional_key_length
                + indx_len + num_separators + min_data;
+       struct ldb_val empty;
+
+       /*
+        * Accept a NULL value as a request for a key with no value.  This is
+        * different from passing an empty value, which might be given
+        * significance by some canonicalise functions.
+        */
+       bool empty_val = value == NULL;
+       if (empty_val) {
+               empty.length = 0;
+               empty.data = discard_const_p(unsigned char, "");
+               value = &empty;
+       }
 
        if (attr[0] == '@') {
                attr_for_dn = attr;
@@ -903,21 +916,33 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
                if (ap) {
                        *ap = a;
                }
-               r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
-               if (r != LDB_SUCCESS) {
-                       const char *errstr = ldb_errstring(ldb);
-                       /* canonicalisation can be refused. For
-                          example, a attribute that takes wildcards
-                          will refuse to canonicalise if the value
-                          contains a wildcard */
-                       ldb_asprintf_errstring(ldb,
-                                              "Failed to create index "
-                                              "key for attribute '%s':%s%s%s",
-                                              attr, ldb_strerror(r),
-                                              (errstr?":":""),
-                                              (errstr?errstr:""));
-                       talloc_free(attr_folded);
-                       return NULL;
+
+               if (empty_val) {
+                       v = *value;
+               } else {
+                       ldb_attr_handler_t fn;
+                       if (a->syntax->index_format_fn) {
+                               fn = a->syntax->index_format_fn;
+                       } else {
+                               fn = a->syntax->canonicalise_fn;
+                       }
+                       r = fn(ldb, ldb, value, &v);
+                       if (r != LDB_SUCCESS) {
+                               const char *errstr = ldb_errstring(ldb);
+                               /* canonicalisation can be refused. For
+                                  example, a attribute that takes wildcards
+                                  will refuse to canonicalise if the value
+                                  contains a wildcard */
+                               ldb_asprintf_errstring(ldb,
+                                                      "Failed to create "
+                                                      "index key for "
+                                                      "attribute '%s':%s%s%s",
+                                                      attr, ldb_strerror(r),
+                                                      (errstr?":":""),
+                                                      (errstr?errstr:""));
+                               talloc_free(attr_folded);
+                               return NULL;
+                       }
                }
        }
        attr_len = strlen(attr_for_dn);
@@ -1034,7 +1059,7 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
                }
        }
 
-       if (v.data != value->data) {
+       if (v.data != value->data && !empty_val) {
                talloc_free(v.data);
        }
        talloc_free(attr_folded);
@@ -1615,6 +1640,280 @@ static int ldb_kv_index_dn_and(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
+struct ldb_kv_ordered_index_context {
+       struct ldb_module *module;
+       int error;
+       struct dn_list *dn_list;
+};
+
+static int traverse_range_index(struct ldb_kv_private *ldb_kv,
+                               struct ldb_val key,
+                               struct ldb_val data,
+                               void *state)
+{
+
+       struct ldb_context *ldb;
+       struct ldb_kv_ordered_index_context *ctx =
+           (struct ldb_kv_ordered_index_context *)state;
+       struct ldb_module *module = ctx->module;
+       struct ldb_message_element *el = NULL;
+       struct ldb_message *msg = NULL;
+       int version;
+       size_t dn_array_size, additional_length;
+       unsigned int i;
+
+       ldb = ldb_module_get_ctx(module);
+
+       msg = ldb_msg_new(module);
+
+       ctx->error = ldb_unpack_data_only_attr_list_flags(ldb, &data,
+                                                         msg,
+                                                         NULL, 0,
+                                                         LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
+                                                         LDB_UNPACK_DATA_FLAG_NO_DN,
+                                                         NULL);
+
+       if (ctx->error != LDB_SUCCESS) {
+               talloc_free(msg);
+               return ctx->error;
+       }
+
+       el = ldb_msg_find_element(msg, LDB_KV_IDX);
+       if (!el) {
+               talloc_free(msg);
+               return LDB_SUCCESS;
+       }
+
+       version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
+
+       /*
+        * we avoid copying the strings by stealing the list.  We have
+        * to steal msg onto el->values (which looks odd) because we
+        * asked for the memory to be allocated on msg, not on each
+        * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
+        */
+       if (version != LDB_KV_GUID_INDEXING_VERSION) {
+               /* This is quite likely during the DB startup
+                  on first upgrade to using a GUID index */
+               ldb_debug_set(ldb_module_get_ctx(module),
+                             LDB_DEBUG_ERROR, __location__
+                             ": Wrong GUID index version %d expected %d",
+                             version, LDB_KV_GUID_INDEXING_VERSION);
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       if (el->num_values == 0) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
+           || el->values[0].length == 0) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       dn_array_size = talloc_array_length(ctx->dn_list->dn);
+
+       additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
+
+       if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
+               size_t new_array_length;
+
+               if (dn_array_size * 2 < dn_array_size) {
+                       talloc_free(msg);
+                       ctx->error = LDB_ERR_OPERATIONS_ERROR;
+                       return ctx->error;
+               }
+
+               new_array_length = MAX(ctx->dn_list->count + additional_length,
+                                      dn_array_size * 2);
+
+               ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
+                                                 ctx->dn_list->dn,
+                                                 struct ldb_val,
+                                                 new_array_length);
+       }
+
+       if (ctx->dn_list->dn == NULL) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       /*
+        * The actual data is on msg, due to
+        * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
+        */
+       talloc_steal(ctx->dn_list->dn, msg);
+       for (i = 0; i < additional_length; i++) {
+               ctx->dn_list->dn[i + ctx->dn_list->count].data
+                       = &el->values[0].data[i * LDB_KV_GUID_SIZE];
+               ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
+
+       }
+
+       ctx->dn_list->count += additional_length;
+
+       talloc_free(msg->elements);
+
+       return LDB_SUCCESS;
+}
+
+static int ldb_kv_index_dn_ordered(struct ldb_module *module,
+                                  struct ldb_kv_private *ldb_kv,
+                                  const struct ldb_parse_tree *tree,
+                                  struct dn_list *list, bool ascending)
+{
+       enum key_truncation truncation = KEY_NOT_TRUNCATED;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
+       struct ldb_val start_key, end_key;
+       struct ldb_dn *key_dn = NULL;
+       const struct ldb_schema_attribute *a = NULL;
+
+       struct ldb_kv_ordered_index_context ctx;
+       int ret;
+
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+
+       if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ldb_kv->cache->GUID_index_attribute == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* bail out if we're in a transaction, full search instead. */
+       if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ldb_kv->disallow_dn_filter &&
+           (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
+               /* in AD mode we do not support "(dn=...)" search filters */
+               list->dn = NULL;
+               list->count = 0;
+               return LDB_SUCCESS;
+       }
+       if (tree->u.comparison.attr[0] == '@') {
+               /* Do not allow a indexed search against an @ */
+               list->dn = NULL;
+               list->count = 0;
+               return LDB_SUCCESS;
+       }
+
+       a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
+
+       /*
+        * If there's no index format function defined for this attr, then
+        * the lexicographic order in the database doesn't correspond to the
+        * attr's ordering, so we can't use the iterate_range op.
+        */
+       if (a->syntax->index_format_fn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
+                                 &tree->u.comparison.value,
+                                 NULL, &truncation);
+       if (!key_dn) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else if (truncation == KEY_TRUNCATED) {
+               ldb_debug(ldb, LDB_DEBUG_WARNING,
+                         __location__
+                         ": ordered index violation: key dn truncated: %s\n",
+                         ldb_dn_get_linearized(key_dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_key = ldb_kv_key_dn(module, tmp_ctx, key_dn);
+       talloc_free(key_dn);
+       if (ldb_key.data == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
+                                 NULL, NULL, &truncation);
+       if (!key_dn) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else if (truncation == KEY_TRUNCATED) {
+               ldb_debug(ldb, LDB_DEBUG_WARNING,
+                         __location__
+                         ": ordered index violation: key dn truncated: %s\n",
+                         ldb_dn_get_linearized(key_dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ldb_key2 = ldb_kv_key_dn(module, tmp_ctx, key_dn);
+       talloc_free(key_dn);
+       if (ldb_key2.data == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ascending) {
+               ldb_key2.data[ldb_key2.length-2]++;
+               start_key = ldb_key;
+               end_key = ldb_key2;
+       } else {
+               start_key = ldb_key2;
+               end_key = ldb_key;
+       }
+
+       ctx.module = module;
+       ctx.error = 0;
+       ctx.dn_list = list;
+       ctx.dn_list->count = 0;
+       ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
+
+       ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
+                                           traverse_range_index, &ctx);
+
+       if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
+                      ldb_val_equal_exact_for_qsort);
+
+       talloc_free(tmp_ctx);
+
+       return LDB_SUCCESS;
+}
+
+static int ldb_kv_index_dn_greater(struct ldb_module *module,
+                                  struct ldb_kv_private *ldb_kv,
+                                  const struct ldb_parse_tree *tree,
+                                  struct dn_list *list)
+{
+       return ldb_kv_index_dn_ordered(module,
+                                      ldb_kv,
+                                      tree,
+                                      list, true);
+}
+
+static int ldb_kv_index_dn_less(struct ldb_module *module,
+                                  struct ldb_kv_private *ldb_kv,
+                                  const struct ldb_parse_tree *tree,
+                                  struct dn_list *list)
+{
+       return ldb_kv_index_dn_ordered(module,
+                                      ldb_kv,
+                                      tree,
+                                      list, false);
+}
+
 /*
   return a list of matching objects using a one-level index
  */
@@ -1760,9 +2059,15 @@ static int ldb_kv_index_dn(struct ldb_module *module,
                ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
                break;
 
-       case LDB_OP_SUBSTRING:
        case LDB_OP_GREATER:
+               ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
+               break;
+
        case LDB_OP_LESS:
+               ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
+               break;
+
+       case LDB_OP_SUBSTRING:
        case LDB_OP_PRESENT:
        case LDB_OP_APPROX:
        case LDB_OP_EXTENDED: