s4:dsdb Remove potentially confusing 'partition' control from result
[amitay/samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index 73c070aa0aa6c96349f89e24a98994aece61ccd4..00f3856857600e1351908e121849d7bb5038c6ab 100644 (file)
@@ -51,8 +51,8 @@ struct replmd_private {
        struct la_entry *la_list;
        uint32_t num_ncs;
        struct nc_entry {
+               struct nc_entry *prev, *next;
                struct ldb_dn *dn;
-               struct GUID guid;
                uint64_t mod_usn;
        } *ncs;
 };
@@ -68,16 +68,21 @@ struct replmd_replicated_request {
 
        const struct dsdb_schema *schema;
 
-       struct dsdb_extended_replicated_objects *objs;
-
        /* the controls we pass down */
        struct ldb_control **controls;
 
+       /* details for the mode where we apply a bunch of inbound replication meessages */
+       bool apply_mode;
        uint32_t index_current;
+       struct dsdb_extended_replicated_objects *objs;
 
        struct ldb_message *search_msg;
+
+       uint64_t seq_num;
+
 };
 
+static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
 
 /*
   initialise the module
@@ -100,130 +105,97 @@ static int replmd_init(struct ldb_module *module)
 }
 
 
-static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
-{
-       return ldb_dn_compare(n1->dn, n2->dn);
-}
-
 /*
-  build the list of partition DNs for use by replmd_notify()
+ * Callback for most write operations in this module:
+ * 
+ * notify the repl task that a object has changed. The notifies are
+ * gathered up in the replmd_private structure then written to the
+ * @REPLCHANGED object in each partition during the prepare_commit
  */
-static int replmd_load_NCs(struct ldb_module *module)
+static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
 {
-       const char *attrs[] = { "namingContexts", NULL };
-       struct ldb_result *res = NULL;
-       int i, ret;
-       TALLOC_CTX *tmp_ctx;
-       struct ldb_context *ldb;
-       struct ldb_message_element *el;
+       int ret;
+       struct replmd_replicated_request *ac = 
+               talloc_get_type_abort(req->context, struct replmd_replicated_request);
        struct replmd_private *replmd_private = 
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+               talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
+       struct nc_entry *modified_partition;
+       struct ldb_control *partition_ctrl;
+       const struct dsdb_control_current_partition *partition;
 
-       if (replmd_private->ncs != NULL) {
-               return LDB_SUCCESS;
-       }
+       struct ldb_control **controls;
 
-       ldb = ldb_module_get_ctx(module);
-       tmp_ctx = talloc_new(module);
-
-       /* load the list of naming contexts */
-       ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
-                        LDB_SCOPE_BASE, attrs, NULL);
-       if (ret != LDB_SUCCESS ||
-           res->count != 1) {
-               DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+       partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
 
-       el = ldb_msg_find_element(res->msgs[0], "namingContexts");
-       if (el == NULL) {
-               DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
-               return LDB_ERR_OPERATIONS_ERROR;                
-       }
+       /* Remove the 'partition' control from what we pass up the chain */
+       controls = controls_except_specified(ares->controls, ares, partition_ctrl);
 
-       replmd_private->num_ncs = el->num_values;
-       replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
-                                          replmd_private->num_ncs);
-       if (replmd_private->ncs == NULL) {
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (ares->error != LDB_SUCCESS) {
+               return ldb_module_done(ac->req, controls,
+                                       ares->response, ares->error);
        }
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               replmd_private->ncs[i].dn = 
-                       ldb_dn_from_ldb_val(replmd_private->ncs, 
-                                           ldb, &el->values[i]);
-               replmd_private->ncs[i].mod_usn = 0;
+       if (ares->type != LDB_REPLY_DONE) {
+               ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
+               return ldb_module_done(ac->req, NULL,
+                                      NULL, LDB_ERR_OPERATIONS_ERROR);
        }
 
-       talloc_free(res);
-
-       /* now find the GUIDs of each of those DNs */
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               const char *attrs2[] = { "objectGUID", NULL };
-               ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
-                                LDB_SCOPE_BASE, attrs2, NULL);
-               if (ret != LDB_SUCCESS ||
-                   res->count != 1) {
-                       /* this happens when the schema is first being
-                          setup */
-                       talloc_free(replmd_private->ncs);
-                       replmd_private->ncs = NULL;
-                       replmd_private->num_ncs = 0;
-                       talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               }
-               replmd_private->ncs[i].guid = 
-                       samdb_result_guid(res->msgs[0], "objectGUID");
-               talloc_free(res);
-       }       
-
-       /* sort the NCs into order, most to least specific */
-       qsort(replmd_private->ncs, replmd_private->num_ncs,
-             sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
+       if (!partition_ctrl) {
+               return ldb_module_done(ac->req, NULL,
+                                      NULL, LDB_ERR_OPERATIONS_ERROR);
+       }
 
+       partition = talloc_get_type_abort(partition_ctrl->data,
+                                   struct dsdb_control_current_partition);
        
-       talloc_free(tmp_ctx);
-       
-       return LDB_SUCCESS;
-}
-
-
-/*
- * notify the repl task that a object has changed. The notifies are
- * gathered up in the replmd_private structure then written to the
- * @REPLCHANGED object in each partition during the prepare_commit
- */
-static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
-{
-       int ret, i;
-       struct replmd_private *replmd_private = 
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-
-       ret = replmd_load_NCs(module);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-       if (replmd_private->num_ncs == 0) {
-               return LDB_SUCCESS;
-       }
+       if (ac->seq_num > 0) {
+               for (modified_partition = replmd_private->ncs; modified_partition; 
+                    modified_partition = modified_partition->next) {
+                       if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
+                               break;
+                       }
+               }
+               
+               if (modified_partition == NULL) {
+                       modified_partition = talloc_zero(replmd_private, struct nc_entry);
+                       if (!modified_partition) {
+                               ldb_oom(ldb_module_get_ctx(ac->module));
+                               return ldb_module_done(ac->req, NULL,
+                                                      NULL, LDB_ERR_OPERATIONS_ERROR);
+                       }
+                       modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
+                       if (!modified_partition->dn) {
+                               ldb_oom(ldb_module_get_ctx(ac->module));
+                               return ldb_module_done(ac->req, NULL,
+                                                      NULL, LDB_ERR_OPERATIONS_ERROR);
+                       }
+                       DLIST_ADD(replmd_private->ncs, modified_partition);
+               }
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
-                       break;
+               if (ac->seq_num > modified_partition->mod_usn) {
+                       modified_partition->mod_usn = ac->seq_num;
                }
        }
