dsdb-repl_meta_data: Handle renames better, considering only the RDN as given, and...
authorAndrew Bartlett <abartlet@samba.org>
Thu, 16 May 2013 05:19:20 +0000 (15:19 +1000)
committerStefan Metzmacher <metze@samba.org>
Tue, 28 May 2013 14:20:08 +0000 (16:20 +0200)
This ignores the full DN as given, because the parent compents might be out of date.

Andrew Bartlett

Reviewed-by: Stefan Metzmacher <metze@samba.org>
source4/dsdb/samdb/ldb_modules/repl_meta_data.c

index 651cdf1c046efdc1827b6a4f13e26ecece6ab511..5fc71ab1d2bced362dd86ca3e10f46a304f7d973 100644 (file)
@@ -94,6 +94,8 @@ struct replmd_replicated_request {
        bool is_urgent;
 };
 
+static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
+
 enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1,
        REPL_URGENT_ON_UPDATE = 2,
@@ -3866,7 +3868,11 @@ static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request
                break;
 
        case LDB_REPLY_DONE:
-               ret = replmd_replicated_apply_add(ar);
+               if (ar->search_msg != NULL) {
+                       ret = replmd_replicated_apply_merge(ar);
+               } else {
+                       ret = replmd_replicated_apply_add(ar);
+               }
                if (ret != LDB_SUCCESS) {
                        return ldb_module_done(ar->req, NULL, NULL, ret);
                }
@@ -3892,7 +3898,11 @@ static int replmd_replicated_apply_search_for_parent(struct replmd_replicated_re
        ldb = ldb_module_get_ctx(ar->module);
 
        if (!ar->objs->objects[ar->index_current].parent_guid_value.data) {
-               return replmd_replicated_apply_add(ar);
+               if (ar->search_msg != NULL) {
+                       return replmd_replicated_apply_merge(ar);
+               } else {
+                       return replmd_replicated_apply_add(ar);
+               }
        }
 
        tmp_str = ldb_binary_encode(ar, ar->objs->objects[ar->index_current].parent_guid_value);
@@ -3931,84 +3941,52 @@ static int replmd_replicated_apply_search_for_parent(struct replmd_replicated_re
  */
 static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                                           struct ldb_message *msg,
-                                          struct replPropertyMetaDataBlob *rmd,
-                                          struct replPropertyMetaDataBlob *omd,
-                                          struct ldb_request *parent,
-                                          bool *renamed)
+                                          struct ldb_request *parent)
 {
-       struct replPropertyMetaData1 *md_remote;
-       struct replPropertyMetaData1 *md_local;
-
-       *renamed = true;
-
-       if (ldb_dn_compare(msg->dn, ar->search_msg->dn) == 0) {
-               /* no rename */
-               return LDB_SUCCESS;
-       }
-
-       /* now we need to check for double renames. We could have a
-        * local rename pending which our replication partner hasn't
-        * received yet. We choose which one wins by looking at the
-        * attribute stamps on the two objects, the newer one wins
-        */
-       md_remote = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
-       md_local  = replmd_replPropertyMetaData1_find_attid(omd, DRSUAPI_ATTID_name);
-       /* if there is no name attribute then we have to assume the
-          object we've received is in fact newer */
-       if (ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING ||
-           !md_remote || !md_local ||
-           replmd_replPropertyMetaData1_is_newer(md_local, md_remote)) {
-               struct ldb_request *req;
-               int ret;
-               TALLOC_CTX *tmp_ctx = talloc_new(msg);
-               struct ldb_result *res;
-
-               DEBUG(4,("replmd_replicated_request rename %s => %s\n",
-                        ldb_dn_get_linearized(ar->search_msg->dn),
-                        ldb_dn_get_linearized(msg->dn)));
-
+       struct ldb_request *req;
+       int ret;
+       TALLOC_CTX *tmp_ctx = talloc_new(msg);
+       struct ldb_result *res;
 
-               res = talloc_zero(tmp_ctx, struct ldb_result);
-               if (!res) {
-                       talloc_free(tmp_ctx);
-                       return ldb_oom(ldb_module_get_ctx(ar->module));
-               }
-
-               /* pass rename to the next module
-                * so it doesn't appear as an originating update */
-               ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ar->module), tmp_ctx,
-                                          ar->search_msg->dn, msg->dn,
-                                          NULL,
-                                          ar,
-                                          replmd_op_rename_callback,
-                                          parent);
-               LDB_REQ_SET_LOCATION(req);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+       DEBUG(4,("replmd_replicated_request rename %s => %s\n",
+                ldb_dn_get_linearized(ar->search_msg->dn),
+                ldb_dn_get_linearized(msg->dn)));
 
-               ret = dsdb_request_add_controls(req, DSDB_MODIFY_RELAX);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
 
-               ret = ldb_next_request(ar->module, req);
+       res = talloc_zero(tmp_ctx, struct ldb_result);
+       if (!res) {
+               talloc_free(tmp_ctx);
+               return ldb_oom(ldb_module_get_ctx(ar->module));
+       }
 
-               if (ret == LDB_SUCCESS) {
-                       ret = ldb_wait(req->handle, LDB_WAIT_ALL);
-               }
+       /* pass rename to the next module
+        * so it doesn't appear as an originating update */
+       ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ar->module), tmp_ctx,
+                                  ar->search_msg->dn, msg->dn,
+                                  NULL,
+                                  ar,
+                                  replmd_op_rename_callback,
+                                  parent);
+       LDB_REQ_SET_LOCATION(req);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
 
