s4:dsdb Load new partitions in a running LDB if metadata changes
authorAndrew Bartlett <abartlet@samba.org>
Wed, 14 Oct 2009 04:16:34 +0000 (15:16 +1100)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 21 Oct 2009 11:43:53 +0000 (22:43 +1100)
This allows one instance of LDB to add a partition, and another to use
it without first closing the database.

Andrew Bartlett

source4/dsdb/samdb/ldb_modules/partition.c
source4/dsdb/samdb/ldb_modules/partition_init.c

index ea5dff7dce562032ea11ee8a16077ff90d98656e..d1892f0c5447a721725c25a31eba2d6c932a229b 100644 (file)
@@ -621,7 +621,6 @@ static int partition_delete(struct ldb_module *module, struct ldb_request *req)
 /* rename */
 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
 {
-       int ret;
        /* Find backend */
        struct dsdb_partition *backend, *backend2;
        
index 3fbd2c128a090546442d04c36fb79a0702449f7a..a5e83e734db7a290932dd82c1cb0d7fe7d4730d8 100644 (file)
@@ -156,38 +156,7 @@ static int partition_reload_metadata(struct ldb_module *module, struct partition
        } else {
                talloc_free(msg);
        }
-       return LDB_SUCCESS;
-}
 
-int partition_reload_if_required(struct ldb_module *module, 
-                                struct partition_private_data *data)
-       
-{
-       uint64_t seq;
-       int ret;
-       TALLOC_CTX *mem_ctx = talloc_new(data);
-       if (!data) {
-               /* Not initilised yet */
-               return LDB_SUCCESS;
-       }
-       if (!mem_ctx) {
-               ldb_oom(ldb_module_get_ctx(module));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       ret = partition_primary_sequence_number(module, mem_ctx, LDB_SEQ_HIGHEST_SEQ, &seq);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(mem_ctx);
-               return ret;
-       }
-       if (seq != data->metadata_seq) {
-               ret = partition_reload_metadata(module, data, mem_ctx, NULL);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(mem_ctx);
-                       return ret;
-               }
-               data->metadata_seq = seq;
-       }
-       talloc_free(mem_ctx);
        return LDB_SUCCESS;
 }
 
@@ -300,6 +269,150 @@ static int new_partition_from_dn(struct ldb_context *ldb, struct partition_priva
        return ret;
 }
 