-       if (i == replmd_private->num_ncs) {
-               DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
-                        ldb_dn_get_linearized(dn)));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
 
-       if (uSN > replmd_private->ncs[i].mod_usn) {
-           replmd_private->ncs[i].mod_usn = uSN;
+       if (ac->apply_mode) {
+               talloc_free(ares);
+               ac->index_current++;
+               
+               ret = replmd_replicated_apply_next(ac);
+               if (ret != LDB_SUCCESS) {
+                       return ldb_module_done(ac->req, NULL, NULL, ret);
+               }
+               return ret;
+       } else {
+               /* free the partition control container here, for the
+                * common path.  Other cases will have it cleaned up
+                * eventually with the ares */
+               talloc_free(partition_ctrl);
+               return ldb_module_done(ac->req, 
+                                      controls_except_specified(controls, ares, partition_ctrl),
+                                      ares->response, LDB_SUCCESS);
        }
-
-       return LDB_SUCCESS;
 }
 
 
@@ -233,25 +205,19 @@ static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t
  */
 static int replmd_notify_store(struct ldb_module *module)
 {
-       int i;
        struct replmd_private *replmd_private = 
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
        struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct nc_entry *modified_partition;
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
+       for (modified_partition = replmd_private->ncs; modified_partition; 
+            modified_partition = modified_partition->next) {
                int ret;
 
-               if (replmd_private->ncs[i].mod_usn == 0) {
-                       /* this partition has not changed in this
-                          transaction */
-                       continue;
-               }
-
-               ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
-                                             replmd_private->ncs[i].mod_usn);
+               ret = dsdb_save_partition_usn(ldb, modified_partition->dn, modified_partition->mod_usn);
                if (ret != LDB_SUCCESS) {
                        DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
-                                ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
+                                ldb_dn_get_linearized(modified_partition->dn)));
                        return ret;
                }
        }
@@ -279,6 +245,15 @@ static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *modu
 
        ac->module = module;
        ac->req = req;
+
+       ac->schema = dsdb_get_schema(ldb);
+       if (!ac->schema) {
+               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                             "replmd_modify: no dsdb_schema loaded");
+               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
+               return NULL;
+       }
+
        return ac;
 }
 
