s4:dsdb Use 'partition modified' information to update @REPLCHANGED
authorAndrew Bartlett <abartlet@samba.org>
Fri, 16 Oct 2009 05:20:15 +0000 (16:20 +1100)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 21 Oct 2009 11:43:54 +0000 (22:43 +1100)
This major rework of repl_meta_data changes it from using a static
list of partitions to a dynamic list created from the controls placed
on returned ldb results.

To process these in one place, the similar but distinct callbacks are
combined into a single replmd_op_callback(), which handles both the
'normal operation' and 'inbound replication' case.

This allows new partitions to be created, and replication events for
these new partitions to be scheduled immediately.

Also in this commit: We no longer specify the target partition for new
or modified objects - instead we allow the partitions module to use
the DN as normal.  THis avoids the issue where we would create two
partition head records.

Andrew Bartlett

source4/dsdb/samdb/ldb_modules/repl_meta_data.c

index f6589d4b9d65808620ae8952f37b25e7fdefc2e7..2dd8b5c3038e34a5c27d07d0cf6fe5c87db1ef69 100644 (file)
@@ -51,8 +51,8 @@ struct replmd_private {
        struct la_entry *la_list;
        uint32_t num_ncs;
        struct nc_entry {
+               struct nc_entry *prev, *next;
                struct ldb_dn *dn;
-               struct GUID guid;
                uint64_t mod_usn;
        } *ncs;
 };
@@ -68,18 +68,21 @@ struct replmd_replicated_request {
 
        const struct dsdb_schema *schema;
 
-       struct dsdb_extended_replicated_objects *objs;
-
        /* the controls we pass down */
        struct ldb_control **controls;
 
+       /* details for the mode where we apply a bunch of inbound replication meessages */
+       bool apply_mode;
        uint32_t index_current;
+       struct dsdb_extended_replicated_objects *objs;
 
        struct ldb_message *search_msg;
 
        uint64_t seq_num;
+
 };
 
+static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
 
 /*
   initialise the module
@@ -102,130 +105,85 @@ static int replmd_init(struct ldb_module *module)
 }
 
 
-static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
-{
-       return ldb_dn_compare(n1->dn, n2->dn);
-}
-
 /*
-  build the list of partition DNs for use by replmd_notify()
+ * Callback for most write operations in this module:
+ * 
+ * notify the repl task that a object has changed. The notifies are
+ * gathered up in the replmd_private structure then written to the
+ * @REPLCHANGED object in each partition during the prepare_commit
  */
