Revert "s4:ldb Fix ldb_list_find() folowing the change from char * to TDB_DATA"
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
index c99c2936d850236720c070fcfe0435b26f0581e7..b959471d163584598243d92b4d53d07e5c858fa9 100644 (file)
@@ -426,7 +426,8 @@ struct dn_list {
   caller frees
 */
 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
-                                    const char *attr, const struct ldb_val *value)
+                                    const char *attr, const struct ldb_val *value,
+                                    const struct ldb_schema_attribute **ap)
 {
        struct ldb_dn *ret;
        struct ldb_val v;
@@ -440,6 +441,9 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
        }
 
        a = ldb_schema_attribute_by_name(ldb, attr);
+       if (ap) {
+               *ap = a;
+       }
        r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
        if (r != LDB_SUCCESS) {
                const char *errstr = ldb_errstring(ldb);
@@ -451,7 +455,7 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
                talloc_free(attr_folded);
                return NULL;
        }
-       if (ldb_should_b64_encode(&v)) {
+       if (ldb_should_b64_encode(ldb, &v)) {
                char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
                if (!vstr) return NULL;
                ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
@@ -531,7 +535,7 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
 
        /* the attribute is indexed. Pull the list of DNs that match the 
           search criterion */
-       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
+       dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
        if (!dn) return LDB_ERR_OPERATIONS_ERROR;
 
        msg = talloc(list, struct ldb_message);
@@ -792,6 +796,18 @@ static int ltdb_index_dn_not(struct ldb_module *module,
        return LDB_ERR_OPERATIONS_ERROR;
 }
 
+
+static bool ltdb_index_unique(struct ldb_context *ldb,
+                             const char *attr)
+{
+       const struct ldb_schema_attribute *a;
+       a = ldb_schema_attribute_by_name(ldb, attr);
+       if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
+               return true;
+       }
+       return false;
+}
+
 /*
   AND two index results
  */
@@ -802,7 +818,7 @@ static int ltdb_index_dn_and(struct ldb_module *module,
 {
        struct ldb_context *ldb;
        unsigned int i;
-       int ret;
+       int ret, pass;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -810,52 +826,71 @@ static int ltdb_index_dn_and(struct ldb_module *module,
        list->dn = NULL;
        list->count = 0;
 
-       for (i=0;i<tree->u.list.num_elements;i++) {
-               struct dn_list *list2;
-               int v;
-
-               list2 = talloc(module, struct dn_list);
-               if (list2 == NULL) {
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
-               v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
-
-               if (v == LDB_ERR_NO_SUCH_OBJECT) {
-                       /* 0 && X == 0 */
-                       talloc_free(list->dn);
-                       talloc_free(list2);
-                       return LDB_ERR_NO_SUCH_OBJECT;
-               }
-
-               if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
-                       talloc_free(list2);
-                       continue;
-               }
-
-               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
-                       ret = LDB_SUCCESS;
-                       talloc_free(list->dn);
-                       list->dn = talloc_move(list, &list2->dn);
-                       list->count = list2->count;
-               } else {
-                       if (list_intersect(ldb, list, list2) == -1) {
-                               talloc_free(list2);
+       for (pass=0;pass<=1;pass++) {
+               /* in the first pass we only look for unique simple
+                  equality tests, in the hope of avoiding having to look
+                  at any others */
+               bool only_unique = pass==0?true:false;
+
+               for (i=0;i<tree->u.list.num_elements;i++) {
+                       struct dn_list *list2;
+                       int v;
+                       bool is_unique = false;
+                       const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
+
+                       if (subtree->operation == LDB_OP_EQUALITY &&
+                           ltdb_index_unique(ldb, subtree->u.equality.attr)) {
+                               is_unique = true;
+                       }
+                       if (is_unique != only_unique) continue;
+                       
+                       list2 = talloc(module, struct dn_list);
+                       if (list2 == NULL) {
                                return LDB_ERR_OPERATIONS_ERROR;
                        }
-               }
+                       
+                       v = ltdb_index_dn(module, subtree, index_list, list2);
 
-               talloc_free(list2);
-
-               if (list->count == 0) {
-                       talloc_free(list->dn);
-                       return LDB_ERR_NO_SUCH_OBJECT;
+                       if (v == LDB_ERR_NO_SUCH_OBJECT) {
+                               /* 0 && X == 0 */
+                               talloc_free(list->dn);
+                               talloc_free(list2);
+                               return LDB_ERR_NO_SUCH_OBJECT;
+                       }
+                       
+                       if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
+                               talloc_free(list2);
+                               continue;
+                       }
+                       
+                       if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
+                               ret = LDB_SUCCESS;
+                               talloc_free(list->dn);
+                               list->dn = talloc_move(list, &list2->dn);
+                               list->count = list2->count;
+                       } else {
+                               if (list_intersect(ldb, list, list2) == -1) {
+                                       talloc_free(list2);
+                                       return LDB_ERR_OPERATIONS_ERROR;
+                               }
+                       }
+                       
+                       talloc_free(list2);
+                       
+                       if (list->count == 0) {
+                               talloc_free(list->dn);
+                               return LDB_ERR_NO_SUCH_OBJECT;
+                       }
+                       
+                       if (list->count == 1) {
+                               /* it isn't worth loading the next part of the tree */
+                               return ret;
+                       }
                }
-       }
-
+       }       
        return ret;
 }
-
+       
 /*
   AND index results and ONE level special index
  */
@@ -882,7 +917,7 @@ static int ltdb_index_dn_one(struct ldb_module *module,
           search criterion */
        val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
        val.length = strlen((char *)val.data);
-       key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
+       key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
        if (!key) {
                talloc_free(list2);
                return LDB_ERR_OPERATIONS_ERROR;
@@ -1002,7 +1037,8 @@ static int ltdb_index_dn(struct ldb_module *module,
   extracting just the given attributes
 */
 static int ltdb_index_filter(const struct dn_list *dn_list,
-                            struct ltdb_context *ac)
+                            struct ltdb_context *ac, 
+                            uint32_t *match_count)
 {
        struct ldb_context *ldb;
        struct ldb_message *msg;
@@ -1058,6 +1094,8 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
                        ac->request_terminated = true;
                        return ret;
                }
+
+               (*match_count)++;
        }
 
        return LDB_SUCCESS;
@@ -1068,7 +1106,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
   returns -1 if an indexed search is not possible, in which
   case the caller should call ltdb_search_full()
 */
-int ltdb_search_indexed(struct ltdb_context *ac)
+int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
 {
        struct ldb_context *ldb;
        void *data = ldb_module_get_private(ac->module);
@@ -1121,21 +1159,17 @@ int ltdb_search_indexed(struct ltdb_context *ac)
 
        if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
                ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
-
-               if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
-                       talloc_free(dn_list);
-                       return ret;
-               }
        }
 
-       if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
+       if (ret == LDB_ERR_OPERATIONS_ERROR &&
+           ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
                ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
        }
 
        if (ret == LDB_SUCCESS) {
                /* we've got a candidate list - now filter by the full tree
                   and extract the needed attributes */
-               ret = ltdb_index_filter(dn_list, ac);
+               ret = ltdb_index_filter(dn_list, ac, match_count);
        }
 
        talloc_free(dn_list);
@@ -1185,7 +1219,8 @@ static int ltdb_index_add1_new(struct ldb_context *ldb,
 static int ltdb_index_add1_add(struct ldb_context *ldb,
                               struct ldb_message *msg,
                               int idx,
-                              const char *dn)
+                              const char *dn,
+                              const struct ldb_schema_attribute *a)
 {
        struct ldb_val *v2;
        unsigned int i;
@@ -1197,6 +1232,10 @@ static int ltdb_index_add1_add(struct ldb_context *ldb,
                }
        }
 
+       if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
+               return LDB_ERR_ENTRY_ALREADY_EXISTS;
+       }
+
        v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
                              struct ldb_val,
                              msg->elements[idx].num_values+1);