+       ret = dsdb_request_add_controls(req, DSDB_MODIFY_RELAX);
+       if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       /* we're going to keep our old object */
-       DEBUG(4,(__location__ ": Keeping object %s and rejecting older rename to %s\n",
-                ldb_dn_get_linearized(ar->search_msg->dn),
-                ldb_dn_get_linearized(msg->dn)));
-       return LDB_SUCCESS;
+       ret = ldb_next_request(ar->module, req);
+
+       if (ret == LDB_SUCCESS) {
+               ret = ldb_wait(req->handle, LDB_WAIT_ALL);
+       }
+
+       talloc_free(tmp_ctx);
+       return ret;
 }
 
 
@@ -4062,9 +4040,22 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
                                                     "isDeleted", false);
 
-       /* handle renames that come in over DRS */
-       ret = replmd_replicated_handle_rename(ar, msg, rmd, &omd,
-                                             ar->req, &renamed);
+       if (strcmp(ldb_dn_get_linearized(msg->dn), ldb_dn_get_linearized(ar->search_msg->dn)) == 0) {
+               ret = LDB_SUCCESS;
+       } else {
+               /*
+                * handle renames, even just by case that come in over
+                * DRS.  Changes in the parent DN don't hit us here,
+                * because the search for a parent will clean up those
+                * components.
+                *
+                * We also have already filtered out the case where
+                * the peer has an older name to what we have (see
+                * replmd_replicated_apply_search_callback())
+                */
+               renamed = true;
+               ret = replmd_replicated_handle_rename(ar, msg, ar->req);
+       }
 
        /*
         * This particular error code means that we already tried the
@@ -4348,17 +4339,91 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
                break;
 
        case LDB_REPLY_DONE:
+       {
+               struct replPropertyMetaData1 *md_remote;
+               struct replPropertyMetaData1 *md_local;
+
+               struct replPropertyMetaDataBlob omd;
+               const struct ldb_val *omd_value;
+               struct replPropertyMetaDataBlob *rmd;
+               struct ldb_message *msg;
+
                ar->objs->objects[ar->index_current].last_known_parent = NULL;
 
-               if (ar->search_msg != NULL) {
-                       ret = replmd_replicated_apply_merge(ar);
-               } else {
+               /*
+                * This is the ADD case, find the appropriate parent,
+                * as this object doesn't exist locally:
+                */
+               if (ar->search_msg == NULL) {
                        ret = replmd_replicated_apply_search_for_parent(ar);
+                       if (ret != LDB_SUCCESS) {
+                               return ldb_module_done(ar->req, NULL, NULL, ret);
+                       }
+                       talloc_free(ares);
+                       return LDB_SUCCESS;
+               }
+
+               /*
+                * Otherwise, in the MERGE case, work out if we are
+                * attempting a rename, and if so find the parent the
+                * newly renamed object wants to belong under (which
+                * may not be the parent in it's attached string DN
+                */
+               rmd = ar->objs->objects[ar->index_current].meta_data;
+               ZERO_STRUCT(omd);
+               omd.version = 1;
+
+               /* find existing meta data */
+               omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
+               if (omd_value) {
+                       enum ndr_err_code ndr_err;
+                       ndr_err = ndr_pull_struct_blob(omd_value, ar, &omd,
+                                                      (ndr_pull_flags_fn_t)ndr_pull_replPropertyMetaDataBlob);
+                       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+                               NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
+                               return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+                       }
+
+                       if (omd.version != 1) {
+                               return replmd_replicated_request_werror(ar, WERR_DS_DRA_INTERNAL_ERROR);
+                       }
+               }
+
+               /*
+                * now we need to check for double renames. We could have a
+                * local rename pending which our replication partner hasn't
+                * received yet. We choose which one wins by looking at the
+                * attribute stamps on the two objects, the newer one wins
+                */
+               md_remote = replmd_replPropertyMetaData1_find_attid(rmd, DRSUAPI_ATTID_name);
+               md_local  = replmd_replPropertyMetaData1_find_attid(&omd, DRSUAPI_ATTID_name);
+               /* if there is no name attribute then we have to assume the
+                  object we've received is in fact newer */
+               if (ar->objs->dsdb_repl_flags & DSDB_REPL_FLAG_PRIORITISE_INCOMING ||
+                   !md_remote || !md_local ||
+                   replmd_replPropertyMetaData1_is_newer(md_local, md_remote)) {
+                       ret = replmd_replicated_apply_search_for_parent(ar);
+               } else {
+                       msg = ar->objs->objects[ar->index_current].msg;
+
+                       /* Otherwise, just merge on the existing object, force no rename */
+                       DEBUG(4,(__location__ ": Keeping object %s and rejecting older rename to %s\n",
+                                ldb_dn_get_linearized(ar->search_msg->dn),
+                                ldb_dn_get_linearized(msg->dn)));
+
+                       /*
+                        * This assignment ensures that the strcmp()
+                        * in replmd_replicated_apply_merge() avoids
+                        * the rename call
+                        */
+                       msg->dn = ar->search_msg->dn;
+                       ret = replmd_replicated_apply_merge(ar);
                }
                if (ret != LDB_SUCCESS) {
                        return ldb_module_done(ar->req, NULL, NULL, ret);
                }
        }
+       }
 
        talloc_free(ares);
        return LDB_SUCCESS;