-static int replmd_load_NCs(struct ldb_module *module)
+static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
 {
-       const char *attrs[] = { "namingContexts", NULL };
-       struct ldb_result *res = NULL;
-       int i, ret;
-       TALLOC_CTX *tmp_ctx;
-       struct ldb_context *ldb;
-       struct ldb_message_element *el;
+       int ret;
+       struct replmd_replicated_request *ac = 
+               talloc_get_type_abort(req->context, struct replmd_replicated_request);
        struct replmd_private *replmd_private = 
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-
-       if (replmd_private->ncs != NULL) {
-               return LDB_SUCCESS;
-       }
-
-       ldb = ldb_module_get_ctx(module);
-       tmp_ctx = talloc_new(module);
-
-       /* load the list of naming contexts */
-       ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
-                        LDB_SCOPE_BASE, attrs, NULL);
-       if (ret != LDB_SUCCESS ||
-           res->count != 1) {
-               DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+               talloc_get_type_abort(ldb_module_get_private(ac->module), struct replmd_private);
+       struct nc_entry *modified_partition;
+       struct ldb_control *partition_ctrl;
+       const struct dsdb_control_current_partition *partition;
 
-       el = ldb_msg_find_element(res->msgs[0], "namingContexts");
-       if (el == NULL) {
-               DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
-               return LDB_ERR_OPERATIONS_ERROR;                
+       if (ares->error != LDB_SUCCESS) {
+               return ldb_module_done(ac->req, ares->controls,
+                                       ares->response, ares->error);
        }
 
-       replmd_private->num_ncs = el->num_values;
-       replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
-                                          replmd_private->num_ncs);
-       if (replmd_private->ncs == NULL) {
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (ares->type != LDB_REPLY_DONE) {
+               ldb_set_errstring(ldb_module_get_ctx(ac->module), "Invalid reply type for notify\n!");
+               return ldb_module_done(ac->req, NULL,
+                                      NULL, LDB_ERR_OPERATIONS_ERROR);
        }
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               replmd_private->ncs[i].dn = 
-                       ldb_dn_from_ldb_val(replmd_private->ncs, 
-                                           ldb, &el->values[i]);
-               replmd_private->ncs[i].mod_usn = 0;
+       partition_ctrl = ldb_reply_get_control(ares, DSDB_CONTROL_CURRENT_PARTITION_OID);
+       if (!partition_ctrl) {
+               return ldb_module_done(ac->req, NULL,
+                                      NULL, LDB_ERR_OPERATIONS_ERROR);
        }
-
-       talloc_free(res);
-
-       /* now find the GUIDs of each of those DNs */
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               const char *attrs2[] = { "objectGUID", NULL };
-               ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
-                                LDB_SCOPE_BASE, attrs2, NULL);
-               if (ret != LDB_SUCCESS ||
-                   res->count != 1) {
-                       /* this happens when the schema is first being
-                          setup */
-                       talloc_free(replmd_private->ncs);
-                       replmd_private->ncs = NULL;
-                       replmd_private->num_ncs = 0;
-                       talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               }
-               replmd_private->ncs[i].guid = 
-                       samdb_result_guid(res->msgs[0], "objectGUID");
-               talloc_free(res);
-       }       
-
-       /* sort the NCs into order, most to least specific */
-       qsort(replmd_private->ncs, replmd_private->num_ncs,
-             sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
-
+       partition = talloc_get_type_abort(partition_ctrl->data,
+                                   struct dsdb_control_current_partition);
        
-       talloc_free(tmp_ctx);
-       
-       return LDB_SUCCESS;
-}
-
-
-/*
- * notify the repl task that a object has changed. The notifies are
- * gathered up in the replmd_private structure then written to the
- * @REPLCHANGED object in each partition during the prepare_commit
- */
-static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
-{
-       int ret, i;
-       struct replmd_private *replmd_private = 
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-
-       ret = replmd_load_NCs(module);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-       if (replmd_private->num_ncs == 0) {
-               return LDB_SUCCESS;
-       }
+       if (ac->seq_num > 0) {
+               for (modified_partition = replmd_private->ncs; modified_partition; 
+                    modified_partition = modified_partition->next) {
+                       if (ldb_dn_compare(modified_partition->dn, partition->dn) == 0) {
+                               break;
+                       }
+               }
+               
+               if (modified_partition == NULL) {
+                       modified_partition = talloc_zero(replmd_private, struct nc_entry);
+                       if (!modified_partition) {
+                               ldb_oom(ldb_module_get_ctx(ac->module));
+                               return ldb_module_done(ac->req, NULL,
+                                                      NULL, LDB_ERR_OPERATIONS_ERROR);
+                       }
+                       modified_partition->dn = ldb_dn_copy(modified_partition, partition->dn);
+                       if (!modified_partition->dn) {
+                               ldb_oom(ldb_module_get_ctx(ac->module));
+                               return ldb_module_done(ac->req, NULL,
+                                                      NULL, LDB_ERR_OPERATIONS_ERROR);
+                       }
+                       DLIST_ADD(replmd_private->ncs, modified_partition);
+               }
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
-                       break;
+               if (ac->seq_num > modified_partition->mod_usn) {
+                       modified_partition->mod_usn = ac->seq_num;
                }
        }
-       if (i == replmd_private->num_ncs) {
-               DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
-                        ldb_dn_get_linearized(dn)));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
 
-       if (uSN > replmd_private->ncs[i].mod_usn) {
-           replmd_private->ncs[i].mod_usn = uSN;
+       if (ac->apply_mode) {
+               talloc_free(ares);
+               ac->index_current++;
+               
+               ret = replmd_replicated_apply_next(ac);
+               if (ret != LDB_SUCCESS) {
+                       return ldb_module_done(ac->req, NULL, NULL, ret);
+               }
+               return ret;
+       } else {
+               return ldb_module_done(ac->req, ares->controls,
+                                      ares->response, LDB_SUCCESS);
        }
-
-       return LDB_SUCCESS;
 }
 
 
