pyldb: avoid segfault when adding an element with no name
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
index 9a4a0db7e74f57ece4694d23401dc29a346ebb1d..9f0de7d260e5e831c521920a6721aaf50d93201f 100644 (file)
@@ -199,7 +199,9 @@ static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
 }
 
 /* enable the idxptr mode when transactions start */
-int ldb_kv_index_transaction_start(struct ldb_module *module)
+int ldb_kv_index_transaction_start(
+       struct ldb_module *module,
+       size_t cache_size)
 {
        struct ldb_kv_private *ldb_kv = talloc_get_type(
            ldb_module_get_private(module), struct ldb_kv_private);
@@ -208,6 +210,16 @@ int ldb_kv_index_transaction_start(struct ldb_module *module)
                return ldb_oom(ldb_module_get_ctx(module));
        }
 
+       ldb_kv->idxptr->itdb = tdb_open(
+               NULL,
+               cache_size,
+               TDB_INTERNAL,
+               O_RDWR,
+               0);
+       if (ldb_kv->idxptr->itdb == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
        return LDB_SUCCESS;
 }
 
@@ -394,7 +406,13 @@ normal_index:
                                dn,
                                msg,
                                LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
-                                   LDB_UNPACK_DATA_FLAG_NO_DN);
+                               LDB_UNPACK_DATA_FLAG_NO_DN |
+                               /*
+                                * The entry point ldb_kv_search_indexed is
+                                * only called from the read-locked
+                                * ldb_kv_search.
+                                */
+                               LDB_UNPACK_DATA_FLAG_READ_LOCKED);
        if (ret != LDB_SUCCESS) {
                talloc_free(msg);
                return ret;
@@ -717,14 +735,6 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
                return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
        }
 