+/* Tell the rootDSE about the new partition */
+static int partition_register(struct ldb_context *ldb, struct dsdb_control_current_partition *ctrl) 
+{
+       struct ldb_request *req;
+       int ret;
+
+       req = talloc_zero(NULL, struct ldb_request);
+       if (req == NULL) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+               
+       req->operation = LDB_REQ_REGISTER_PARTITION;
+       req->op.reg_partition.dn = ctrl->dn;
+       req->callback = ldb_op_default_callback;
+
+       ldb_set_timeout(ldb, req, 0);
+       
+       req->handle = ldb_handle_new(req, ldb);
+       if (req->handle == NULL) {
+               talloc_free(req);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       
+       ret = ldb_request(ldb, req);
+       if (ret == LDB_SUCCESS) {
+               ret = ldb_wait(req->handle, LDB_WAIT_ALL);
+       }
+       if (ret != LDB_SUCCESS) {
+               ldb_debug(ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
+               talloc_free(req);
+               return LDB_ERR_OTHER;
+       }
+       talloc_free(req);
+
+       return LDB_SUCCESS;
+}
+
+/* Add a newly found partition to the global data */
+static int add_partition_to_data(struct ldb_context *ldb, struct partition_private_data *data,
+                                struct dsdb_partition *partition)
+{
+       int i, ret;
+       /* Count the partitions */
+       for (i=0; data->partitions && data->partitions[i]; i++) { /* noop */};
+       
+       /* Add partition to list of partitions */
+       data->partitions = talloc_realloc(data, data->partitions, struct dsdb_partition *, i + 2);
+       if (!data->partitions) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       data->partitions[i] = talloc_steal(data->partitions, partition);
+       data->partitions[i+1] = NULL;
+       
+       /* Sort again (should use binary insert) */
+       qsort(data->partitions, i+1,
+             sizeof(*data->partitions), partition_sort_compare);
+       
+       ret = partition_register(ldb, partition->ctrl);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+       return LDB_SUCCESS;
+}
+
+int partition_reload_if_required(struct ldb_module *module, 
+                                struct partition_private_data *data)
+       
+{
+       uint64_t seq;
+       int ret, i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       TALLOC_CTX *mem_ctx = talloc_new(data);
+       if (!data) {
+               /* Not initilised yet */
+               return LDB_SUCCESS;
+       }
+       if (!mem_ctx) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ret = partition_primary_sequence_number(module, mem_ctx, LDB_SEQ_HIGHEST_SEQ, &seq);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(mem_ctx);
+               return ret;
+       }
+       if (seq != data->metadata_seq) {
+               struct ldb_message *msg;
+               struct ldb_message_element *partition_attributes;
+               ret = partition_reload_metadata(module, data, mem_ctx, &msg);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(mem_ctx);
+                       return ret;
+               }
+
+               data->metadata_seq = seq;
+
+               partition_attributes = ldb_msg_find_element(msg, "partition");
+
+               for (i=0; partition_attributes && i < partition_attributes->num_values; i++) {
+                       bool new_partition = true;
+                       struct dsdb_partition *partition;
+                       struct ldb_dn *dn = ldb_dn_from_ldb_val(mem_ctx, ldb, &partition_attributes->values[i]);
+                       if (!dn) {
+                               ldb_asprintf_errstring(ldb, 
+                                                      "partition_init: invalid DN in partition record: %s", (const char *)partition_attributes->values[i].data);
+                               talloc_free(mem_ctx);
+                               return LDB_ERR_CONSTRAINT_VIOLATION;
+                       }
+                       
+                       for (i=0; data->partitions && data->partitions[i]; i++) {
+                               if (ldb_dn_compare(data->partitions[i]->ctrl->dn, dn) == 0) {
+                                       new_partition = false;
+                                       break;
+                               }
+                       }
+                       if (new_partition == false) {
+                               continue;
+                       }
+                       
+                       /* We call ldb_dn_get_linearized() because the DN in
+                        * partition_attributes is already casefolded
+                        * correctly.  We don't want to mess that up as the
+                        * schema isn't loaded yet */
+                       ret = new_partition_from_dn(ldb, data, data->partitions, dn, 
+                                                   ldb_dn_get_linearized(dn),
+                                                   &partition);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(mem_ctx);
+                               return ret;
+                       }
+
+                       ret = add_partition_to_data(ldb, data, partition);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(mem_ctx);
+                               return ret;
+                       }
+               }
+       }
+       talloc_free(mem_ctx);
+       return LDB_SUCCESS;
+}
+
 /* Copy the metadata (@OPTIONS etc) for the new partition into the partition */
 
 static int new_partition_set_replicated_metadata(struct ldb_context *ldb, 
@@ -426,42 +539,9 @@ static int new_partition_set_replicated_metadata(struct ldb_context *ldb,
        return LDB_SUCCESS;
 }
 
-static int partition_register(struct ldb_context *ldb, struct dsdb_control_current_partition *ctrl, TALLOC_CTX *mem_ctx) 
-{
-       struct ldb_request *req;
-       int ret;
-
-       req = talloc_zero(mem_ctx, struct ldb_request);
-       if (req == NULL) {
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-               
-       req->operation = LDB_REQ_REGISTER_PARTITION;
-       req->op.reg_partition.dn = ctrl->dn;
-       req->callback = ldb_op_default_callback;
-
-       ldb_set_timeout(ldb, req, 0);
-       
-       req->handle = ldb_handle_new(req, ldb);
-       if (req->handle == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       
-       ret = ldb_request(ldb, req);
-       if (ret == LDB_SUCCESS) {
-               ret = ldb_wait(req->handle, LDB_WAIT_ALL);
-       }
-       if (ret != LDB_SUCCESS) {
-               ldb_debug(ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
-               talloc_free(mem_ctx);
-               return LDB_ERR_OTHER;
-       }
-       talloc_free(req);
-
-       return LDB_SUCCESS;
-}
-
+/* Extended operation to create a new partition, called when
+ * 'new_partition' detects that one is being added based on it's
+ * instanceType */
 int partition_create(struct ldb_module *module, struct ldb_request *req)
 {
        int i, ret;
@@ -554,23 +634,7 @@ int partition_create(struct ldb_module *module, struct ldb_request *req)
        }
        
        if (new_partition) {
-               /* Count the partitions */
-               for (i=0; data->partitions && data->partitions[i]; i++) { /* noop */};
-               
-               /* Add partition to list of partitions */
-               data->partitions = talloc_realloc(data, data->partitions, struct dsdb_partition *, i + 2);
-               if (!data->partitions) {
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               data->partitions[i] = talloc_steal(data->partitions, partition);
-               data->partitions[i+1] = NULL;
-               
-               /* Sort again (should use binary insert) */
-               qsort(data->partitions, i+1,
-                     sizeof(*data->partitions), partition_sort_compare);
-               
-               ret = partition_register(ldb, partition->ctrl, req);
+               ret = add_partition_to_data(ldb, data, partition);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -583,11 +647,8 @@ int partition_create(struct ldb_module *module, struct ldb_request *req)
 
 int partition_init(struct ldb_module *module)
 {
-       int ret, i;
+       int ret;
        TALLOC_CTX *mem_ctx = talloc_new(module);
-       struct ldb_context *ldb = ldb_module_get_ctx(module);
-       struct ldb_message *msg;
-       struct ldb_message_element *partition_attributes;
 
        struct partition_private_data *data;
 
@@ -600,65 +661,12 @@ int partition_init(struct ldb_module *module)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = partition_primary_sequence_number(module, mem_ctx, LDB_SEQ_HIGHEST_SEQ, &data->metadata_seq);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(mem_ctx);
-               return ret;
-       }
-
-       ret = partition_reload_metadata(module, data, mem_ctx, &msg);
+       /* This loads the partitions */
+       ret = partition_reload_if_required(module, data);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
 
-       partition_attributes = ldb_msg_find_element(msg, "partition");
-       if (!partition_attributes) {
-               data->partitions = NULL;
-       } else {
-               data->partitions = talloc_array(data, struct dsdb_partition *, partition_attributes->num_values + 1);
-               if (!data->partitions) {
-                       ldb_oom(ldb_module_get_ctx(module));
-                       talloc_free(mem_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-       }
-       for (i=0; partition_attributes && i < partition_attributes->num_values; i++) {
-               struct ldb_dn *dn = ldb_dn_from_ldb_val(mem_ctx, ldb, &partition_attributes->values[i]);
-               if (!dn) {
-                       ldb_asprintf_errstring(ldb_module_get_ctx(module), 
-                                              "partition_init: invalid DN in partition record: %s", (const char *)partition_attributes->values[i].data);
-                       talloc_free(mem_ctx);
-                       return LDB_ERR_CONSTRAINT_VIOLATION;
-               }
-       
-               /* We call ldb_dn_get_linearized() because the DN in
-                * partition_attributes is already casefolded
-                * correctly.  We don't want to mess that up as the
-                * schema isn't loaded yet */
-               ret = new_partition_from_dn(ldb, data, data->partitions, dn, 
-                                           ldb_dn_get_linearized(dn),
-                                           &data->partitions[i]);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(mem_ctx);
-                       return ret;
-               }
-       }
-
-       if (data->partitions) {
-               data->partitions[i] = NULL;
-
-               /* sort these into order, most to least specific */
-               qsort(data->partitions, partition_attributes->num_values,
-                     sizeof(*data->partitions), partition_sort_compare);
-       }
-
-       for (i=0; data->partitions && data->partitions[i]; i++) {
-               ret = partition_register(ldb, data->partitions[i]->ctrl, mem_ctx);
-               if (ret != LDB_SUCCESS) {
-                       return ret;
-               }
-       }
-
        ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
        if (ret != LDB_SUCCESS) {
                ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_ERROR,