@@ -235,25 +193,19 @@ static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t
  */
 static int replmd_notify_store(struct ldb_module *module)
 {
-       int i;
        struct replmd_private *replmd_private = 
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
        struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct nc_entry *modified_partition;
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
+       for (modified_partition = replmd_private->ncs; modified_partition; 
+            modified_partition = modified_partition->next) {
                int ret;
 
-               if (replmd_private->ncs[i].mod_usn == 0) {
-                       /* this partition has not changed in this
-                          transaction */
-                       continue;
-               }
-
-               ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
-                                             replmd_private->ncs[i].mod_usn);
+               ret = dsdb_save_partition_usn(ldb, modified_partition->dn, modified_partition->mod_usn);
                if (ret != LDB_SUCCESS) {
                        DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
-                                ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
+                                ldb_dn_get_linearized(modified_partition->dn)));
                        return ret;
                }
        }
@@ -281,6 +233,15 @@ static struct replmd_replicated_request *replmd_ctx_init(struct ldb_module *modu
 
        ac->module = module;
        ac->req = req;
+
+       ac->schema = dsdb_get_schema(ldb);
+       if (!ac->schema) {
+               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                             "replmd_modify: no dsdb_schema loaded");
+               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
+               return NULL;
+       }
+
        return ac;
 }
 
@@ -427,48 +388,12 @@ static void replmd_ldb_message_sort(struct ldb_message *msg,
                  discard_const_p(void, schema), (ldb_qsort_cmp_fn_t)replmd_ldb_message_element_attid_sort);
 }
 