-       if (ldb_kv->idxptr->itdb == NULL) {
-               ldb_kv->idxptr->itdb =
-                   tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
-               if (ldb_kv->idxptr->itdb == NULL) {
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-       }
-
        key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
        if (key.dptr == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
@@ -880,6 +890,19 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
        const size_t min_data = 1;
        const size_t min_key_length = additional_key_length
                + indx_len + num_separators + min_data;
+       struct ldb_val empty;
+
+       /*
+        * Accept a NULL value as a request for a key with no value.  This is
+        * different from passing an empty value, which might be given
+        * significance by some canonicalise functions.
+        */
+       bool empty_val = value == NULL;
+       if (empty_val) {
+               empty.length = 0;
+               empty.data = discard_const_p(unsigned char, "");
+               value = ∅
+       }
 
        if (attr[0] == '@') {
                attr_for_dn = attr;
@@ -899,21 +922,33 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
                if (ap) {
                        *ap = a;
                }
-               r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
-               if (r != LDB_SUCCESS) {
-                       const char *errstr = ldb_errstring(ldb);
-                       /* canonicalisation can be refused. For
-                          example, a attribute that takes wildcards
-                          will refuse to canonicalise if the value
-                          contains a wildcard */
-                       ldb_asprintf_errstring(ldb,
-                                              "Failed to create index "
-                                              "key for attribute '%s':%s%s%s",
-                                              attr, ldb_strerror(r),
-                                              (errstr?":":""),
-                                              (errstr?errstr:""));
-                       talloc_free(attr_folded);
-                       return NULL;
+
+               if (empty_val) {
+                       v = *value;
+               } else {
+                       ldb_attr_handler_t fn;
+                       if (a->syntax->index_format_fn) {
+                               fn = a->syntax->index_format_fn;
+                       } else {
+                               fn = a->syntax->canonicalise_fn;
+                       }
+                       r = fn(ldb, ldb, value, &v);
+                       if (r != LDB_SUCCESS) {
+                               const char *errstr = ldb_errstring(ldb);
+                               /* canonicalisation can be refused. For
+                                  example, a attribute that takes wildcards
+                                  will refuse to canonicalise if the value
+                                  contains a wildcard */
+                               ldb_asprintf_errstring(ldb,
+                                                      "Failed to create "
+                                                      "index key for "
+                                                      "attribute '%s':%s%s%s",
+                                                      attr, ldb_strerror(r),
+                                                      (errstr?":":""),
+                                                      (errstr?errstr:""));
+                               talloc_free(attr_folded);
+                               return NULL;
+                       }
                }
        }
        attr_len = strlen(attr_for_dn);
@@ -1030,7 +1065,7 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
                }
        }
 
-       if (v.data != value->data) {
+       if (v.data != value->data && !empty_val) {
                talloc_free(v.data);
        }
        talloc_free(attr_folded);
@@ -1259,6 +1294,14 @@ static bool list_intersect(struct ldb_context *ldb,
                return true;
        }
 
+       /*
+        * In both of the below we check for strict and in that
+        * case do not optimise the intersection of this list,
+        * we must never return an entry not in this
+        * list.  This allows the index for
+        * SCOPE_ONELEVEL to be trusted.
+        */
+
        /* the indexing code is allowed to return a longer list than
           what really matches, as all results are filtered by the
           full expression at the end - this shortcut avoids a lot of
@@ -1603,6 +1646,311 @@ static int ldb_kv_index_dn_and(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
+struct ldb_kv_ordered_index_context {
+       struct ldb_module *module;
+       int error;
+       struct dn_list *dn_list;
+};
+
+static int traverse_range_index(struct ldb_kv_private *ldb_kv,
+                               struct ldb_val key,
+                               struct ldb_val data,
+                               void *state)
+{
+
+       struct ldb_context *ldb;
+       struct ldb_kv_ordered_index_context *ctx =
+           (struct ldb_kv_ordered_index_context *)state;
+       struct ldb_module *module = ctx->module;
+       struct ldb_message_element *el = NULL;
+       struct ldb_message *msg = NULL;
+       int version;
+       size_t dn_array_size, additional_length;
+       unsigned int i;
+
+       ldb = ldb_module_get_ctx(module);
+
+       msg = ldb_msg_new(module);
+
+       ctx->error = ldb_unpack_data_only_attr_list_flags(ldb, &data,
+                                                         msg,
+                                                         NULL, 0,
+                                                         LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
+                                                         LDB_UNPACK_DATA_FLAG_NO_DN,
+                                                         NULL);
+
+       if (ctx->error != LDB_SUCCESS) {
+               talloc_free(msg);
+               return ctx->error;
+       }
+
+       el = ldb_msg_find_element(msg, LDB_KV_IDX);
+       if (!el) {
+               talloc_free(msg);
+               return LDB_SUCCESS;
+       }
+
+       version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
+
+       /*
+        * we avoid copying the strings by stealing the list.  We have
+        * to steal msg onto el->values (which looks odd) because we
+        * asked for the memory to be allocated on msg, not on each
+        * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
+        */
+       if (version != LDB_KV_GUID_INDEXING_VERSION) {
+               /* This is quite likely during the DB startup
+                  on first upgrade to using a GUID index */
+               ldb_debug_set(ldb_module_get_ctx(module),
+                             LDB_DEBUG_ERROR, __location__
+                             ": Wrong GUID index version %d expected %d",
+                             version, LDB_KV_GUID_INDEXING_VERSION);
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       if (el->num_values == 0) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
+           || el->values[0].length == 0) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       dn_array_size = talloc_array_length(ctx->dn_list->dn);
+
+       additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
+
+       if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
+               size_t new_array_length;
+
+               if (dn_array_size * 2 < dn_array_size) {
+                       talloc_free(msg);
+                       ctx->error = LDB_ERR_OPERATIONS_ERROR;
+                       return ctx->error;
+               }
+
+               new_array_length = MAX(ctx->dn_list->count + additional_length,
+                                      dn_array_size * 2);
+
+               ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
+                                                 ctx->dn_list->dn,
+                                                 struct ldb_val,
+                                                 new_array_length);
+       }
+
+       if (ctx->dn_list->dn == NULL) {
+               talloc_free(msg);
+               ctx->error = LDB_ERR_OPERATIONS_ERROR;
+               return ctx->error;
+       }
+
+       /*
+        * The actual data is on msg, due to
+        * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
+        */
+       talloc_steal(ctx->dn_list->dn, msg);
+       for (i = 0; i < additional_length; i++) {
+               ctx->dn_list->dn[i + ctx->dn_list->count].data
+                       = &el->values[0].data[i * LDB_KV_GUID_SIZE];
+               ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
+
+       }
+
+       ctx->dn_list->count += additional_length;
+
+       talloc_free(msg->elements);
+
+       return LDB_SUCCESS;
+}
+
+/*
+ * >= and <= indexing implemented using lexicographically sorted keys
+ *
+ * We only run this in GUID indexing mode and when there is no write
+ * transaction (only implicit read locks are being held). Otherwise, we would
+ * have to deal with the in-memory index cache.
+ *
+ * We rely on the implementation of index_format_fn on a schema syntax which
+ * will can help us to construct keys which can be ordered correctly, and we
+ * terminate using schema agnostic start and end keys.
+ *
+ * index_format_fn must output values which can be memcmp-able to produce the
+ * correct ordering as defined by the schema syntax class.
+ */
+static int ldb_kv_index_dn_ordered(struct ldb_module *module,
+                                  struct ldb_kv_private *ldb_kv,
+                                  const struct ldb_parse_tree *tree,
+                                  struct dn_list *list, bool ascending)
+{
+       enum key_truncation truncation = KEY_NOT_TRUNCATED;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
+       struct ldb_val start_key, end_key;
+       struct ldb_dn *key_dn = NULL;
+       const struct ldb_schema_attribute *a = NULL;
+
+       struct ldb_kv_ordered_index_context ctx;
+       int ret;
+
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+
+       if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ldb_kv->cache->GUID_index_attribute == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* bail out if we're in a transaction, full search instead. */
+       if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (ldb_kv->disallow_dn_filter &&
+           (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
+               /* in AD mode we do not support "(dn=...)" search filters */
+               list->dn = NULL;
+               list->count = 0;
+               return LDB_SUCCESS;
+       }
+       if (tree->u.comparison.attr[0] == '@') {
+               /* Do not allow a indexed search against an @ */
+               list->dn = NULL;
+               list->count = 0;
+               return LDB_SUCCESS;
+       }
+
+       a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
+
+       /*
+        * If there's no index format function defined for this attr, then
+        * the lexicographic order in the database doesn't correspond to the
+        * attr's ordering, so we can't use the iterate_range op.
+        */
+       if (a->syntax->index_format_fn == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
+                                 &tree->u.comparison.value,
+                                 NULL, &truncation);
+       if (!key_dn) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else if (truncation == KEY_TRUNCATED) {
+               ldb_debug(ldb, LDB_DEBUG_WARNING,
+                         __location__
+                         ": ordered index violation: key dn truncated: %s\n",
+                         ldb_dn_get_linearized(key_dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_key = ldb_kv_key_dn(module, tmp_ctx, key_dn);
+       talloc_free(key_dn);
+       if (ldb_key.data == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
+                                 NULL, NULL, &truncation);
+       if (!key_dn) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else if (truncation == KEY_TRUNCATED) {
+               ldb_debug(ldb, LDB_DEBUG_WARNING,
+                         __location__
+                         ": ordered index violation: key dn truncated: %s\n",
+                         ldb_dn_get_linearized(key_dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ldb_key2 = ldb_kv_key_dn(module, tmp_ctx, key_dn);
+       talloc_free(key_dn);
+       if (ldb_key2.data == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * In order to avoid defining a start and end key for the search, we
+        * notice that each index key is of the form:
+        *
+        *     DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
+        *
+        * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
+        * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
+        * particular attribute.
+        *
+        * Our LMDB backend uses the default memcmp for key comparison.
+        */
+
+       /* Eliminate NUL byte at the end of the empty key */
+       ldb_key2.length--;
+
+       if (ascending) {
+               /* : becomes ; for pseudo end-key */
+               ldb_key2.data[ldb_key2.length-1]++;
+               start_key = ldb_key;
+               end_key = ldb_key2;
+       } else {
+               start_key = ldb_key2;
+               end_key = ldb_key;
+       }
+
+       ctx.module = module;
+       ctx.error = 0;
+       ctx.dn_list = list;
+       ctx.dn_list->count = 0;
+       ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
+
+       ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
+                                           traverse_range_index, &ctx);
+
+       if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
+                      ldb_val_equal_exact_for_qsort);
+
+       talloc_free(tmp_ctx);
+
+       return LDB_SUCCESS;
+}
+
+static int ldb_kv_index_dn_greater(struct ldb_module *module,
+                                  struct ldb_kv_private *ldb_kv,
+                                  const struct ldb_parse_tree *tree,
+                                  struct dn_list *list)
+{
+       return ldb_kv_index_dn_ordered(module,
+                                      ldb_kv,
+                                      tree,
+                                      list, true);
+}
+
+static int ldb_kv_index_dn_less(struct ldb_module *module,
+                                  struct ldb_kv_private *ldb_kv,
+                                  const struct ldb_parse_tree *tree,
+                                  struct dn_list *list)
+{
+       return ldb_kv_index_dn_ordered(module,
+                                      ldb_kv,
+                                      tree,
+                                      list, false);
+}
+
 /*
   return a list of matching objects using a one-level index
  */
@@ -1660,7 +2008,13 @@ static int ldb_kv_index_dn_one(struct ldb_module *module,
                               struct dn_list *list,
                               enum key_truncation *truncation)
 {
-       /* Ensure we do not shortcut on intersection for this list */
+       /*
+        * Ensure we do not shortcut on intersection for this list.
+        * We must never be lazy and return an entry not in this
+        * list.  This allows the index for
+        * SCOPE_ONELEVEL to be trusted.
+        */
+
        list->strict = true;
        return ldb_kv_index_dn_attr(
            module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
@@ -1742,9 +2096,15 @@ static int ldb_kv_index_dn(struct ldb_module *module,
                ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
                break;
 
-       case LDB_OP_SUBSTRING:
        case LDB_OP_GREATER:
+               ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
+               break;
+
        case LDB_OP_LESS:
+               ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
+               break;
+
+       case LDB_OP_SUBSTRING:
        case LDB_OP_PRESENT:
        case LDB_OP_APPROX:
        case LDB_OP_EXTENDED:
@@ -1868,7 +2228,13 @@ static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
                                      keys[i],
                                      msg,
                                      LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
-                                         LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
+                                     LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
+                                     /*
+                                      * The entry point ldb_kv_search_indexed is
+                                      * only called from the read-locked
+                                      * ldb_kv_search.
+                                      */
+                                     LDB_UNPACK_DATA_FLAG_READ_LOCKED);
                if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /*
                         * the record has disappeared? yes, this can
@@ -2304,7 +2670,7 @@ static int ldb_kv_index_add1(struct ldb_module *module,
                        ldb_debug(ldb, LDB_DEBUG_WARNING,
                                  __location__
                                  ": unique index violation on %s in %s, "
-                                 "conficts with %*.*s in %s",
+                                 "conflicts with %*.*s in %s",
                                  el->name, ldb_dn_get_linearized(msg->dn),
                                  (int)list->dn[0].length,
                                  (int)list->dn[0].length,
@@ -2323,7 +2689,7 @@ static int ldb_kv_index_add1(struct ldb_module *module,
                                          LDB_DEBUG_WARNING,
                                          __location__
                                          ": unique index violation on %s in "
-                                         "%s, conficts with %s %*.*s in %s",
+                                         "%s, conflicts with %s %*.*s in %s",
                                          el->name,
                                          ldb_dn_get_linearized(msg->dn),
                                          ldb_kv->cache->GUID_index_attribute,
@@ -2911,12 +3277,7 @@ static int re_key(struct ldb_kv_private *ldb_kv,
 
        ldb = ldb_module_get_ctx(module);
 
-       if (key.length > 4 &&
-           memcmp(key.data, "DN=@", 4) == 0) {
-               return 0;
-       }
-
-       is_record = ldb_kv_key_is_record(key);
+       is_record = ldb_kv_key_is_normal_record(key);
        if (is_record == false) {
                return 0;
        }
@@ -2998,12 +3359,7 @@ static int re_index(struct ldb_kv_private *ldb_kv,
 
        ldb = ldb_module_get_ctx(module);
 
-       if (key.length > 4 &&
-           memcmp(key.data, "DN=@", 4) == 0) {
-               return 0;
-       }
-
-       is_record = ldb_kv_key_is_record(key);
+       is_record = ldb_kv_key_is_normal_record(key);
        if (is_record == false) {
                return 0;
        }
@@ -3074,6 +3430,7 @@ int ldb_kv_reindex(struct ldb_module *module)
            ldb_module_get_private(module), struct ldb_kv_private);
        int ret;
        struct ldb_kv_reindex_context ctx;
+       size_t index_cache_size = 0;
 
        /*
         * Only triggered after a modification, but make clear we do
@@ -3094,7 +3451,16 @@ int ldb_kv_reindex(struct ldb_module *module)
         */
        ldb_kv_index_transaction_cancel(module);
 
-       ret = ldb_kv_index_transaction_start(module);
+       /*
+        * Calculate the size of the index cache that we'll need for
+        * the re-index
+        */
+       index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
+       if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
+               index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
+       }
+
+       ret = ldb_kv_index_transaction_start(module, index_cache_size);
        if (ret != LDB_SUCCESS) {
                return ret;
        }