r20681: implement the DSDB_EXTENDED_REPLICATED_OBJECTS operation.
authorStefan Metzmacher <metze@samba.org>
Thu, 11 Jan 2007 09:54:10 +0000 (09:54 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 19:40:21 +0000 (14:40 -0500)
the merging of existing objects is not implemented yet...

there are a few ifdef REPLMD_FULL_ASYNC because we need to workarouns
ldb's async infrastructure (which don't handle full async sub requests nicely)

metze
(This used to be commit da4ff0e7ccde47b3e092313ba22422350cf50f78)

source4/dsdb/samdb/ldb_modules/config.mk
source4/dsdb/samdb/ldb_modules/repl_meta_data.c

index f2706c0995f8b0478995bd6b44f57ea10a38acde..663f2cfb632aa9f3dfdcde58447f66007bc336a9 100644 (file)
@@ -15,7 +15,7 @@ PUBLIC_DEPENDENCIES = \
 # Start MODULE ldb_repl_mata_data
 [MODULE::ldb_repl_meta_data]
 SUBSYSTEM = ldb
-PRIVATE_DEPENDENCIES = SAMDB LIBTALLOC LIBNDR NDR_MISC NDR_DRSUAPI
+PRIVATE_DEPENDENCIES = SAMDB LIBTALLOC LIBNDR NDR_MISC NDR_DRSUAPI NDR_DRSBLOBS
 INIT_FUNCTION = repl_meta_data_module_init
 OBJ_FILES = \
                repl_meta_data.o
index 803142bbb7e88dd494d1b7f01dfdef6364f044ba..487a6146af3f745d9ddd7a74f484ebf32fa9c128 100644 (file)
 #include "lib/ldb/include/ldb.h"
 #include "lib/ldb/include/ldb_errors.h"
 #include "lib/ldb/include/ldb_private.h"
-#include "librpc/gen_ndr/ndr_misc.h"
 #include "dsdb/samdb/samdb.h"
+#include "librpc/gen_ndr/ndr_misc.h"
+#include "librpc/gen_ndr/ndr_drsblobs.h"
+
+struct replmd_replicated_request {
+       struct ldb_module *module;
+       struct ldb_handle *handle;
+       struct ldb_request *orig_req;
+
+       struct dsdb_extended_replicated_objects *objs;
+
+       uint32_t index_current;
+
+       struct {
+               TALLOC_CTX *mem_ctx;
+               struct ldb_request *search_req;
+               struct ldb_message *search_msg;
+               int search_ret;
+               struct ldb_request *change_req;
+               int change_ret;
+       } sub;
+};
+
+static struct replmd_replicated_request *replmd_replicated_init_handle(struct ldb_module *module,
+                                                                      struct ldb_request *req,
+                                                                      struct dsdb_extended_replicated_objects *objs)
+{
+       struct replmd_replicated_request *ar;
+       struct ldb_handle *h;
+
+       h = talloc_zero(req, struct ldb_handle);
+       if (h == NULL) {
+               ldb_set_errstring(module->ldb, "Out of Memory");
+               return NULL;
+       }
+
+       h->module       = module;
+       h->state        = LDB_ASYNC_PENDING;
+       h->status       = LDB_SUCCESS;
+
+       ar = talloc_zero(h, struct replmd_replicated_request);
+       if (ar == NULL) {
+               ldb_set_errstring(module->ldb, "Out of Memory");
+               talloc_free(h);
+               return NULL;
+       }
+
+       h->private_data = ar;
+
+       ar->module      = module;
+       ar->handle      = h;
+       ar->orig_req    = req;
+       ar->objs        = objs;
+
+       req->handle = h;
+
+       return ar;
+}
 
 static struct ldb_message_element *replmd_find_attribute(const struct ldb_message *msg, const char *name)
 {
@@ -311,10 +367,281 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        return replmd_modify_originating(module, req);
 }
 