-static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
-{
-       struct ldb_context *ldb;
-       struct replmd_replicated_request *ac;
-       int ret;
-
-       ac = talloc_get_type(req->context, struct replmd_replicated_request);
-       ldb = ldb_module_get_ctx(ac->module);
-
-       if (!ares) {
-               return ldb_module_done(ac->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-       if (ares->error != LDB_SUCCESS) {
-               return ldb_module_done(ac->req, ares->controls,
-                                       ares->response, ares->error);
-       }
-
-       if (ares->type != LDB_REPLY_DONE) {
-               ldb_set_errstring(ldb,
-                                 "invalid ldb_reply_type in callback");
-               talloc_free(ares);
-               return ldb_module_done(ac->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-
-       ret = replmd_notify(ac->module, req->op.add.message->dn, ac->seq_num);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
-       return ldb_module_done(ac->req, ares->controls,
-                               ares->response, LDB_SUCCESS);
-}
-
 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 {
        struct ldb_context *ldb;
         struct ldb_control *control;
         struct ldb_control **saved_controls;
        struct replmd_replicated_request *ac;
-       const struct dsdb_schema *schema;
        enum ndr_err_code ndr_err;
        struct ldb_request *down_req;
        struct ldb_message *msg;
@@ -501,21 +426,11 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_add\n");
 
-       schema = dsdb_get_schema(ldb);
-       if (!schema) {
-               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
-                             "replmd_add: no dsdb_schema loaded");
-               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
-               return LDB_ERR_CONSTRAINT_VIOLATION;
-       }
-
        ac = replmd_ctx_init(module, req);
        if (!ac) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ac->schema = schema;
-
         guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
        if ( guid_blob != NULL ) {
                if( !allow_add_guid ) {
@@ -624,7 +539,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 
                if (e->name[0] == '@') continue;
 
-               sa = dsdb_attribute_by_lDAPDisplayName(schema, e->name);
+               sa = dsdb_attribute_by_lDAPDisplayName(ac->schema, e->name);
                if (!sa) {
                        ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                                      "replmd_add: attribute '%s' not defined in schema\n",
@@ -655,7 +570,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        /*
         * sort meta data array, and move the rdn attribute entry to the end
         */
-       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, schema, msg->dn);
+       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ac->schema, msg->dn);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
@@ -718,7 +633,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        /*
         * sort the attributes by attid before storing the object
         */
-       replmd_ldb_message_sort(msg, schema);
+       replmd_ldb_message_sort(msg, ac->schema);
 
        ret = ldb_build_add_req(&down_req, ldb, ac,
                                msg,
@@ -748,7 +663,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                                      struct ldb_message *msg,
                                      struct ldb_message_element *el,
                                      struct replPropertyMetaDataBlob *omd,
-                                     struct dsdb_schema *schema,
+                                     const struct dsdb_schema *schema,
                                      uint64_t *seq_num,
                                      const struct GUID *our_invocation_id,
                                      NTTIME now)
@@ -811,7 +726,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
  * client is based on this object 
  */
 static int replmd_update_rpmd(struct ldb_module *module, 
-                             struct dsdb_schema *schema, 
+                             const struct dsdb_schema *schema, 
                              struct ldb_message *msg, uint64_t *seq_num)
 {
        const struct ldb_val *omd_value;
@@ -925,7 +840,6 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 {
        struct ldb_context *ldb;
        struct replmd_replicated_request *ac;
-       const struct dsdb_schema *schema;
        struct ldb_request *down_req;
        struct ldb_message *msg;
        struct ldb_result *res;
@@ -942,21 +856,11 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
-       schema = dsdb_get_schema(ldb);
-       if (!schema) {
-               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
-                             "replmd_modify: no dsdb_schema loaded");
-               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
-               return LDB_ERR_CONSTRAINT_VIOLATION;
-       }
-
        ac = replmd_ctx_init(module, req);
        if (!ac) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ac->schema = schema;
-
        /* we have to copy the message as the caller might have it as a const */
        msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
        if (msg == NULL) {
@@ -1019,22 +923,20 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        return ldb_next_request(module, down_req);
 }
 
+static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares);
 
 /*
   handle a rename request
 
   On a rename we need to do an extra ldb_modify which sets the
-  whenChanged and uSNChanged attributes
+  whenChanged and uSNChanged attributes.  We do this in a callback after the success.
  */
 static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
 {
        struct ldb_context *ldb;
-       int ret, i;
-       time_t t = time(NULL);
-       uint64_t seq_num = 0;
-       struct ldb_message *msg;
-       struct replmd_private *replmd_private = 
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct replmd_replicated_request *ac;
+       int ret;
+       struct ldb_request *down_req;
 
        /* do not manipulate our control entries */
        if (ldb_dn_is_special(req->op.mod.message->dn)) {
@@ -1045,73 +947,94 @@ static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
 
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
 
-       /* Get a sequence number from the backend */
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ac = replmd_ctx_init(module, req);
+       if (!ac) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ret = ldb_build_rename_req(&down_req, ldb, ac,
+                                  ac->req->op.rename.olddn,
+                                  ac->req->op.rename.newdn,
+                                  ac->req->controls,
+                                  ac, replmd_rename_callback,
+                                  ac->req);
+
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
-       msg = ldb_msg_new(req);
-       if (msg == NULL) {
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+       /* go on with the call chain */
+       return ldb_next_request(module, down_req);
+}
+
+/* After the rename is compleated, update the whenchanged etc */
+static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *ares)
+{
+       struct ldb_context *ldb;
+       struct replmd_replicated_request *ac;
+       struct ldb_request *down_req;
+       struct ldb_message *msg;
+       time_t t = time(NULL);
+       int ret;
 
-       msg->dn = req->op.rename.olddn;
+       ac = talloc_get_type(req->context, struct replmd_replicated_request);
+       ldb = ldb_module_get_ctx(ac->module);
 
-       if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
-               talloc_free(msg);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (ares->error != LDB_SUCCESS) {
+               return ldb_module_done(ac->req, ares->controls,
+                                       ares->response, ares->error);
        }
-       msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
 
-       if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
-               talloc_free(msg);
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (ares->type != LDB_REPLY_DONE) {
+               ldb_set_errstring(ldb,
+                                 "invalid ldb_reply_type in callback");
+               talloc_free(ares);
+               return ldb_module_done(ac->req, NULL, NULL,
+                                       LDB_ERR_OPERATIONS_ERROR);
        }
-       msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
 
-       ret = ldb_modify(ldb, msg);
-       talloc_free(msg);
+       /* Get a sequence number from the backend */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
        if (ret != LDB_SUCCESS) {
                return ret;
        }
 
-       ret = replmd_load_NCs(module);
-       if (ret != 0) {
-               return ret;
-       }
+       /* TODO:
+        * - replace the old object with the newly constructed one
+        */
 
-       /* now update the highest uSNs of the partitions that are
-          affected. Note that two partitions could be changing */
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
-                                       req->op.rename.olddn) == 0) {
-                       break;
-               }
-       }
-       if (i == replmd_private->num_ncs) {
-               DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
-                        ldb_dn_get_linearized(req->op.rename.olddn)));
+       msg = ldb_msg_new(ac);
+       if (msg == NULL) {
+               ldb_oom(ldb);
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       replmd_private->ncs[i].mod_usn = seq_num;
 
-       for (i=0; i<replmd_private->num_ncs; i++) {
-               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
-                                       req->op.rename.newdn) == 0) {
-                       break;
-               }
+       msg->dn = ac->req->op.rename.newdn;
+
+       ret = ldb_build_mod_req(&down_req, ldb, ac,
+                               msg,
+                               req->controls,
+                               ac, replmd_op_callback,
+                               req);
+
+       if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
        }
-       if (i == replmd_private->num_ncs) {
-               DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
-                        ldb_dn_get_linearized(req->op.rename.newdn)));
-               return LDB_ERR_OPERATIONS_ERROR;
+       talloc_steal(down_req, msg);
+
+       if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
        }
