an experimental patch for fixing ldb bloat
[kai/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
index eedbda41706c656bd678dd9d6fe9be5d4e168a45..de0e9a49d6cbe0c10a933b7e7668c2d166a0de29 100644 (file)
 
 #include "ldb_tdb.h"
 
+/*
+  the idxptr code is a bit unusual. The way it works is to replace
+  @IDX elements in records during a transaction with @IDXPTR
+  elements. The @IDXPTR elements don't contain the actual index entry
+  values, but contain a pointer to a linked list of values. 
+
+  This means we are storing pointers in a database, which is normally
+  not allowed, but in this case we are storing them only for the
+  duration of a transaction, and re-writing them into the normal @IDX
+  format at the end of the transaction. That means no other processes
+  are ever exposed to the @IDXPTR values.
+
+  The advantage is that the linked list doesn't cause huge
+  fragmentation during a transaction. Without the @IDXPTR method we
+  often ended up with a ldb that was between 10x and 100x larger then
+  it needs to be due to massive fragmentation caused by re-writing
+  @INDEX records many times during indexing.
+ */
+struct ldb_index_pointer {
+       struct ldb_index_pointer *next, *prev;
+       struct ldb_val value;
+};
+
+struct ltdb_idxptr {
+       int num_dns;
+       const char **dn_list;
+       bool repack;
+};
+
+/*
+  add to the list of DNs that need to be fixed on transaction end
+ */
+static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
+{
+       struct ltdb_private *ltdb =
+               talloc_get_type(module->private_data, struct ltdb_private);
+       ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
+                                              const char *, ltdb->idxptr->num_dns+1);
+       if (ltdb->idxptr->dn_list == NULL) {
+               ltdb->idxptr->num_dns = 0;
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] = 
+               talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
+       if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ltdb->idxptr->num_dns++;
+       return LDB_SUCCESS;
+}
+
+/* free an idxptr record */
+static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
+{
+       struct ldb_val val;
+       struct ldb_index_pointer *ptr;
+
+       if (el->num_values != 1) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       val = el->values[0];
+       if (val.length != sizeof(void *)) {
+               return LDB_ERR_OPERATIONS_ERROR;                                
+       }
+                       
+       ptr = *(struct ldb_index_pointer **)val.data;
+       if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
+               return LDB_ERR_OPERATIONS_ERROR;                
+       }
+
+       while (ptr) {
+               struct ldb_index_pointer *tmp = ptr;
+               DLIST_REMOVE(ptr, ptr);
+               talloc_free(tmp);
+       }
+
+       return 0;
+}
+
+
+/* convert from the IDXPTR format to a ldb_message_element format */
+static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
+{
+       struct ldb_val val;
+       struct ldb_index_pointer *ptr, *tmp;
+       int i;
+       struct ldb_val *val2;
+
+       if (el->num_values != 1) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       val = el->values[0];
+       if (val.length != sizeof(void *)) {
+               return LDB_ERR_OPERATIONS_ERROR;                                
+       }
+                       
+       ptr = *(struct ldb_index_pointer **)val.data;
+       if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
+               return LDB_ERR_OPERATIONS_ERROR;                
+       }
+
+       /* count the length of the list */
+       for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
+               i++;
+       }
+
+       /* allocate the new values array */
+       val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
+       if (val2 == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;                
+       }
+       el->values = val2;
+       el->num_values = i;
+
+       /* populate the values array */
+       for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
+               el->values[i].length = tmp->value.length;
+               /* we need to over-allocate here as there are still some places
+                  in ldb that rely on null termination. */
+               el->values[i].data = talloc_size(el->values, tmp->value.length+1);
+               if (el->values[i].data == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
+               el->values[i].data[tmp->value.length] = 0;
+       }       
+
+       /* update the name */
+       el->name = LTDB_IDX;
+       
+               return LDB_SUCCESS;
+}
+
+
+/* convert to the IDXPTR format from a ldb_message_element format */
+static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
+{
+       struct ldb_index_pointer *ptr, *tmp;
+       int i;
+       struct ldb_val *val2;
+       struct ltdb_private *ltdb =
+               talloc_get_type(module->private_data, struct ltdb_private);
+
+       ptr = NULL;
+
+       for (i=0;i<el->num_values;i++) {
+               tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
+               if (tmp == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;                
+               }
+               tmp->value = el->values[i];
+               tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
+               if (tmp->value.data == NULL) {
+                       return LDB_ERR_OPERATIONS_ERROR;                
+               }
+               DLIST_ADD(ptr, tmp);
+       }
+
+       /* allocate the new values array */
+       val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
+       if (val2 == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;                
+       }
+       el->values = val2;
+       el->num_values = 1;
+
+       el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
+       el->values[0].length = sizeof(ptr);
+
+       /* update the name */
+       el->name = LTDB_IDXPTR;
+
+               return LDB_SUCCESS;     
+}
+
+
+/* enable the idxptr mode when transactions start */
+int ltdb_index_transaction_start(struct ldb_module *module)
+{
+       struct ltdb_private *ltdb =
+               talloc_get_type(module->private_data, struct ltdb_private);
+       ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
+       return 0;
+}
+
+/*
+  a wrapper around ltdb_search_dn1() which translates pointer based index records
+  and maps them into normal ldb message structures
+ */
+static int ltdb_search_dn1_wrap(struct ldb_module *module, 
+                               struct ldb_dn *dn, struct ldb_message *msg)
+{
+       int ret, i;
+       ret = ltdb_search_dn1(module, dn, msg);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /* if this isn't a @INDEX record then don't munge it */
+       if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
+               return ret;
+       }
+
+       for (i=0;i<msg->num_elements;i++) {
+               struct ldb_message_element *el = &msg->elements[i];
+               if (strcmp(el->name, LTDB_IDXPTR) == 0) {
+                       ret = ltdb_convert_from_idxptr(module, el);
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
+                       }
+               }
+       }
+
+       return ret;
+}
+
+
+
+/*
+  fixup the idxptr for one DN
+ */
+static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
+{
+       struct ldb_dn *dn;
+       struct ldb_message *msg = ldb_msg_new(module);
+       int ret;
+       
+       dn = ldb_dn_new(msg, module->ldb, strdn);
+       if (ltdb_search_dn1_wrap(module, dn, msg) == LDB_SUCCESS) {
+               ret = ltdb_store(module, msg, TDB_REPLACE);
+       }
+       talloc_free(msg);
+       return ret;
+}
+
+/* cleanup the idxptr mode when transaction commits */
+int ltdb_index_transaction_commit(struct ldb_module *module)
+{
+       int i;
+       struct ltdb_private *ltdb =
+               talloc_get_type(module->private_data, struct ltdb_private);
+
+       /* fix all the DNs that we have modified */
+       if (ltdb->idxptr) {
+               for (i=0;i<ltdb->idxptr->num_dns;i++) {
+                       ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
+               }
+
+               if (ltdb->idxptr->repack) {
+                       tdb_repack(ltdb->tdb);
+               }
+       }
+
+       talloc_free(ltdb->idxptr);
+       ltdb->idxptr = NULL;
+       return 0;
+}
+
+/* cleanup the idxptr mode when transaction cancels */
+int ltdb_index_transaction_cancel(struct ldb_module *module)
+{
+       struct ltdb_private *ltdb =
+               talloc_get_type(module->private_data, struct ltdb_private);
+       talloc_free(ltdb->idxptr);
+       ltdb->idxptr = NULL;
+       return 0;
+}
+
+                       
+
+/* a wrapper around ltdb_store() for the index code which 
+   stores in IDXPTR format when idxptr mode is enabled 
+
+   WARNING: This modifies the msg which is passed in
+*/
+int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
+{
+       struct ltdb_private *ltdb =
+               talloc_get_type(module->private_data, struct ltdb_private);
+       int ret;
+
+       if (ltdb->idxptr) {
+               int i;
+               struct ldb_message *msg2 = ldb_msg_new(module);
+
+               /* free any old pointer */
+               ret = ltdb_search_dn1(module, msg->dn, msg2);
+               if (ret == 0) {
+                       for (i=0;i<msg2->num_elements;i++) {
+                               struct ldb_message_element *el = &msg2->elements[i];
+                               if (strcmp(el->name, LTDB_IDXPTR) == 0) {
+                                       ret = ltdb_free_idxptr(module, el);
+                                       if (ret != LDB_SUCCESS) {
+                                               return ret;
+                                       }
+                               }
+                       }
+               }
+               talloc_free(msg2);
+
+               for (i=0;i<msg->num_elements;i++) {
+                       struct ldb_message_element *el = &msg->elements[i];
+                       if (strcmp(el->name, LTDB_IDX) == 0) {
+                               ret = ltdb_convert_to_idxptr(module, el);
+                               if (ret != LDB_SUCCESS) {
+                                       return ret;
+                               }
+                       }
+               }
+
+               if (ltdb_idxptr_add(module, msg) != 0) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+       }
+
+       ret = ltdb_store(module, msg, flgs);
+       return ret;
+}
+
+
 /*
   find an element in a list, using the given comparison function and
   assuming that the list is already sorted using comp_fn
@@ -213,7 +535,7 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_search_dn1(module, dn, msg);
+       ret = ltdb_search_dn1_wrap(module, dn, msg);
        talloc_free(dn);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -559,7 +881,7 @@ static int ltdb_index_dn_one(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_search_dn1(module, key, msg);
+       ret = ltdb_search_dn1_wrap(module, key, msg);
        talloc_free(key);
        if (ret != LDB_SUCCESS) {
                return ret;
@@ -687,7 +1009,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               ret = ltdb_search_dn1(ac->module, dn, msg);
+               ret = ltdb_search_dn1_wrap(ac->module, dn, msg);
                talloc_free(dn);
                if (ret == LDB_ERR_NO_SUCH_OBJECT) {
                        /* the record has disappeared? yes, this can happen */