@@ -425,42 +400,12 @@ static void replmd_ldb_message_sort(struct ldb_message *msg,
                  discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
 }
 
-static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
-{
-       struct ldb_context *ldb;
-       struct replmd_replicated_request *ac;
-
-       ac = talloc_get_type(req->context, struct replmd_replicated_request);
-       ldb = ldb_module_get_ctx(ac->module);
-
-       if (!ares) {
-               return ldb_module_done(ac->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-       if (ares->error != LDB_SUCCESS) {
-               return ldb_module_done(ac->req, ares->controls,
-                                       ares->response, ares->error);
-       }
-
-       if (ares->type != LDB_REPLY_DONE) {
-               ldb_set_errstring(ldb,
-                                 "invalid ldb_reply_type in callback");
-               talloc_free(ares);
-               return ldb_module_done(ac->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-
-       return ldb_module_done(ac->req, ares->controls,
-                               ares->response, LDB_SUCCESS);
-}
-
 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 {
        struct ldb_context *ldb;
         struct ldb_control *control;
         struct ldb_control **saved_controls;
        struct replmd_replicated_request *ac;
-       const struct dsdb_schema *schema;
        enum ndr_err_code ndr_err;
        struct ldb_request *down_req;
        struct ldb_message *msg;
@@ -469,17 +414,16 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        struct ldb_val guid_value;
        struct replPropertyMetaDataBlob nmd;
        struct ldb_val nmd_value;
-       uint64_t seq_num;
        const struct GUID *our_invocation_id;
        time_t t = time(NULL);
        NTTIME now;
        char *time_str;
        int ret;
        uint32_t i, ni=0;
-       int allow_add_guid=0;
-       int remove_current_guid=0;
+       bool allow_add_guid = false;
+       bool remove_current_guid = false;
 
-        /* check if there's a show deleted control */
+        /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
         control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
        if (control) {
                allow_add_guid = 1;
@@ -494,37 +438,29 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
 
-       schema = dsdb_get_schema(ldb);
-       if (!schema) {
-               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
-                             "replmd_add: no dsdb_schema loaded");
-               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
-               return LDB_ERR_CONSTRAINT_VIOLATION;
-       }
-
        ac = replmd_ctx_init(module, req);
        if (!ac) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ac->schema = schema;
-
         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
        if ( guid_blob != NULL ) {
                if( !allow_add_guid ) {
                        ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                              "replmd_add: it's not allowed to add an object with objectGUID\n");
+                       talloc_free(ac);
                        return LDB_ERR_UNWILLING_TO_PERFORM;
                } else {
                        NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
                        if ( !NT_STATUS_IS_OK(status)) {
                                        ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                                      "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
+                               talloc_free(ac);
                                return LDB_ERR_UNWILLING_TO_PERFORM;
                        }
                        /* we remove this attribute as it can be a string and will not be treated 
                        correctly and then we will readd it latter on in the good format*/
-                       remove_current_guid = 1;
+                       remove_current_guid = true;
                }
        } else {
                /* a new GUID */
@@ -532,17 +468,18 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        }
 
        /* Get a sequence number from the backend */
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
-
        /* get our invocationId */
        our_invocation_id = samdb_ntds_invocation_id(ldb);
        if (!our_invocation_id) {
                ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                              "replmd_add: unable to find invocationId\n");
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -550,6 +487,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        msg = ldb_msg_copy_shallow(ac, req->op.add.message);
        if (msg == NULL) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -557,6 +495,8 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        unix_to_nt_time(&now, t);
        time_str = ldb_timestring(msg, t);
        if (!time_str) {
+               ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        if (remove_current_guid) {
@@ -572,21 +512,14 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        ldb_msg_remove_attr(msg, "uSNChanged");
        ldb_msg_remove_attr(msg, "replPropertyMetaData");
 
-       if (!ldb_msg_find_element(req->op.add.message, "instanceType")) {
-               ret = ldb_msg_add_fmt(msg, "instanceType", "%u", INSTANCE_TYPE_WRITE);
-               if (ret != LDB_SUCCESS) {
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-       }
-
        /*
         * readd replicated attributes
         */
        ret = ldb_msg_add_string(msg, "whenCreated", time_str);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
 
        /* build the replication meta_data */
@@ -598,6 +531,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                               nmd.ctr.ctr1.count);
        if (!nmd.ctr.ctr1.array) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -608,11 +542,12 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 
                if (e->name[0] == '@') continue;
 
-               sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
+               sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
                if (!sa) {
                        ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                                      "replmd_add: attribute '%s' not defined in schema\n",
                                      e->name);
+                       talloc_free(ac);
                        return LDB_ERR_NO_SUCH_ATTRIBUTE;
                }
 
@@ -627,8 +562,8 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                m->version                      = 1;
                m->originating_change_time      = now;
                m->originating_invocation_id    = *our_invocation_id;
-               m->originating_usn              = seq_num;
-               m->local_usn                    = seq_num;
+               m->originating_usn              = ac->seq_num;
+               m->local_usn                    = ac->seq_num;
                ni++;
        }
 
@@ -638,8 +573,9 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        /*
         * sort meta data array, and move the rdn attribute entry to the end
         */
-       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, schema, msg->dn);
+       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
@@ -650,6 +586,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                       (ndr_push_flags_fn_t)ndr_push_GUID);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
@@ -658,6 +595,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                       (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -667,33 +605,38 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
        ret = ldb_msg_add_string(msg, "whenChanged", time_str);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
        ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
 
        /*
         * sort the attributes by attid before storing the object
         */
-       replmd_ldb_message_sort(msg, schema);
+       replmd_ldb_message_sort(msg, ac->schema);
 
        ret = ldb_build_add_req(&down_req, ldb, ac,
                                msg,
@@ -701,16 +644,13 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                ac, replmd_op_callback,
                                req);
        if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
-       ret = replmd_notify(module, msg->dn, seq_num);
-       if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
                /* if a control is there remove if from the modified request */
        if (control && !save_controls(control, down_req, &saved_controls)) {
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -726,7 +666,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                                      struct ldb_message *msg,
                                      struct ldb_message_element *el,
                                      struct replPropertyMetaDataBlob *omd,
-                                     struct dsdb_schema *schema,
+                                     const struct dsdb_schema *schema,
                                      uint64_t *seq_num,
                                      const struct GUID *our_invocation_id,
                                      NTTIME now)
@@ -789,13 +729,13 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
  * client is based on this object 
  */
 static int replmd_update_rpmd(struct ldb_module *module, 
+                             const struct dsdb_schema *schema, 
                              struct ldb_message *msg, uint64_t *seq_num)
 {
        const struct ldb_val *omd_value;
        enum ndr_err_code ndr_err;
        struct replPropertyMetaDataBlob omd;
        int i;
-       struct dsdb_schema *schema;
        time_t t = time(NULL);
        NTTIME now;
        const struct GUID *our_invocation_id;
@@ -818,7 +758,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
        /* search for the existing replPropertyMetaDataBlob */
        ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
-       if (ret != LDB_SUCCESS || res->count < 1) {
+       if (ret != LDB_SUCCESS || res->count != 1) {
                DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
                         ldb_dn_get_linearized(msg->dn)));
                return LDB_ERR_OPERATIONS_ERROR;
@@ -847,8 +787,6 @@ static int replmd_update_rpmd(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       schema = dsdb_get_schema(ldb);
-
        for (i=0; i<msg->num_elements; i++) {
                ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
                                                 our_invocation_id, now);
@@ -893,11 +831,6 @@ static int replmd_update_rpmd(struct ldb_module *module,
                        return ret;
                }
 
-               ret = replmd_notify(module, msg->dn, *seq_num);
-               if (ret != LDB_SUCCESS) {
-                       return ret;
-               }
-
                el->num_values = 1;
                el->values = md_value;
        }
@@ -910,12 +843,12 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 {
        struct ldb_context *ldb;
        struct replmd_replicated_request *ac;
-       const struct dsdb_schema *schema;
        struct ldb_request *down_req;
        struct ldb_message *msg;
-       int ret;
+       struct ldb_result *res;
        time_t t = time(NULL);
        uint64_t seq_num = 0;
+       int ret;
 
        /* do not manipulate our control entries */
        if (ldb_dn_is_special(req->op.mod.message->dn)) {
@@ -926,31 +859,20 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
-       schema = dsdb_get_schema(ldb);
-       if (!schema) {
-               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
-                             "replmd_modify: no dsdb_schema loaded");
-               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
-               return LDB_ERR_CONSTRAINT_VIOLATION;
-       }
-
        ac = replmd_ctx_init(module, req);
        if (!ac) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ac->schema = schema;
-
        /* we have to copy the message as the caller might have it as a const */
        msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
        if (msg == NULL) {
+               ldb_oom(ldb);
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* TODO:
-        * - get the whole old object
-        * - if the old object doesn't exist report an error
         * - give an error when a readonly attribute should
         *   be modified
         * - merge the changed into the old object
@@ -959,8 +881,15 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
         *   attribute was changed
         */
 
-       ret = replmd_update_rpmd(module, msg, &seq_num);
+       ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, NULL);
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
+       }
+
+       ret = replmd_update_rpmd(module, ac->schema, msg, &ac->seq_num);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
@@ -974,6 +903,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                                ac, replmd_op_callback,
                                req);
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
        talloc_steal(down_req, msg);
@@ -983,12 +913,12 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        if (seq_num != 0) {
                if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
                        talloc_free(ac);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
 
                if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
                        talloc_free(ac);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
        }
 
@@ -996,22 +926,20 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        return ldb_next_request(module, down_req);
 }
 
+static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares);
 
 /*
   handle a rename request
 
   On a rename we need to do an extra ldb_modify which sets the
-  whenChanged and uSNChanged attributes
+  whenChanged and uSNChanged attributes.  We do this in a callback after the success.
  */
 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
 {
        struct ldb_context *ldb;
-       int ret, i;
-       time_t t = time(NULL);
-       uint64_t seq_num = 0;
-       struct ldb_message *msg;
-       struct replmd_private *replmd_private = 
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct replmd_replicated_request *ac;
+       int ret;
+       struct ldb_request *down_req;
 
        /* do not manipulate our control entries */
        if (ldb_dn_is_special(req->op.mod.message->dn)) {
@@ -1022,73 +950,94 @@ static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
 
-       /* Get a sequence number from the backend */
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ac = replmd_ctx_init(module, req);
+       if (!ac) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ret = ldb_build_rename_req(&down_req, ldb, ac,
+                                  ac->req->op.rename.olddn,
+                                  ac->req->op.rename.newdn,
+                                  ac->req->controls,
+                                  ac, replmd_rename_callback,
+                                  ac->req);
+
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
-       msg = ldb_msg_new(req);
-       if (msg == NULL) {
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+       /* go on with the call chain */
+       return ldb_next_request(module, down_req);
+}
 
-       msg->dn = req->op.rename.olddn;
+/* After the rename is compleated, update the whenchanged etc */
+static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
+{
+       struct ldb_context *ldb;
+       struct replmd_replicated_request *ac;
+       struct ldb_request *down_req;
+       struct ldb_message *msg;
+       time_t t = time(NULL);
+       int ret;
 
-       if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
-               talloc_free(msg);
-               return LDB_ERR_OPERATIONS_ERROR;
+       ac = talloc_get_type(req->context, struct replmd_replicated_request);
+       ldb = ldb_module_get_ctx(ac->module);
+
+       if (ares->error != LDB_SUCCESS) {
+               return ldb_module_done(ac->req, ares->controls,
+                                       ares->response, ares->error);
        }
-       msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
 
-       if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
-               talloc_free(msg);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (ares->type != LDB_REPLY_DONE) {
+               ldb_set_errstring(ldb,
+                                 "invalid ldb_reply_type in callback");
+               talloc_free(ares);
+               return ldb_module_done(ac->req, NULL, NULL,
+                                       LDB_ERR_OPERATIONS_ERROR);
        }
-       msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
 
-       ret = ldb_modify(ldb, msg);
-       talloc_free(msg);
+       /* Get a sequence number from the backend */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
 
-       ret = replmd_load_NCs(module);
-       if (ret != 0) {
-               return ret;
-       }
+       /* TODO:
+        * - replace the old object with the newly constructed one
+        */
 
-       /* now update the highest uSNs of the partitions that are
-          affected. Note that two partitions could be changing */
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
-                                       req->op.rename.olddn) == 0) {
-                       break;
-               }
-       }
-       if (i == replmd_private->num_ncs) {
-               DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
-                        ldb_dn_get_linearized(req->op.rename.olddn)));
+       msg = ldb_msg_new(ac);
+       if (msg == NULL) {
+               ldb_oom(ldb);
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       replmd_private->ncs[i].mod_usn = seq_num;
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
-                                       req->op.rename.newdn) == 0) {
-                       break;
-               }
+       msg->dn = ac->req->op.rename.newdn;
+
+       ret = ldb_build_mod_req(&down_req, ldb, ac,
+                               msg,
+                               req->controls,
+                               ac, replmd_op_callback,
+                               req);
+
+       if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
        }