-       replmd_private->ncs[i].mod_usn = seq_num;
        
-       /* go on with the call chain */
-       return ldb_next_request(module, req);
+       if (add_uint64_element(msg, "uSNChanged", ac->seq_num) != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
+       }
+
+       /* go on with the call chain - do the modify after the rename */
+       return ldb_next_request(ac->module, down_req);
 }
 
 
@@ -1127,44 +1050,6 @@ static int replmd_replicated_request_werror(struct replmd_replicated_request *ar
        return ret;
 }
 
-static int replmd_replicated_apply_next(struct replmd_replicated_request *ar);
-
-static int replmd_replicated_apply_add_callback(struct ldb_request *req,
-                                               struct ldb_reply *ares)
-{
-       struct ldb_context *ldb;
-       struct replmd_replicated_request *ar = talloc_get_type(req->context,
-                                              struct replmd_replicated_request);
-       int ret;
-
-       ldb = ldb_module_get_ctx(ar->module);
-
-       if (!ares) {
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-       if (ares->error != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, ares->controls,
-                                       ares->response, ares->error);
-       }
-
-       if (ares->type != LDB_REPLY_DONE) {
-               ldb_set_errstring(ldb, "Invalid reply type\n!");
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-
-       talloc_free(ares);
-       ar->index_current++;
-
-       ret = replmd_replicated_apply_next(ar);
-       if (ret != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, NULL, NULL, ret);
-       }
-
-       return LDB_SUCCESS;
-}
-
 static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
 {
        struct ldb_context *ldb;
@@ -1174,7 +1059,6 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
        struct replPropertyMetaDataBlob *md;
        struct ldb_val md_value;
        uint32_t i;
-       uint64_t seq_num;
        int ret;
 
        /*
@@ -1190,7 +1074,7 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
        msg = ar->objs->objects[ar->index_current].msg;
        md = ar->objs->objects[ar->index_current].meta_data;
 
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
@@ -1205,17 +1089,12 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                return replmd_replicated_request_error(ar, ret);
        }
 
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
 
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
-       if (ret != LDB_SUCCESS) {
-               return replmd_replicated_request_error(ar, ret);
-       }
-
-       ret = replmd_notify(ar->module, msg->dn, seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
@@ -1237,7 +1116,7 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
         * the meta data array is already sorted by the caller
         */
        for (i=0; i < md->ctr.ctr1.count; i++) {
-               md->ctr.ctr1.array[i].local_usn = seq_num;
+               md->ctr.ctr1.array[i].local_usn = ar->seq_num;
        }
        ndr_err = ndr_push_struct_blob(&md_value, msg, 
                                       lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
@@ -1266,7 +1145,7 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                                msg,
                                ar->controls,
                                ar,
-                               replmd_replicated_apply_add_callback,
+                               replmd_op_callback,
                                ar->req);
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
@@ -1294,42 +1173,6 @@ static int replmd_replPropertyMetaData1_conflict_compare(struct replPropertyMeta
        return m1->originating_usn - m2->originating_usn;
 }
 
-static int replmd_replicated_apply_merge_callback(struct ldb_request *req,
-                                                 struct ldb_reply *ares)
-{
-       struct ldb_context *ldb;
-       struct replmd_replicated_request *ar = talloc_get_type(req->context,
-                                              struct replmd_replicated_request);
-       int ret;
-
-       ldb = ldb_module_get_ctx(ar->module);
-
-       if (!ares) {
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-       if (ares->error != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, ares->controls,
-                                       ares->response, ares->error);
-       }
-
-       if (ares->type != LDB_REPLY_DONE) {
-               ldb_set_errstring(ldb, "Invalid reply type\n!");
-               return ldb_module_done(ar->req, NULL, NULL,
-                                       LDB_ERR_OPERATIONS_ERROR);
-       }
-
-       talloc_free(ares);
-       ar->index_current++;
-
-       ret = replmd_replicated_apply_next(ar);
-       if (ret != LDB_SUCCESS) {
-               return ldb_module_done(ar->req, NULL, NULL, ret);
-       }
-
-       return LDB_SUCCESS;
-}
-
 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
 {
        struct ldb_context *ldb;
@@ -1343,7 +1186,6 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        struct ldb_val nmd_value;
        uint32_t i,j,ni=0;
        uint32_t removed_attrs = 0;
-       uint64_t seq_num;
        int ret;
 
        ldb = ldb_module_get_ctx(ar->module);
@@ -1463,13 +1305,13 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
                  ar->index_current, msg->num_elements);
 
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
 
        for (i=0; i<ni; i++) {
-               nmd.ctr.ctr1.array[i].local_usn = seq_num;
+               nmd.ctr.ctr1.array[i].local_usn = ar->seq_num;
        }
 
        /* create the meta data value */
@@ -1490,7 +1332,7 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ar->seq_num);
        if (ret != LDB_SUCCESS) {
                return replmd_replicated_request_error(ar, ret);
        }