@@ -895,7 +1217,7 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
        }
        talloc_steal(msg, dn_key);
 
-       ret = ltdb_search_dn1(module, dn_key, msg);
+       ret = ltdb_search_dn1_wrap(module, dn_key, msg);
        if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                talloc_free(msg);
                return ret;
@@ -920,7 +1242,7 @@ static int ltdb_index_add1(struct ldb_module *module, const char *dn,
        }
 
        if (ret == LDB_SUCCESS) {
-               ret = ltdb_store(module, msg, TDB_REPLACE);
+               ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
        }
 
        talloc_free(msg);
@@ -1007,7 +1329,7 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_search_dn1(module, dn_key, msg);
+       ret = ltdb_search_dn1_wrap(module, dn_key, msg);
        if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
                talloc_free(dn_key);
                return ret;
@@ -1022,9 +1344,15 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
 
        i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
        if (i == -1) {
+               struct ldb_ldif ldif;
+
                ldb_debug(ldb, LDB_DEBUG_ERROR,
                                "ERROR: dn %s not found in %s\n", dn,
                                ldb_dn_get_linearized(dn_key));
+               ldif.changetype = LDB_CHANGETYPE_NONE;
+               ldif.msg = msg;
+               ldb_ldif_write_file(module->ldb, stdout, &ldif);
+               sleep(100);
                /* it ain't there. hmmm */
                talloc_free(dn_key);
                return LDB_SUCCESS;
@@ -1041,7 +1369,7 @@ int ltdb_index_del_value(struct ldb_module *module, const char *dn,
        if (msg->elements[i].num_values == 0) {
                ret = ltdb_delete_noindex(module, dn_key);
        } else {
-               ret = ltdb_store(module, msg, TDB_REPLACE);
+               ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
        }
 
        talloc_free(dn_key);
@@ -1250,8 +1578,8 @@ int ltdb_reindex(struct ldb_module *module)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (tdb_repack(ltdb->tdb) != 0) {
-               return LDB_ERR_OPERATIONS_ERROR;                
+       if (ltdb->idxptr) {
+               ltdb->idxptr->repack = true;
        }
 
        return LDB_SUCCESS;