-       if (i == replmd_private->num_ncs) {
-               DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
-                        ldb_dn_get_linearized(req->op.rename.newdn)));
-               return LDB_ERR_OPERATIONS_ERROR;
+       talloc_steal(down_req, msg);
+
+       if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
        }
-       replmd_private->ncs[i].mod_usn = seq_num;
        
-       /* go on with the call chain */
-       return ldb_next_request(module, req);
+       if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
+       }
+
+       /* go on with the call chain - do the modify after the rename */
+       return ldb_next_request(ac->module, down_req);
 }
 
 
@@ -1104,44 +1053,6 @@ static int replmd_replicated_request_werror(struct replmd_replicated_request *ar
        return ret;
 }
 
-static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
-
-static int replmd_replicated_apply_add_callback(struct ldb_request *req,
-                                               struct ldb_reply *ares)
-{
-       struct ldb_context *ldb;
-       struct replmd_replicated_request *ar = talloc_get_type(req->context,
-                                              struct replmd_replicated_request);
-       int ret;
-
-       ldb = ldb_module_get_ctx(ar->module);
-
-       if (!ares) {
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-       if (ares->error != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, ares->controls,
-                                       ares->response, ares->error);
-       }
-
-       if (ares->type != LDB_REPLY_DONE) {
-               ldb_set_errstring(ldb, "Invalid reply type\n!");
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-
-       talloc_free(ares);
-       ar->index_current++;
-
-       ret = replmd_replicated_apply_next(ar);
-       if (ret != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, NULL, NULL, ret);
-       }
-
-       return LDB_SUCCESS;
-}
-
 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
 {
        struct ldb_context *ldb;
@@ -1151,7 +1062,6 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
        struct replPropertyMetaDataBlob *md;
        struct ldb_val md_value;
        uint32_t i;
-       uint64_t seq_num;
        int ret;
 
        /*
@@ -1167,7 +1077,7 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
        msg = ar->objs->objects[ar->index_current].msg;
        md = ar->objs->objects[ar->index_current].meta_data;
 
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
@@ -1182,17 +1092,12 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                return replmd_replicated_request_error(ar, ret);
        }
 
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
 
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
-       if (ret != LDB_SUCCESS) {
-               return replmd_replicated_request_error(ar, ret);
-       }
-
-       ret = replmd_notify(ar->module, msg->dn, seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
@@ -1214,7 +1119,7 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
         * the meta data array is already sorted by the caller
         */
        for (i=0; i < md->ctr.ctr1.count; i++) {
-               md->ctr.ctr1.array[i].local_usn = seq_num;
+               md->ctr.ctr1.array[i].local_usn = ar->seq_num;
        }
        ndr_err = ndr_push_struct_blob(&md_value, msg, 
                                       lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
@@ -1243,7 +1148,7 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                                msg,
                                ar->controls,
                                ar,
-                               replmd_replicated_apply_add_callback,
+                               replmd_op_callback,
                                ar->req);
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
@@ -1271,42 +1176,6 @@ static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMeta
        return m1->originating_usn - m2->originating_usn;
 }
 