@@ -1506,11 +1348,6 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
        }
 
-       ret = replmd_notify(ar->module, msg->dn, seq_num);
-       if (ret != LDB_SUCCESS) {
-               return replmd_replicated_request_error(ar, ret);
-       }
-
        if (DEBUGLVL(4)) {
                char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
                DEBUG(4, ("DRS replication modify message:\n%s\n", s));
@@ -1523,7 +1360,7 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                msg,
                                ar->controls,
                                ar,
-                               replmd_replicated_apply_merge_callback,
+                               replmd_op_callback,
                                ar->req);
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
@@ -1580,6 +1417,7 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        char *tmp_str;
        char *filter;
        struct ldb_request *search_req;
+       struct ldb_search_options_control *options;
 
        if (ar->index_current >= ar->objs->num_objects) {
                /* done with it, go to next stage */
@@ -1599,7 +1437,7 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        ret = ldb_build_search_req(&search_req,
                                   ldb,
                                   ar,
-                                  ar->objs->partition_dn,
+                                  NULL,
                                   LDB_SCOPE_SUBTREE,
                                   filter,
                                   NULL,
@@ -1612,7 +1450,22 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        if (ret != LDB_SUCCESS) {
                return ret;
        }
-       
+
+       /* we need to cope with cross-partition links, so search for
+          the GUID over all partitions */
+       options = talloc(search_req, struct ldb_search_options_control);
+       if (options == NULL) {
+               DEBUG(0, (__location__ ": out of memory\n"));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       options->search_options = LDB_SEARCH_OPTION_PHANTOM_ROOT;
+
+       ret = ldb_request_add_control(search_req,
+                                     LDB_CONTROL_SEARCH_OPTIONS_OID,
+                                     true, options);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
 
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
@@ -2015,7 +1868,6 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        struct replmd_replicated_request *ar;
        struct ldb_control **ctrls;
        int ret, i;
-       struct dsdb_control_current_partition *partition_ctrl;
        struct replmd_private *replmd_private = 
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -2039,6 +1891,8 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        if (!ar)
                return LDB_ERR_OPERATIONS_ERROR;
 
+       /* Set the flags to have the replmd_op_callback run over the full set of objects */
+       ar->apply_mode = true;
        ar->objs = objs;
        ar->schema = dsdb_get_schema(ldb);
        if (!ar->schema) {
@@ -2061,27 +1915,6 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
                return ret;
        }
 
-       /*
-         add the DSDB_CONTROL_CURRENT_PARTITION_OID control. This
-         tells the partition module which partition this request is
-         directed at. That is important as the partition roots appear
-         twice in the directory, once as mount points in the top
-         level store, and once as the roots of each partition. The
-         replication code wants to operate on the root of the
-         partitions, not the top level mount points
-        */
-       partition_ctrl = talloc(req, struct dsdb_control_current_partition);
-       if (partition_ctrl == NULL) {
-               if (!partition_ctrl) return replmd_replicated_request_werror(ar, WERR_NOMEM);
-       }
-       partition_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
-       partition_ctrl->dn = objs->partition_dn;
-
-       ret = ldb_request_add_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID, false, partition_ctrl);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
        ar->controls = req->controls;
        req->controls = ctrls;