+static int replmd_replicated_request_reply_helper(struct replmd_replicated_request *ar, int ret)
+{
+       struct ldb_reply *ares = NULL;
+
+       ar->handle->status = ret;
+       ar->handle->state = LDB_ASYNC_DONE;
+
+       if (!ar->orig_req->callback) {
+               return LDB_SUCCESS;
+       }
+       
+       /* we're done and need to report the success to the caller */
+       ares = talloc_zero(ar, struct ldb_reply);
+       if (!ares) {
+               ar->handle->status = LDB_ERR_OPERATIONS_ERROR;
+               ar->handle->state = LDB_ASYNC_DONE;
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ares->type      = LDB_REPLY_EXTENDED;
+       ares->response  = NULL;
+
+       return ar->orig_req->callback(ar->module->ldb, ar->orig_req->context, ares);
+}
+
+static int replmd_replicated_request_done(struct replmd_replicated_request *ar)
+{
+       return replmd_replicated_request_reply_helper(ar, LDB_SUCCESS);
+}
+
+static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
+{
+       return replmd_replicated_request_reply_helper(ar, ret);
+}
+
+static int replmd_replicated_request_werror(struct replmd_replicated_request *ar, WERROR status)
+{
+       int ret = LDB_ERR_OTHER;
+       /* TODO: do some error mapping */
+       return replmd_replicated_request_reply_helper(ar, ret);
+}
+
+static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
+
+static int replmd_replicated_apply_add_callback(struct ldb_context *ldb,
+                                               void *private_data,
+                                               struct ldb_reply *ares)
+{
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       struct replmd_replicated_request *ar = talloc_get_type(private_data,
+                                              struct replmd_replicated_request);
+
+       ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+       if (ar->sub.change_ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ar->sub.change_ret);
+       }
+
+       talloc_free(ar->sub.mem_ctx);
+       ZERO_STRUCT(ar->sub);
+
+       ar->index_current++;
+
+       return replmd_replicated_apply_next(ar);
+#else
+       return LDB_SUCCESS;
+#endif
+}
+
+static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
+{
+       NTSTATUS nt_status;
+       struct ldb_message *msg;
+       struct replPropertyMetaDataBlob *md;
+       struct ldb_val md_value;
+       uint32_t i;
+       uint64_t seq_num;
+       int ret;
+
+       msg = ar->objs->objects[ar->index_current].msg;
+       md = ar->objs->objects[ar->index_current].meta_data;
+
+       ret = ldb_sequence_number(ar->module->ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNCreated", seq_num);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       ret = samdb_msg_add_uint64(ar->module->ldb, msg, msg, "uSNChanged", seq_num);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       md = ar->objs->objects[ar->index_current].meta_data;
+       for (i=0; i < md->ctr.ctr1.count; i++) {
+               md->ctr.ctr1.array[i].local_usn = seq_num;
+       }
+       nt_status = ndr_push_struct_blob(&md_value, msg, md,
+                                        (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
+       if (!NT_STATUS_IS_OK(nt_status)) {
+               return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+       }
+       ret = ldb_msg_add_value(msg, "replPropertyMetaData", &md_value, NULL);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       ret = ldb_build_add_req(&ar->sub.change_req,
+                               ar->module->ldb,
+                               ar->sub.mem_ctx,
+                               msg,
+                               NULL,
+                               ar,
+                               replmd_replicated_apply_add_callback);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       return ldb_next_request(ar->module, ar->sub.change_req);
+#else
+       ret = ldb_next_request(ar->module, ar->sub.change_req);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+       ar->sub.change_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+       if (ar->sub.change_ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ar->sub.change_ret);
+       }
+
+       talloc_free(ar->sub.mem_ctx);
+       ZERO_STRUCT(ar->sub);
+
+       ar->index_current++;
+
+       return LDB_SUCCESS;
+#endif
+}
+
+static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
+{
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+#error sorry replmd_replicated_apply_merge not implemented
+#else
+       ldb_debug(ar->module->ldb, LDB_DEBUG_FATAL,
+                 "replmd_replicated_apply_merge: ignore [%u]\n",
+                 ar->index_current);
+
+       talloc_free(ar->sub.mem_ctx);
+       ZERO_STRUCT(ar->sub);
+
+       ar->index_current++;
+
+       return LDB_SUCCESS;
+#endif
+}
+
+static int replmd_replicated_apply_search_callback(struct ldb_context *ldb,
+                                                  void *private_data,
+                                                  struct ldb_reply *ares)
+{
+       struct replmd_replicated_request *ar = talloc_get_type(private_data,
+                                              struct replmd_replicated_request);
+       bool is_done = false;
+
+       switch (ares->type) {
+       case LDB_REPLY_ENTRY:
+               ar->sub.search_msg = talloc_steal(ar->sub.mem_ctx, ares->message);
+               break;
+       case LDB_REPLY_REFERRAL:
+               /* we ignore referrals */
+               break;
+       case LDB_REPLY_EXTENDED:
+       case LDB_REPLY_DONE:
+               is_done = true;
+       }
+
+       talloc_free(ares);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       if (is_done) {
+               ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+               if (ar->sub.search_ret != LDB_SUCCESS) {
+                       return replmd_replicated_request_error(ar, ar->sub.search_ret);
+               }
+               if (ar->sub.search_msg) {
+                       return replmd_replicated_apply_merge(ar);
+               }
+               return replmd_replicated_apply_add(ar);
+       }
+#endif
+       return LDB_SUCCESS;
+}
+
+static int replmd_replicated_apply_search(struct replmd_replicated_request *ar)
+{
+       int ret;
+       char *tmp_str;
+       char *filter;
+
+       tmp_str = ldb_binary_encode(ar->sub.mem_ctx, ar->objs->objects[ar->index_current].guid_value);
+       if (!tmp_str) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+
+       filter = talloc_asprintf(ar->sub.mem_ctx, "(objectGUID=%s)", tmp_str);
+       if (!filter) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+       talloc_free(tmp_str);
+
+       ret = ldb_build_search_req(&ar->sub.search_req,
+                                  ar->module->ldb,
+                                  ar->sub.mem_ctx,
+                                  ar->objs->partition_dn,
+                                  LDB_SCOPE_SUBTREE,
+                                  filter,
+                                  NULL,
+                                  NULL,
+                                  ar,
+                                  replmd_replicated_apply_search_callback);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       return ldb_next_request(ar->module, ar->sub.search_req);
+#else
+       ret = ldb_next_request(ar->module, ar->sub.search_req);
+       if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
+
+       ar->sub.search_ret = ldb_wait(ar->sub.search_req->handle, LDB_WAIT_ALL);
+       if (ar->sub.search_ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ar->sub.search_ret);
+       }
+       if (ar->sub.search_msg) {
+               return replmd_replicated_apply_merge(ar);
+       }
+
+       return replmd_replicated_apply_add(ar);
+#endif
+}
+
+static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
+{
+       if (ar->index_current >= ar->objs->num_objects) {
+               return replmd_replicated_request_done(ar);
+       }
+
+       ar->sub.mem_ctx = talloc_new(ar);
+       if (!ar->sub.mem_ctx) return replmd_replicated_request_werror(ar, WERR_NOMEM);
+
+       return replmd_replicated_apply_search(ar);
+}
+
 static int replmd_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
 {
+       struct dsdb_extended_replicated_objects *objs;
+       struct replmd_replicated_request *ar;
+
        ldb_debug(module->ldb, LDB_DEBUG_TRACE, "replmd_extended_replicated_objects\n");
-       return LDB_ERR_OPERATIONS_ERROR;
+
+       objs = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
+       if (!objs) {
+               return LDB_ERR_PROTOCOL_ERROR;
+       }
+
+       ar = replmd_replicated_init_handle(module, req, objs);
+       if (!ar) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+#ifdef REPLMD_FULL_ASYNC /* TODO: active this code when ldb support full async code */ 
+       return replmd_replicated_apply_next(ar);
+#else
+       while (req->handle->state != LDB_ASYNC_DONE) {
+               replmd_replicated_apply_next(ar);
+       }
+
+       return LDB_SUCCESS;
+#endif
 }
 
 static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
@@ -326,11 +653,55 @@ static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
        return ldb_next_request(module, req);
 }
 
+static int replmd_wait_none(struct ldb_handle *handle) {
+       struct replmd_replicated_request *ar;
+    
+       if (!handle || !handle->private_data) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       ar = talloc_get_type(handle->private_data, struct replmd_replicated_request);
+       if (!ar) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* we do only sync calls */
+       if (handle->state != LDB_ASYNC_DONE) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       return handle->status;
+}
+
+static int replmd_wait_all(struct ldb_handle *handle) {
+
+       int ret;
+
+       while (handle->state != LDB_ASYNC_DONE) {
+               ret = replmd_wait_none(handle);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       return handle->status;
+}
+
+static int replmd_wait(struct ldb_handle *handle, enum ldb_wait_type type)
+{
+       if (type == LDB_WAIT_ALL) {
+               return replmd_wait_all(handle);
+       } else {
+               return replmd_wait_none(handle);
+       }
+}
+
 static const struct ldb_module_ops replmd_ops = {
        .name          = "repl_meta_data",
        .add           = replmd_add,
        .modify        = replmd_modify,
        .extended      = replmd_extended,
+       .wait          = replmd_wait
 };
 
 int repl_meta_data_module_init(void)