-static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
-                                                 struct ldb_reply *ares)
-{
-       struct ldb_context *ldb;
-       struct replmd_replicated_request *ar = talloc_get_type(req->context,
-                                              struct replmd_replicated_request);
-       int ret;
-
-       ldb = ldb_module_get_ctx(ar->module);
-
-       if (!ares) {
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-       if (ares->error != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, ares->controls,
-                                       ares->response, ares->error);
-       }
-
-       if (ares->type != LDB_REPLY_DONE) {
-               ldb_set_errstring(ldb, "Invalid reply type\n!");
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-
-       talloc_free(ares);
-       ar->index_current++;
-
-       ret = replmd_replicated_apply_next(ar);
-       if (ret != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, NULL, NULL, ret);
-       }
-
-       return LDB_SUCCESS;
-}
-
 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
 {
        struct ldb_context *ldb;
@@ -1320,7 +1189,6 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        struct ldb_val nmd_value;
        uint32_t i,j,ni=0;
        uint32_t removed_attrs = 0;
-       uint64_t seq_num;
        int ret;
 
        ldb = ldb_module_get_ctx(ar->module);
@@ -1440,13 +1308,13 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
                  ar->index_current, msg->num_elements);
 
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
 
        for (i=0; i<ni; i++) {
-               nmd.ctr.ctr1.array[i].local_usn = seq_num;
+               nmd.ctr.ctr1.array[i].local_usn = ar->seq_num;
        }
 
        /* create the meta data value */