@@ -1223,6 +1262,7 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
        struct ldb_dn *dn_key;
        int ret;
        unsigned int i;
+       const struct ldb_schema_attribute *a;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -1232,7 +1272,7 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
        if (!dn_key) {
                talloc_free(msg);
                return LDB_ERR_OPERATIONS_ERROR;
@@ -1260,7 +1300,7 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
        if (i == msg->num_elements) {
                ret = ltdb_index_add1_new(ldb, msg, dn);
        } else {
-               ret = ltdb_index_add1_add(ldb, msg, i, dn);
+               ret = ltdb_index_add1_add(ldb, msg, i, dn, a);
        }
 
        if (ret == LDB_SUCCESS) {
@@ -1343,7 +1383,7 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                return LDB_SUCCESS;
        }
 
-       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
+       dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
        if (!dn_key) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -1370,14 +1410,14 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
        i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
        if (i == -1) {
                struct ldb_ldif ldif;
-
-               ldb_debug(ldb, LDB_DEBUG_ERROR,
-                               "ERROR: dn %s not found in %s\n", dn,
-                               ldb_dn_get_linearized(dn_key));
+               char *ldif_string;
                ldif.changetype = LDB_CHANGETYPE_NONE;
                ldif.msg = msg;
-               ldb_ldif_write_file(ldb, stdout, &ldif);
-               sleep(100);
+               ldif_string = ldb_ldif_write_string(ldb, NULL, &ldif);
+               ldb_debug(ldb, LDB_DEBUG_ERROR,
+                         "ERROR: dn %s not found in %s", dn,
+                         ldif_string);
+               talloc_free(ldif_string);
                /* it ain't there. hmmm */
                talloc_free(dn_key);
                return LDB_SUCCESS;
@@ -1459,6 +1499,10 @@ int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int
        const char *dn;
        int ret;
 
+       if (ldb_dn_is_special(msg->dn)) {
+               return LDB_SUCCESS;
+       }
+
        /* We index for ONE Level only if requested */
        ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
        if (ret != 0) {
@@ -1537,6 +1581,8 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
 
        ret = ltdb_unpack_data(module, &data, msg);
        if (ret != 0) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
+                         ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return -1;
        }
@@ -1546,7 +1592,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
        key2 = ltdb_key(module, msg->dn);
        if (key2.dptr == NULL) {
                /* probably a corrupt record ... darn */
-               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
                                                        ldb_dn_get_linearized(msg->dn));
                talloc_free(msg);
                return 0;
@@ -1568,7 +1614,7 @@ static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *
                ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
        } else {
                ldb_debug(ldb, LDB_DEBUG_ERROR,
-                       "Adding special ONE LEVEL index failed (%s)!\n",
+                       "Adding special ONE LEVEL index failed (%s)!",
                        ldb_dn_get_linearized(msg->dn));
        }