@@ -1467,7 +1335,7 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
@@ -1483,11 +1351,6 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
        }
 
-       ret = replmd_notify(ar->module, msg->dn, seq_num);
-       if (ret != LDB_SUCCESS) {
-               return replmd_replicated_request_error(ar, ret);
-       }
-
        if (DEBUGLVL(4)) {
                char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
                DEBUG(4, ("DRS replication modify message:\n%s\n", s));
@@ -1500,7 +1363,7 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                msg,
                                ar->controls,
                                ar,
-                               replmd_replicated_apply_merge_callback,
+                               replmd_op_callback,
                                ar->req);
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
@@ -1557,6 +1420,7 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        char *tmp_str;
        char *filter;
        struct ldb_request *search_req;
+       struct ldb_search_options_control *options;
 
        if (ar->index_current >= ar->objs->num_objects) {
                /* done with it, go to next stage */
@@ -1576,7 +1440,7 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        ret = ldb_build_search_req(&search_req,
                                   ldb,
                                   ar,
-                                  ar->objs->partition_dn,
+                                  NULL,
                                   LDB_SCOPE_SUBTREE,
                                   filter,
                                   NULL,
@@ -1589,7 +1453,22 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        if (ret != LDB_SUCCESS) {
                return ret;
        }
-       
+
+       /* we need to cope with cross-partition links, so search for
+          the GUID over all partitions */
+       options = talloc(search_req, struct ldb_search_options_control);
+       if (options == NULL) {
+               DEBUG(0, (__location__ ": out of memory\n"));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
+
+       ret = ldb_request_add_control(search_req,
+                                     LDB_CONTROL_SEARCH_OPTIONS_OID,
+                                     true, options);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
 
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
@@ -1992,7 +1871,6 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        struct replmd_replicated_request *ar;
        struct ldb_control **ctrls;
        int ret, i;
-       struct dsdb_control_current_partition *partition_ctrl;
        struct replmd_private *replmd_private = 
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -2016,6 +1894,8 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        if (!ar)
                return LDB_ERR_OPERATIONS_ERROR;
 
+       /* Set the flags to have the replmd_op_callback run over the full set of objects */
+       ar->apply_mode = true;
        ar->objs = objs;
        ar->schema = dsdb_get_schema(ldb);
        if (!ar->schema) {
@@ -2038,27 +1918,6 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
                return ret;
        }
 
-       /*
-         add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
-         tells the partition module which partition this request is
-         directed at. That is important as the partition roots appear
-         twice in the directory, once as mount points in the top
-         level store, and once as the roots of each partition. The
-         replication code wants to operate on the root of the
-         partitions, not the top level mount points
-        */
-       partition_ctrl = talloc(req, struct dsdb_control_current_partition);
-       if (partition_ctrl == NULL) {
-               if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
-       }
-       partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
-       partition_ctrl->dn = objs->partition_dn;
-
-       ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
        ar->controls = req->controls;
        req->controls = ctrls;
 
@@ -2104,6 +1963,7 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
 {                                         
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct dsdb_schema *schema = dsdb_get_schema(ldb);
        struct drsuapi_DsReplicaObjectIdentifier3 target;
        struct ldb_message *msg;
        struct ldb_message_element *ret_el;
@@ -2182,7 +2042,7 @@ linked_attributes[0]:
        }
 
        /* find the attribute being modified */
-       attr = dsdb_attribute_by_attributeID_id(dsdb_get_schema(ldb), la->attid);
+       attr = dsdb_attribute_by_attributeID_id(schema, la->attid);
        if (attr == NULL) {
                DEBUG(0, (__location__ ": Unable to find attributeID 0x%x\n", la->attid));
                talloc_free(tmp_ctx);
@@ -2220,7 +2080,7 @@ linked_attributes[0]:
        ret_el->values[0].data = (uint8_t *)ldb_dn_get_extended_linearized(tmp_ctx, target_dn, 1);
        ret_el->values[0].length = strlen((char *)ret_el->values[0].data);
 
-       ret = replmd_update_rpmd(module, msg, &seq_num);
+       ret = replmd_update_rpmd(module, schema, msg, &seq_num);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;