dsdb: audit samdb and password changes
[sfrench/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index b4dacccd8e83069f4f0e0ad9be5703116119fc36..7395f52524ebe427da01cb98671076718900e347 100644 (file)
@@ -39,7 +39,9 @@
 #include "ldb_module.h"
 #include "dsdb/samdb/samdb.h"
 #include "dsdb/common/proto.h"
+#include "dsdb/common/util.h"
 #include "../libds/common/flags.h"
+#include "librpc/gen_ndr/irpc.h"
 #include "librpc/gen_ndr/ndr_misc.h"
 #include "librpc/gen_ndr/ndr_drsuapi.h"
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 #include "libcli/security/security.h"
 #include "lib/util/dlinklist.h"
 #include "dsdb/samdb/ldb_modules/util.h"
-#include "lib/util/binsearch.h"
 #include "lib/util/tsort.h"
 
+#undef DBGC_CLASS
+#define DBGC_CLASS            DBGC_DRS_REPL
+
+/* the RMD_VERSION for linked attributes starts from 1 */
+#define RMD_VERSION_INITIAL   1
+
 /*
  * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
  * Deleted Objects Container
@@ -59,8 +66,6 @@ static const NTTIME DELETED_OBJECT_CONTAINER_CHANGE_TIME = 2650466015990000000UL
 struct replmd_private {
        TALLOC_CTX *la_ctx;
        struct la_entry *la_list;
-       TALLOC_CTX *bl_ctx;
-       struct la_backlink *la_backlinks;
        struct nc_entry {
                struct nc_entry *prev, *next;
                struct ldb_dn *dn;
@@ -69,11 +74,13 @@ struct replmd_private {
        } *ncs;
        struct ldb_dn *schema_dn;
        bool originating_updates;
+       bool sorted_links;
 };
 
 struct la_entry {
        struct la_entry *next, *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
+       uint32_t dsdb_repl_flags;
 };
 
 struct replmd_replicated_request {
@@ -86,6 +93,13 @@ struct replmd_replicated_request {
        /* the controls we pass down */
        struct ldb_control **controls;
 
+       /*
+        * Backlinks for the replmd_add() case (we want to create
+        * backlinks after creating the user, but before the end of
+        * the ADD request) 
+        */
+       struct la_backlink *la_backlinks;
+
        /* details for the mode where we apply a bunch of inbound replication meessages */
        bool apply_mode;
        uint32_t index_current;
@@ -102,6 +116,23 @@ struct replmd_replicated_request {
 
 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
 static int replmd_delete_internals(struct ldb_module *module, struct ldb_request *req, bool re_delete);
+static int replmd_check_upgrade_links(struct ldb_context *ldb,
+                                     struct parsed_dn *dns, uint32_t count,
+                                     struct ldb_message_element *el,
+                                     const char *ldap_oid);
+static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
+                                         struct la_entry *la);
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted);
+
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid);
 
 enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1,
@@ -224,7 +255,6 @@ static bool replmd_check_urgent_attribute(const struct ldb_message_element *el)
        return false;
 }
 
-
 static int replmd_replicated_apply_isDeleted(struct replmd_replicated_request *ar);
 
 /*
@@ -236,16 +266,37 @@ static int replmd_init(struct ldb_module *module)
 {
        struct replmd_private *replmd_private;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
-
+       static const char *samba_dsdb_attrs[] = { SAMBA_COMPATIBLE_FEATURES_ATTR, NULL };
+       struct ldb_dn *samba_dsdb_dn;
+       struct ldb_result *res;
+       int ret;
+       TALLOC_CTX *frame = talloc_stackframe();
        replmd_private = talloc_zero(module, struct replmd_private);
        if (replmd_private == NULL) {
                ldb_oom(ldb);
+               TALLOC_FREE(frame);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        ldb_module_set_private(module, replmd_private);
 
        replmd_private->schema_dn = ldb_get_schema_basedn(ldb);
 
+       samba_dsdb_dn = ldb_dn_new(frame, ldb, "@SAMBA_DSDB");
+       if (!samba_dsdb_dn) {
+               TALLOC_FREE(frame);
+               return ldb_oom(ldb);
+       }
+
+       ret = dsdb_module_search_dn(module, frame, &res, samba_dsdb_dn,
+                                   samba_dsdb_attrs, DSDB_FLAG_NEXT_MODULE, NULL);
+       if (ret == LDB_SUCCESS) {
+               replmd_private->sorted_links
+                       = ldb_msg_check_string_attribute(res->msgs[0],
+                                                        SAMBA_COMPATIBLE_FEATURES_ATTR,
+                                                        SAMBA_SORTED_LINKS_FEATURE);
+       }
+       TALLOC_FREE(frame);
+
        return ldb_next_init(module);
 }
 
@@ -258,16 +309,14 @@ static void replmd_txn_cleanup(struct replmd_private *replmd_private)
        replmd_private->la_list = NULL;
        replmd_private->la_ctx = NULL;
 
-       talloc_free(replmd_private->bl_ctx);
-       replmd_private->la_backlinks = NULL;
-       replmd_private->bl_ctx = NULL;
 }
 
 
 struct la_backlink {
        struct la_backlink *next, *prev;
        const char *attr_name;
-       struct GUID forward_guid, target_guid;
+       struct ldb_dn *forward_dn;
+       struct GUID target_guid;
        bool active;
 };
 
@@ -330,7 +379,7 @@ static int replmd_process_backlink(struct ldb_module *module, struct la_backlink
        int ret;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_message *msg;
-       TALLOC_CTX *tmp_ctx = talloc_new(bl);
+       TALLOC_CTX *frame = talloc_stackframe();
        char *dn_string;
 
        /*
@@ -339,39 +388,45 @@ static int replmd_process_backlink(struct ldb_module *module, struct la_backlink
          - construct ldb_message
               - either an add or a delete
         */
-       ret = dsdb_module_dn_by_guid(module, tmp_ctx, &bl->target_guid, &target_dn, parent);
+       ret = dsdb_module_dn_by_guid(module, frame, &bl->target_guid, &target_dn, parent);
        if (ret != LDB_SUCCESS) {
-               DEBUG(2,(__location__ ": WARNING: Failed to find target DN for linked attribute with GUID %s\n",
-                        GUID_string(bl, &bl->target_guid)));
+               struct GUID_txt_buf guid_str;
+               DBG_WARNING("Failed to find target DN for linked attribute with GUID %s\n",
+                           GUID_buf_string(&bl->target_guid, &guid_str));
+               DBG_WARNING("Please run 'samba-tool dbcheck' to resolve any missing backlinks.\n");
+               talloc_free(frame);
                return LDB_SUCCESS;
        }
 
-       ret = dsdb_module_dn_by_guid(module, tmp_ctx, &bl->forward_guid, &source_dn, parent);
-       if (ret != LDB_SUCCESS) {
-               ldb_asprintf_errstring(ldb, "Failed to find source DN for linked attribute with GUID %s\n",
-                                      GUID_string(bl, &bl->forward_guid));
-               talloc_free(tmp_ctx);
-               return ret;
+       msg = ldb_msg_new(frame);
+       if (msg == NULL) {
+               ldb_module_oom(module);
+               talloc_free(frame);
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       msg = ldb_msg_new(tmp_ctx);
-       if (msg == NULL) {
+       source_dn = ldb_dn_copy(frame, bl->forward_dn);
+       if (!source_dn) {
                ldb_module_oom(module);
-               talloc_free(tmp_ctx);
+               talloc_free(frame);
                return LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               /* Filter down to the attributes we want in the backlink */
+               const char *accept[] = { "GUID", "SID", NULL };
+               ldb_dn_extended_filter(source_dn, accept);
        }
 
        /* construct a ldb_message for adding/deleting the backlink */
        msg->dn = target_dn;
-       dn_string = ldb_dn_get_extended_linearized(tmp_ctx, source_dn, 1);
+       dn_string = ldb_dn_get_extended_linearized(frame, bl->forward_dn, 1);
        if (!dn_string) {
                ldb_module_oom(module);
-               talloc_free(tmp_ctx);
+               talloc_free(frame);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        ret = ldb_msg_add_steal_string(msg, bl->attr_name, dn_string);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
+               talloc_free(frame);
                return ret;
        }
        msg->elements[0].flags = bl->active?LDB_FLAG_MOD_ADD:LDB_FLAG_MOD_DELETE;
@@ -399,27 +454,36 @@ static int replmd_process_backlink(struct ldb_module *module, struct la_backlink
                                       ldb_dn_get_linearized(source_dn),
                                       ldb_dn_get_linearized(target_dn),
                                       ldb_errstring(ldb));
-               talloc_free(tmp_ctx);
+               talloc_free(frame);
                return ret;
        }
-       talloc_free(tmp_ctx);
+       talloc_free(frame);
        return ret;
 }
 
 /*
   add a backlink to the list of backlinks to add/delete in the prepare
   commit
+
+  forward_dn is stolen onto the defereed context
  */
-static int replmd_add_backlink(struct ldb_module *module,
-                              struct replmd_private *replmd_private,
-                              const struct dsdb_schema *schema,
-                              struct GUID *forward_guid,
-                              struct GUID *target_guid, bool active,
-                              const struct dsdb_attribute *schema_attr,
-                              bool immediate)
+static int replmd_defer_add_backlink(struct ldb_module *module,
+                                    struct replmd_private *replmd_private,
+                                    const struct dsdb_schema *schema,
+                                    struct replmd_replicated_request *ac,
+                                    struct ldb_dn *forward_dn,
+                                    struct GUID *target_guid, bool active,
+                                    const struct dsdb_attribute *schema_attr,
+                                    struct ldb_request *parent)
 {
        const struct dsdb_attribute *target_attr;
        struct la_backlink *bl;
+       
+       bl = talloc(ac, struct la_backlink);
+       if (bl == NULL) {
+               ldb_module_oom(module);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
 
        target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID ^ 1);
        if (!target_attr) {
@@ -432,64 +496,50 @@ static int replmd_add_backlink(struct ldb_module *module,
                return LDB_SUCCESS;
        }
 
-       /* see if its already in the list */
-       for (bl=replmd_private->la_backlinks; bl; bl=bl->next) {
-               if (GUID_equal(forward_guid, &bl->forward_guid) &&
-                   GUID_equal(target_guid, &bl->target_guid) &&
-                   (target_attr->lDAPDisplayName == bl->attr_name ||
-                    strcmp(target_attr->lDAPDisplayName, bl->attr_name) == 0)) {
-                       break;
-               }
-       }
-
-       if (bl) {
-               /* we found an existing one */
-               if (bl->active == active) {
-                       return LDB_SUCCESS;
-               }
-               DLIST_REMOVE(replmd_private->la_backlinks, bl);
-               talloc_free(bl);
-               return LDB_SUCCESS;
-       }
-
-       if (replmd_private->bl_ctx == NULL) {
-               replmd_private->bl_ctx = talloc_new(replmd_private);
-               if (replmd_private->bl_ctx == NULL) {
-                       ldb_module_oom(module);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-       }
-
-       /* its a new one */
-       bl = talloc(replmd_private->bl_ctx, struct la_backlink);
-       if (bl == NULL) {
-               ldb_module_oom(module);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       /* Ensure the schema does not go away before the bl->attr_name is used */
-       if (!talloc_reference(bl, schema)) {
-               talloc_free(bl);
-               ldb_module_oom(module);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        bl->attr_name = target_attr->lDAPDisplayName;
-       bl->forward_guid = *forward_guid;
+       bl->forward_dn = talloc_steal(bl, forward_dn);
        bl->target_guid = *target_guid;
        bl->active = active;
 
-       /* the caller may ask for this backlink to be processed
-          immediately */
-       if (immediate) {
-               int ret = replmd_process_backlink(module, bl, NULL);
-               talloc_free(bl);
-               return ret;
+       DLIST_ADD(ac->la_backlinks, bl);
+
+       return LDB_SUCCESS;
+}
+
+/*
+  add a backlink to the list of backlinks to add/delete in the prepare
+  commit
+ */
+static int replmd_add_backlink(struct ldb_module *module,
+                              struct replmd_private *replmd_private,
+                              const struct dsdb_schema *schema,
+                              struct ldb_dn *forward_dn,
+                              struct GUID *target_guid, bool active,
+                              const struct dsdb_attribute *schema_attr,
+                              struct ldb_request *parent)
+{
+       const struct dsdb_attribute *target_attr;
+       struct la_backlink bl;
+       int ret;
+       
+       target_attr = dsdb_attribute_by_linkID(schema, schema_attr->linkID ^ 1);
+       if (!target_attr) {
+               /*
+                * windows 2003 has a broken schema where the
+                * definition of msDS-IsDomainFor is missing (which is
+                * supposed to be the backlink of the
+                * msDS-HasDomainNCs attribute
+                */
+               return LDB_SUCCESS;
        }
 
-       DLIST_ADD(replmd_private->la_backlinks, bl);
+       bl.attr_name = target_attr->lDAPDisplayName;
+       bl.forward_dn = forward_dn;
+       bl.target_guid = *target_guid;
+       bl.active = active;
 
-       return LDB_SUCCESS;
+       ret = replmd_process_backlink(module, &bl, parent);
+       return ret;
 }
 
 
@@ -527,9 +577,41 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
        }
 
        if (ares->error != LDB_SUCCESS) {
-               DEBUG(5,("%s failure. Error is: %s\n", __FUNCTION__, ldb_strerror(ares->error)));
+               struct GUID_txt_buf guid_txt;
+               struct ldb_message *msg = NULL;
+               char *s = NULL;
+
+               if (ac->apply_mode == false) {
+                       DBG_NOTICE("Originating update failure. Error is: %s\n",
+                                  ldb_strerror(ares->error));
+                       return ldb_module_done(ac->req, controls,
+                                              ares->response, ares->error);
+               }
+
+               msg = ac->objs->objects[ac->index_current].msg;
+               /*
+                * Set at DBG_NOTICE as once these start to happe, they
+                * will happen a lot until resolved, due to repeated
+                * replication.  The caller will probably print the
+                * ldb error string anyway.
+                */
+               DBG_NOTICE("DRS replication apply failure for %s. Error is: %s\n",
+                          ldb_dn_get_linearized(msg->dn),
+                          ldb_strerror(ares->error));
+
+               s = ldb_ldif_message_redacted_string(ldb_module_get_ctx(ac->module),
+                                                    ac,
+                                                    LDB_CHANGETYPE_ADD,
+                                                    msg);
+
+               DBG_INFO("Failing DRS %s replication message was %s:\n%s\n",
+                        ac->search_msg == NULL ? "ADD" : "MODIFY",
+                        GUID_buf_string(&ac->objs->objects[ac->index_current].object_guid,
+                                        &guid_txt),
+                        s);
+               talloc_free(s);
                return ldb_module_done(ac->req, controls,
-                                       ares->response, ares->error);
+                                      ares->response, ares->error);
        }
 
        if (ares->type != LDB_REPLY_DONE) {
@@ -538,6 +620,23 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
                                       NULL, LDB_ERR_OPERATIONS_ERROR);
        }
 
+       if (ac->apply_mode == false) {
+               struct la_backlink *bl;
+               /*
+                * process our backlink list after an replmd_add(),
+                * creating and deleting backlinks as necessary (this
+                * code is sync).  The other cases are handled inline
+                * with the modify.
+                */
+               for (bl=ac->la_backlinks; bl; bl=bl->next) {
+                       ret = replmd_process_backlink(ac->module, bl, ac->req);
+                       if (ret != LDB_SUCCESS) {
+                               return ldb_module_done(ac->req, NULL,
+                                                      NULL, ret);
+                       }
+               }
+       }
+       
        if (!partition_ctrl) {
                ldb_set_errstring(ldb_module_get_ctx(ac->module),"No partition control on reply");
                return ldb_module_done(ac->req, NULL,
@@ -837,76 +936,126 @@ static void replmd_ldb_message_sort(struct ldb_message *msg,
 }
 
 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted);
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime);
 
+static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2);
+
+static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
+                         struct ldb_message_element *el, struct parsed_dn **pdn,
+                         const char *ldap_oid, struct ldb_request *parent);
+
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn);
 
 /*
   fix up linked attributes in replmd_add.
   This involves setting up the right meta-data in extended DN
   components, and creating backlinks to the object
  */
-static int replmd_add_fix_la(struct ldb_module *module,
+static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                             struct replmd_private *replmd_private,
                             struct ldb_message_element *el,
-                            uint64_t seq_num, const struct GUID *invocationId, NTTIME now,
-                            struct GUID *guid, const struct dsdb_attribute *sa, struct ldb_request *parent)
+                            struct replmd_replicated_request *ac,
+                            NTTIME now,
+                            struct ldb_dn *forward_dn,
+                            const struct dsdb_attribute *sa,
+                            struct ldb_request *parent)
 {
        unsigned int i;
-       TALLOC_CTX *tmp_ctx = talloc_new(el->values);
+       TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
        struct ldb_context *ldb = ldb_module_get_ctx(module);
-
+       struct parsed_dn *pdn;
        /* We will take a reference to the schema in replmd_add_backlink */
        const struct dsdb_schema *schema = dsdb_get_schema(ldb, NULL);
+       struct ldb_val *new_values = NULL;
+       int ret;
 
-       for (i=0; i<el->num_values; i++) {
-               struct ldb_val *v = &el->values[i];
-               struct dsdb_dn *dsdb_dn = dsdb_dn_parse(tmp_ctx, ldb, v, sa->syntax->ldap_oid);
-               struct GUID target_guid;
-               NTSTATUS status;
-               int ret;
+       if (dsdb_check_single_valued_link(sa, el) == LDB_SUCCESS) {
+               el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
+       } else {
+               ldb_asprintf_errstring(ldb,
+                                      "Attribute %s is single valued but "
+                                      "more than one value has been supplied",
+                                      el->name);
+               talloc_free(tmp_ctx);
+               return LDB_ERR_CONSTRAINT_VIOLATION;
+       }
+       
+       ret = get_parsed_dns(module, tmp_ctx, el, &pdn,
+                            sa->syntax->ldap_oid, parent);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
 
-               if (dsdb_dn == NULL) {
-                       talloc_free(tmp_ctx);
-                       return LDB_ERR_INVALID_DN_SYNTAX;
-               }
+       ret = check_parsed_dn_duplicates(module, el, pdn);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
 
-               /* note that the DN already has the extended
-                  components from the extended_dn_store module */
-               status = dsdb_get_extended_dn_guid(dsdb_dn->dn, &target_guid, "GUID");
-               if (!NT_STATUS_IS_OK(status) || GUID_all_zero(&target_guid)) {
-                       ret = dsdb_module_guid_by_dn(module, dsdb_dn->dn, &target_guid, parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-                       ret = dsdb_set_extended_dn_guid(dsdb_dn->dn, &target_guid, "GUID");
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-               }
+       new_values = talloc_array(tmp_ctx, struct ldb_val, el->num_values);
+       if (new_values == NULL) {
+               ldb_module_oom(module);
+               talloc_free(tmp_ctx);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
 
-               ret = replmd_build_la_val(el->values, v, dsdb_dn, invocationId,
-                                         seq_num, seq_num, now, 0, false);
+       for (i = 0; i < el->num_values; i++) {
+               struct parsed_dn *p = &pdn[i];
+               ret = replmd_build_la_val(el->values, p->v, p->dsdb_dn,
+                                         &ac->our_invocation_id,
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
 
-               ret = replmd_add_backlink(module, replmd_private,
-                                         schema, guid, &target_guid, true, sa,
-                                         false);
+               ret = replmd_defer_add_backlink(module, replmd_private,
+                                               schema, ac,
+                                               forward_dn, &p->guid, true, sa,
+                                               parent);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
+
+               new_values[i] = *p->v;
        }
+       el->values = talloc_steal(mem_ctx, new_values);
 
        talloc_free(tmp_ctx);
        return LDB_SUCCESS;
 }
 
+static int replmd_add_make_extended_dn(struct ldb_request *req,
+                                      const DATA_BLOB *guid_blob,
+                                      struct ldb_dn **_extended_dn)
+{
+       int ret;
+        const DATA_BLOB *sid_blob;
+       /* Calculate an extended DN for any linked attributes */
+       struct ldb_dn *extended_dn = ldb_dn_copy(req, req->op.add.message->dn);
+       if (!extended_dn) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ret = ldb_dn_set_extended_component(extended_dn, "GUID", guid_blob);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       sid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectSID");
+       if (sid_blob != NULL) {
+               ret = ldb_dn_set_extended_component(extended_dn, "SID", sid_blob);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+       *_extended_dn = extended_dn;
+       return LDB_SUCCESS;
+}
 
 /*
   intercept add requests
@@ -920,10 +1069,13 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        struct ldb_request *down_req;
        struct ldb_message *msg;
         const DATA_BLOB *guid_blob;
+        DATA_BLOB guid_blob_stack;
        struct GUID guid;
+       uint8_t guid_data[16];
        struct replPropertyMetaDataBlob nmd;
        struct ldb_val nmd_value;
-
+       struct ldb_dn *extended_dn = NULL;
+       
        /*
         * The use of a time_t here seems odd, but as the NTTIME
         * elements are actually declared as NTTIME_1sec in the IDL,
@@ -980,6 +1132,13 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        } else {
                /* a new GUID */
                guid = GUID_random();
+               
+               guid_blob_stack = data_blob_const(guid_data, sizeof(guid_data));
+               
+               /* This can't fail */
+               ndr_push_struct_into_fixed_blob(&guid_blob_stack, &guid,
+                                               (ndr_push_flags_fn_t)ndr_push_GUID);
+               guid_blob = &guid_blob_stack;
        }
 
        ac = replmd_ctx_init(module, req);
@@ -1078,10 +1237,28 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                }
 
                if (sa->linkID != 0 && functional_level > DS_DOMAIN_FUNCTION_2000) {
-                       ret = replmd_add_fix_la(module, replmd_private, e,
-                                               ac->seq_num,
-                                               &ac->our_invocation_id, now,
-                                               &guid, sa, req);
+                       if (extended_dn == NULL) {
+                               ret = replmd_add_make_extended_dn(req,
+                                                                 guid_blob,
+                                                                 &extended_dn);
+                               if (ret != LDB_SUCCESS) {
+                                       talloc_free(ac);
+                                       return ret;
+                               }
+                       }                       
+
+                       /*
+                        * Prepare the context for the backlinks and
+                        * create metadata for the forward links.  The
+                        * backlinks are created in
+                        * replmd_op_callback() after the successful
+                        * ADD of the object.
+                        */
+                       ret = replmd_add_fix_la(module, msg->elements,
+                                               replmd_private, e,
+                                               ac, now,
+                                               extended_dn,
+                                               sa, req);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(ac);
                                return ret;
@@ -1267,6 +1444,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                                      const struct GUID *our_invocation_id,
                                      NTTIME now,
                                      bool is_schema_nc,
+                                     bool is_forced_rodc,
                                      struct ldb_request *req)
 {
        uint32_t i;
@@ -1405,6 +1583,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        md1 = &omd->ctr.ctr1.array[i];
        md1->version++;
        md1->attid = attid;
+
        if (md1->attid == DRSUAPI_ATTID_isDeleted) {
                const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
                const char* rdn;
@@ -1431,6 +1610,11 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        md1->originating_usn           = *seq_num;
        md1->local_usn                 = *seq_num;
 
+       if (is_forced_rodc) {
+               /* Force version to 0 to be overriden later via replication */
+               md1->version = 0;
+       }
+
        return LDB_SUCCESS;
 }
 
@@ -1451,7 +1635,8 @@ static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
                                       struct replPropertyMetaDataBlob *omd,
                                       struct replmd_replicated_request *ar,
                                       NTTIME now,
-                                      bool is_schema_nc)
+                                      bool is_schema_nc,
+                                      bool is_forced_rodc)
 {
        const char *rdn_name = ldb_dn_get_rdn_name(msg->dn);
        const struct dsdb_attribute *rdn_attr =
@@ -1482,7 +1667,8 @@ static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
        return replmd_update_rpmd_element(ldb, msg, &new_el, NULL,
                                          omd, ar->schema, &ar->seq_num,
                                          &ar->our_invocation_id,
-                                         now, is_schema_nc, ar->req);
+                                         now, is_schema_nc, is_forced_rodc,
+                                         ar->req);
 
 }
 
@@ -1529,6 +1715,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
        bool rmd_is_provided;
        bool rmd_is_just_resorted = false;
        const char *not_rename_attrs[4 + msg->num_elements];
+       bool is_forced_rodc = false;
 
        if (rename_attrs) {
                attrs = rename_attrs;
@@ -1545,6 +1732,17 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
        ldb = ldb_module_get_ctx(module);
 
+       ret = samdb_rodc(ldb, rodc);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
+               *rodc = false;
+       }
+
+       if (*rodc &&
+           ldb_request_get_control(req, DSDB_CONTROL_FORCE_RODC_LOCAL_CHANGE)) {
+               is_forced_rodc = true;
+       }
+
        our_invocation_id = samdb_ntds_invocation_id(ldb);
        if (!our_invocation_id) {
                /* this happens during an initial vampire while
@@ -1681,6 +1879,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
                                                         &omd, schema, seq_num,
                                                         our_invocation_id,
                                                         now, is_schema_nc,
+                                                        is_forced_rodc,
                                                         req);
                        if (ret != LDB_SUCCESS) {
                                return ret;
@@ -1736,13 +1935,11 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
                /*if we are RODC and this is a DRSR update then its ok*/
                if (!ldb_request_get_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID)
-                   && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)) {
+                   && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)
+                   && !is_forced_rodc) {
                        unsigned instanceType;
 
-                       ret = samdb_rodc(ldb, rodc);
-                       if (ret != LDB_SUCCESS) {
-                               DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
-                       } else if (*rodc) {
+                       if (*rodc) {
                                ldb_set_errstring(ldb, "RODC modify is forbidden!");
                                return LDB_ERR_REFERRAL;
                        }
@@ -1788,140 +1985,14 @@ static int replmd_update_rpmd(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
-struct parsed_dn {
-       struct dsdb_dn *dsdb_dn;
-       struct GUID guid;
-       struct ldb_val *v;
-};
-
-struct compare_ctx {
-       struct GUID *guid;
-       struct ldb_context *ldb;
-       TALLOC_CTX *mem_ctx;
-       const char *ldap_oid;
-       int err;
-       const struct GUID *invocation_id;
-};
-
-
 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
 {
-       return GUID_compare(&pdn1->guid, &pdn2->guid);
-}
-
-static int la_guid_compare_with_trusted_dn(struct compare_ctx *ctx,
-                                          struct parsed_dn *p)
-{
-       /*
-        * This works like a standard compare function in its return values,
-        * but has an extra trick to deal with errors: zero is returned and
-        * ctx->err is set to the ldb error code.
-        *
-        * That is, if (as is expected in most cases) you get a non-zero
-        * result, you don't need to check for errors.
-        */
-       NTSTATUS status;
-
-       if (GUID_all_zero(&p->guid)) {
-
-               status = dsdb_get_extended_dn_guid(p->dsdb_dn->dn, &p->guid, "GUID");
-               if (!NT_STATUS_IS_OK(status)) {
-                       ctx->err = LDB_ERR_OPERATIONS_ERROR;
-                       return 0;
-               }
-       }
-
-       return GUID_compare(ctx->guid, &p->guid);
-}
-
-
-
-static int parsed_dn_find(struct ldb_context *ldb, struct parsed_dn *pdn,
-                         unsigned int count,
-                         struct GUID *guid,
-                         struct ldb_dn *target_dn,
-                         struct parsed_dn **exact,
-                         struct parsed_dn **next,
-                         const char *ldap_oid)
-{
-       unsigned int i;
-       struct compare_ctx ctx;
-       if (pdn == NULL) {
-               *exact = NULL;
-               *next = NULL;
-               return LDB_SUCCESS;
-       }
-
-       if (unlikely(GUID_all_zero(guid))) {
-               /*
-                * When updating a link using DRS, we sometimes get a NULL
-                * GUID when a forward link has been deleted and its GUID has
-                * for some reason been forgotten. The best we can do is try
-                * and match by DN via a linear search. Note that this
-                * probably only happens in the ADD case, in which we only
-                * allow modification of link if it is already deleted, so
-                * this seems very close to an elaborate NO-OP, but we are not
-                * quite prepared to declare it so.
-                *
-                * If the DN is not in our list, we have to add it to the
-                * beginning of the list, where it would naturally sort.
-                */
-               struct parsed_dn *p;
-               if (target_dn == NULL) {
-                       /* We don't know the target DN, so we can't search for DN */
-                       DEBUG(1, ("parsed_dn_find has a NULL GUID for a linked "
-                                 "attribute but we don't have a DN to compare "
-                                 "it with\n"));
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               *exact = NULL;
-               *next = NULL;
-
-               DEBUG(3, ("parsed_dn_find has a NULL GUID for a link to DN "
-                         "%s; searching through links for it",
-                         ldb_dn_get_linearized(target_dn)));
-
-               for (i = 0; i < count; i++) {
-                       int cmp;
-                       p = &pdn[i];
-                       cmp = ldb_dn_compare(p->dsdb_dn->dn, target_dn);
-                       if (cmp == 0) {
-                               dsdb_get_extended_dn_guid(p->dsdb_dn->dn,
-                                                         &p->guid, "GUID");
-                               *exact = p;
-                               return LDB_SUCCESS;
-                       }
-               }
-               /*
-                * Here we have a null guid which doesn't match any existing
-                * link. This is a bit unexpected because null guids occur
-                * when a forward link has been deleted and we are replicating
-                * that deletion.
-                *
-                * The best thing to do is weep into the logs and add the
-                * offending link to the beginning of the list which is
-                * at least the correct sort position.
-                */
-               DEBUG(1, ("parsed_dn_find has been given a NULL GUID for a "
-                         "link to unknown DN %s\n",
-                         ldb_dn_get_linearized(target_dn)));
-               *next = pdn;
-               return LDB_SUCCESS;
-       }
-
-       ctx.guid = guid;
-       ctx.ldb = ldb;
-       ctx.mem_ctx = pdn;
-       ctx.ldap_oid = ldap_oid;
-       ctx.err = 0;
-
-       BINARY_ARRAY_SEARCH_GTE(pdn, count, &ctx, la_guid_compare_with_trusted_dn,
-                               *exact, *next);
-
-       if (ctx.err != 0) {
-               return ctx.err;
+       int ret = ndr_guid_compare(&pdn1->guid, &pdn2->guid);
+       if (ret == 0) {
+               return data_blob_cmp(&pdn1->dsdb_dn->extra_part,
+                                    &pdn2->dsdb_dn->extra_part);
        }
-       return LDB_SUCCESS;
+       return ret;
 }
 
 /*
@@ -1963,12 +2034,17 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                dn = p->dsdb_dn->dn;
 
                status = dsdb_get_extended_dn_guid(dn, &p->guid, "GUID");
-               if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
+               if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) ||
+                   unlikely(GUID_all_zero(&p->guid))) {
                        /* we got a DN without a GUID - go find the GUID */
                        int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
-                               ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
-                                                      ldb_dn_get_linearized(dn));
+                               char *dn_str = NULL;
+                               dn_str = ldb_dn_get_extended_linearized(mem_ctx,
+                                                                       (dn), 1);
+                               ldb_asprintf_errstring(ldb,
+                                               "Unable to find GUID for DN %s\n",
+                                               dn_str);
                                if (ret == LDB_ERR_NO_SUCH_OBJECT &&
                                    LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
                                    ldb_attr_cmp(el->name, "member") == 0) {
@@ -1999,95 +2075,129 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
 }
 
 /*
-  build a new extended DN, including all meta data fields
-
-  RMD_FLAGS           = DSDB_RMD_FLAG_* bits
-  RMD_ADDTIME         = originating_add_time
-  RMD_INVOCID         = originating_invocation_id
-  RMD_CHANGETIME      = originating_change_time
-  RMD_ORIGINATING_USN = originating_usn
-  RMD_LOCAL_USN       = local_usn
-  RMD_VERSION         = version
+ * Get a series of trusted message element values. The result is sorted by
+ * GUID, even though the GUIDs might not be known. That works because we trust
+ * the database to give us the elements like that if the
+ * replmd_private->sorted_links flag is set.
+ *
+ * We also ensure that the links are in the Functional Level 2003
+ * linked attributes format.
  */
-static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted)
+static int get_parsed_dns_trusted(struct ldb_module *module,
+                                 struct replmd_private *replmd_private,
+                                 TALLOC_CTX *mem_ctx,
+                                 struct ldb_message_element *el,
+                                 struct parsed_dn **pdn,
+                                 const char *ldap_oid,
+                                 struct ldb_request *parent)
 {
-       struct ldb_dn *dn = dsdb_dn->dn;
-       const char *tstring, *usn_string, *flags_string;
-       struct ldb_val tval;
-       struct ldb_val iid;
-       struct ldb_val usnv, local_usnv;
-       struct ldb_val vers, flagsv;
-       NTSTATUS status;
+       unsigned int i;
        int ret;
-       const char *dnstring;
-       char *vstring;
-       uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
-
-       tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
-       if (!tstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       if (el == NULL) {
+               *pdn = NULL;
+               return LDB_SUCCESS;
        }
-       tval = data_blob_string_const(tstring);
 
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       usnv = data_blob_string_const(usn_string);
+       if (!replmd_private->sorted_links) {
+               /* We need to sort the list. This is the slow old path we want
+                  to avoid.
+                */
+               ret = get_parsed_dns(module, mem_ctx, el, pdn, ldap_oid,
+                                     parent);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       } else {
+               /* Here we get a list of 'struct parsed_dns' without the parsing */
+               *pdn = talloc_zero_array(mem_ctx, struct parsed_dn,
+                                        el->num_values);
+               if (!*pdn) {
+                       ldb_module_oom(module);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
 
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
+               for (i = 0; i < el->num_values; i++) {
+                       (*pdn)[i].v = &el->values[i];
+               }
        }
-       local_usnv = data_blob_string_const(usn_string);
 
-       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
-       if (!vstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       vers = data_blob_string_const(vstring);
+       /*
+        * This upgrades links to FL2003 style, and sorts the result
+        * if that was needed.
+        *
+        * TODO: Add a database feature that asserts we have no FL2000
+        *       style links to avoid this check or add a feature that
+        *       uses a similar check to find sorted/unsorted links
+        *       for an on-the-fly upgrade.
+        */
 
-       status = GUID_to_ndr_blob(invocation_id, dn, &iid);
-       if (!NT_STATUS_IS_OK(status)) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       ret = replmd_check_upgrade_links(ldb_module_get_ctx(module),
+                                        *pdn, el->num_values,
+                                        el,
+                                        ldap_oid);
+       if (ret != LDB_SUCCESS) {
+               return ret;
        }
 
-       flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
-       if (!flags_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       flagsv = data_blob_string_const(flags_string);
+       return LDB_SUCCESS;
+}
 
-       ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
-       if (ret != LDB_SUCCESS) return ret;
+/*
+   Return LDB_SUCCESS if a parsed_dn list contains no duplicate values,
+   otherwise an error code. For compatibility the error code differs depending
+   on whether or not the attribute is "member".
 
-       dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
-       if (dnstring == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       *v = data_blob_string_const(dnstring);
+   As always, the parsed_dn list is assumed to be sorted.
+ */
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn)
+{
+       unsigned int i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
 
+       for (i = 1; i < el->num_values; i++) {
+               struct parsed_dn *p = &pdn[i];
+               if (parsed_dn_compare(p, &pdn[i - 1]) == 0) {
+                       ldb_asprintf_errstring(ldb,
+                                              "Linked attribute %s has "
+                                              "multiple identical values",
+                                              el->name);
+                       if (ldb_attr_cmp(el->name, "member") == 0) {
+                               return LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+               }
+       }
        return LDB_SUCCESS;
 }
 
+/*
+  build a new extended DN, including all meta data fields
+
+  RMD_FLAGS           = DSDB_RMD_FLAG_* bits
+  RMD_ADDTIME         = originating_add_time
+  RMD_INVOCID         = originating_invocation_id
+  RMD_CHANGETIME      = originating_change_time
+  RMD_ORIGINATING_USN = originating_usn
+  RMD_LOCAL_USN       = local_usn
+  RMD_VERSION         = version
+ */
+static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v,
+                              struct dsdb_dn *dsdb_dn,
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime)
+{
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, NULL, invocation_id,
+                                local_usn, local_usn, nttime,
+                                RMD_VERSION_INITIAL, false);
+}
+
 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                                struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
                                uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted);
+                               bool deleted);
 
 /*
   check if any links need upgrading from w2k format
@@ -2095,18 +2205,18 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
 static int replmd_check_upgrade_links(struct ldb_context *ldb,
                                      struct parsed_dn *dns, uint32_t count,
                                      struct ldb_message_element *el,
-                                     const struct GUID *invocation_id,
                                      const char *ldap_oid)
 {
        uint32_t i;
+       const struct GUID *invocation_id = NULL;
        for (i=0; i<count; i++) {
                NTSTATUS status;
                uint32_t version;
                int ret;
                if (dns[i].dsdb_dn == NULL) {
-                       dns[i].dsdb_dn = dsdb_dn_parse(dns, ldb, dns[i].v,
-                                                      ldap_oid);
-                       if (dns[i].dsdb_dn == NULL) {
+                       ret = really_parse_trusted_dn(dns, ldb, &dns[i],
+                                                     ldap_oid);
+                       if (ret != LDB_SUCCESS) {
                                return LDB_ERR_INVALID_DN_SYNTAX;
                        }
                }
@@ -2130,26 +2240,47 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                        continue;
                }
 
+               if (invocation_id == NULL) {
+                       invocation_id = samdb_ntds_invocation_id(ldb);
+                       if (invocation_id == NULL) {
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+               }
+
+
                /* it's an old one that needs upgrading */
                ret = replmd_update_la_val(el->values, dns[i].v,
                                           dns[i].dsdb_dn, dns[i].dsdb_dn,
-                                          invocation_id, 1, 1, 0, 0, false);
+                                          invocation_id, 1, 1, 0, false);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
        }
+
+       /*
+        * This sort() is critical for the operation of
+        * get_parsed_dns_trusted() because callers of this function
+        * expect a sorted list, and FL2000 style links are not
+        * sorted.  In particular, as well as the upgrade case,
+        * get_parsed_dns_trusted() is called from
+        * replmd_delete_remove_link() even in FL2000 mode
+        *
+        * We do not normally pay the cost of the qsort() due to the
+        * early return in the RMD_VERSION found case.
+        */
+       TYPESAFE_QSORT(dns, count, parsed_dn_compare);
        return LDB_SUCCESS;
 }
 
 /*
-  update an extended DN, including all meta data fields
+  Sets the value for a linked attribute, including all meta data fields
 
   see replmd_build_la_val for value names
  */
-static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
-                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted)
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted)
 {
        struct ldb_dn *dn = dsdb_dn->dn;
        const char *tstring, *usn_string, *flags_string;
@@ -2157,8 +2288,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        struct ldb_val iid;
        struct ldb_val usnv, local_usnv;
        struct ldb_val vers, flagsv;
-       const struct ldb_val *old_addtime;
-       uint32_t old_version;
+       const struct ldb_val *old_addtime = NULL;
        NTSTATUS status;
        int ret;
        const char *dnstring;
@@ -2198,7 +2328,10 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        if (ret != LDB_SUCCESS) return ret;
 
        /* get the ADDTIME from the original */
-       old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_ADDTIME");
+       if (old_dsdb_dn != NULL) {
+               old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn,
+                                                           "RMD_ADDTIME");
+       }
        if (old_addtime == NULL) {
                old_addtime = &tval;
        }
@@ -2223,12 +2356,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
        if (ret != LDB_SUCCESS) return ret;
 
-       /* increase the version by 1 */
-       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version, "RMD_VERSION");
-       if (NT_STATUS_IS_OK(status) && old_version >= version) {
-               version = old_version+1;
-       }
-       vstring = talloc_asprintf(dn, "%lu", (unsigned long)version);
+       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
        vers = data_blob_string_const(vstring);
        ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
        if (ret != LDB_SUCCESS) return ret;
@@ -2242,6 +2370,33 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        return LDB_SUCCESS;
 }
 
+/**
+ * Updates the value for a linked attribute, including all meta data fields
+ */
+static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                               bool deleted)
+{
+       uint32_t old_version;
+       uint32_t version = RMD_VERSION_INITIAL;
+       NTSTATUS status;
+
+       /*
+        * We're updating the linked attribute locally, so increase the version
+        * by 1 so that other DCs will see the change when it gets replicated out
+        */
+       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version,
+                                            "RMD_VERSION");
+
+       if (NT_STATUS_IS_OK(status)) {
+               version = old_version + 1;
+       }
+
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, old_dsdb_dn, invocation_id,
+                                usn, local_usn, nttime, version, deleted);
+}
+
 /*
   handle adding a linked attribute
  */
@@ -2254,84 +2409,110 @@ static int replmd_modify_la_add(struct ldb_module *module,
                                const struct dsdb_attribute *schema_attr,
                                uint64_t seq_num,
                                time_t t,
-                               struct GUID *msg_guid,
+                               struct ldb_dn *msg_dn,
                                struct ldb_request *parent)
 {
-       unsigned int i;
+       unsigned int i, j;
        struct parsed_dn *dns, *old_dns;
        TALLOC_CTX *tmp_ctx = talloc_new(msg);
        int ret;
        struct ldb_val *new_values = NULL;
-       unsigned int num_new_values = 0;
-       unsigned old_num_values = old_el?old_el->num_values:0;
+       unsigned old_num_values = old_el ? old_el->num_values : 0;
+       unsigned num_values = 0;
+       unsigned max_num_values;
        const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        NTTIME now;
-
        unix_to_nt_time(&now, t);
 
-       ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid, parent);
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (!invocation_id) {
+               talloc_free(tmp_ctx);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* get the DNs to be added, fully parsed.
+        *
+        * We need full parsing because they came off the wire and we don't
+        * trust them, besides which we need their details to know where to put
+        * them.
+        */
+       ret = get_parsed_dns(module, tmp_ctx, el, &dns,
+                            schema_attr->syntax->ldap_oid, parent);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid, parent);
+       /* get the existing DNs, lazily parsed */
+       ret = get_parsed_dns_trusted(module, replmd_private,
+                                    tmp_ctx, old_el, &old_dns,
+                                    schema_attr->syntax->ldap_oid, parent);
+
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
+       max_num_values = old_num_values + el->num_values;
+       if (max_num_values < old_num_values) {
+               DEBUG(0, ("we seem to have overflow in replmd_modify_la_add. "
+                         "old values: %u, new values: %u, sum: %u",
+                         old_num_values, el->num_values, max_num_values));
                talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = replmd_check_upgrade_links(ldb, old_dns, old_num_values,
-                                        old_el, invocation_id,
-                                        schema_attr->syntax->ldap_oid);
-       if (ret != LDB_SUCCESS) {
+       new_values = talloc_zero_array(tmp_ctx, struct ldb_val, max_num_values);
+
+       if (new_values == NULL) {
+               ldb_module_oom(module);
                talloc_free(tmp_ctx);
-               return ret;
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /* for each new value, see if it exists already with the same GUID */
-       for (i=0; i<el->num_values; i++) {
-               struct parsed_dn *p;
+       /*
+        * For each new value, find where it would go in the list. If there is
+        * a matching GUID there, we update the existing value; otherwise we
+        * put it in place.
+        */
+       j = 0;
+       for (i = 0; i < el->num_values; i++) {
+               struct parsed_dn *exact;
                struct parsed_dn *next;
+               unsigned offset;
                int err = parsed_dn_find(ldb, old_dns, old_num_values,
                                         &dns[i].guid,
                                         dns[i].dsdb_dn->dn,
-                                        &p, &next,
-                                        schema_attr->syntax->ldap_oid);
+                                        dns[i].dsdb_dn->extra_part, 0,
+                                        &exact, &next,
+                                        schema_attr->syntax->ldap_oid,
+                                        true);
                if (err != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return err;
                }
-               if (p == NULL) {
-                       /* this is a new linked attribute value */
-                       new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val, num_new_values+1);
-                       if (new_values == NULL) {
-                               ldb_module_oom(module);
-                               talloc_free(tmp_ctx);
-                               return LDB_ERR_OPERATIONS_ERROR;
-                       }
-                       ret = replmd_build_la_val(new_values, &new_values[num_new_values], dns[i].dsdb_dn,
-                                                 invocation_id, seq_num, seq_num, now, 0, false);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-                       num_new_values++;
-               } else {
-                       /* this is only allowed if the GUID was
-                          previously deleted. */
-                       uint32_t rmd_flags = dsdb_dn_rmd_flags(p->dsdb_dn->dn);
+
+               if (exact != NULL) {
+                       /*
+                        * We are trying to add one that exists, which is only
+                        * allowed if it was previously deleted.
+                        *
+                        * When we do undelete a link we change it in place.
+                        * It will be copied across into the right spot in due
+                        * course.
+                        */
+                       uint32_t rmd_flags;
+                       rmd_flags = dsdb_dn_rmd_flags(exact->dsdb_dn->dn);
+
                        if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                                struct GUID_txt_buf guid_str;
-                               ldb_asprintf_errstring(ldb, "Attribute %s already exists for target GUID %s",
-                                                      el->name, GUID_buf_string(&p->guid, &guid_str));
+                               ldb_asprintf_errstring(ldb,
+                                                      "Attribute %s already "
+                                                      "exists for target GUID %s",
+                                                      el->name,
+                                                      GUID_buf_string(&exact->guid,
+                                                                      &guid_str));
                                talloc_free(tmp_ctx);
                                /* error codes for 'member' need to be
                                   special cased */
@@ -2341,37 +2522,90 @@ static int replmd_modify_la_add(struct ldb_module *module,
                                        return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                                }
                        }
-                       ret = replmd_update_la_val(old_el->values, p->v, dns[i].dsdb_dn, p->dsdb_dn,
-                                                  invocation_id, seq_num, seq_num, now, 0, false);
+
+                       ret = replmd_update_la_val(new_values, exact->v,
+                                                  dns[i].dsdb_dn,
+                                                  exact->dsdb_dn,
+                                                  invocation_id, seq_num,
+                                                  seq_num, now, false);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+
+                       ret = replmd_add_backlink(module, replmd_private,
+                                                 schema,
+                                                 msg_dn,
+                                                 &dns[i].guid, 
+                                                 true,
+                                                 schema_attr,
+                                                 parent);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
+                               }
+                       continue;
+               }
+               /*
+                * Here we don't have an exact match.
+                *
+                * If next is NULL, this one goes beyond the end of the
+                * existing list, so we need to add all of those ones first.
+                *
+                * If next is not NULL, we need to add all the ones before
+                * next.
+                */
+               if (next == NULL) {
+                       offset = old_num_values;
+               } else {
+                       /* next should have been parsed, but let's make sure */
+                       if (next->dsdb_dn == NULL) {
+                               ret = really_parse_trusted_dn(tmp_ctx, ldb, next,
+                                                             schema_attr->syntax->ldap_oid);
+                               if (ret != LDB_SUCCESS) {
+                                       return ret;
+                               }
                        }
+                       offset = MIN(next - old_dns, old_num_values);
+               }
+
+               /* put all the old ones before next on the list */
+               for (; j < offset; j++) {
+                       new_values[num_values] = *old_dns[j].v;
+                       num_values++;
                }
 
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_guid, &dns[i].guid,
-                                         true, schema_attr, true);
+                                         schema, msg_dn,
+                                         &dns[i].guid,
+                                         true, schema_attr,
+                                         parent);
+               /* Make the new linked attribute ldb_val. */
+               ret = replmd_build_la_val(new_values, &new_values[num_values],
+                                         dns[i].dsdb_dn, invocation_id,
+                                         seq_num, now);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+               num_values++;
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
        }
-
-       /* add the new ones on to the end of the old values, constructing a new el->values */
-       el->values = talloc_realloc(msg->elements, old_el?old_el->values:NULL,
-                                   struct ldb_val,
-                                   old_num_values+num_new_values);
-       if (el->values == NULL) {
-               ldb_module_oom(module);
-               return LDB_ERR_OPERATIONS_ERROR;
+       /* copy the rest of the old ones (if any) */
+       for (; j < old_num_values; j++) {
+               new_values[num_values] = *old_dns[j].v;
+               num_values++;
        }
 
-       memcpy(&el->values[old_num_values], new_values, num_new_values*sizeof(struct ldb_val));
-       el->num_values = old_num_values + num_new_values;
-
-       talloc_steal(msg->elements, el->values);
-       talloc_steal(el->values, new_values);
+       talloc_steal(msg->elements, new_values);
+       if (old_el != NULL) {
+               talloc_steal(msg->elements, old_el->values);
+       }
+       el->values = new_values;
+       el->num_values = num_values;
 
        talloc_free(tmp_ctx);
 
@@ -2395,29 +2629,34 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                                   const struct dsdb_attribute *schema_attr,
                                   uint64_t seq_num,
                                   time_t t,
-                                  struct GUID *msg_guid,
+                                  struct ldb_dn *msg_dn,
                                   struct ldb_request *parent)
 {
        unsigned int i;
        struct parsed_dn *dns, *old_dns;
        TALLOC_CTX *tmp_ctx = NULL;
        int ret;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_control *vanish_links_ctrl = NULL;
        bool vanish_links = false;
        unsigned int num_to_delete = el->num_values;
+       uint32_t rmd_flags;
+       const struct GUID *invocation_id;
        NTTIME now;
 
        unix_to_nt_time(&now, t);
 
-       /* check if there is nothing to delete */
-       if ((!old_el || old_el->num_values == 0) &&
-           el->num_values == 0) {
-               return LDB_SUCCESS;
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (!invocation_id) {
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       if (!old_el || old_el->num_values == 0) {
+       if (old_el == NULL || old_el->num_values == 0) {
+               /* there is nothing to delete... */
+               if (num_to_delete == 0) {
+                       /* and we're deleting nothing, so that's OK */
+                       return LDB_SUCCESS;
+               }
                return LDB_ERR_NO_SUCH_ATTRIBUTE;
        }
 
@@ -2426,27 +2665,17 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid, parent);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid, parent);
+       ret = get_parsed_dns(module, tmp_ctx, el, &dns,
+                            schema_attr->syntax->ldap_oid, parent);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+       ret = get_parsed_dns_trusted(module, replmd_private,
+                                    tmp_ctx, old_el, &old_dns,
+                                    schema_attr->syntax->ldap_oid, parent);
 
-       ret = replmd_check_upgrade_links(ldb, old_dns, old_el->num_values,
-                                        old_el, invocation_id,
-                                        schema_attr->syntax->ldap_oid);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
@@ -2460,30 +2689,86 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                }
        }
 
+       /* we empty out el->values here to avoid damage if we return early. */
        el->num_values = 0;
        el->values = NULL;
 
-       /* see if we are being asked to delete any links that
-          don't exist or are already deleted */
-       for (i=0; i < num_to_delete; i++) {
+       /*
+        * If vanish links is set, we are actually removing members of
+        *  old_el->values; otherwise we are just marking them deleted.
+        *
+        * There is a special case when no values are given: we remove them
+        * all. When we have the vanish_links control we just have to remove
+        * the backlinks and change our element to replace the existing values
+        * with the empty list.
+        */
+
+       if (num_to_delete == 0) {
+               for (i = 0; i < old_el->num_values; i++) {
+                       struct parsed_dn *p = &old_dns[i];
+                       if (p->dsdb_dn == NULL) {
+                               ret = really_parse_trusted_dn(tmp_ctx, ldb, p,
+                                                             schema_attr->syntax->ldap_oid);
+                               if (ret != LDB_SUCCESS) {
+                                       return ret;
+                               }
+                       }
+                       ret = replmd_add_backlink(module, replmd_private,
+                                                 schema, msg_dn, &p->guid,
+                                                 false, schema_attr,
+                                                 parent);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+                       if (vanish_links) {
+                               continue;
+                       }
+
+                       rmd_flags = dsdb_dn_rmd_flags(p->dsdb_dn->dn);
+                       if (rmd_flags & DSDB_RMD_FLAG_DELETED) {
+                               continue;
+                       }
+
+                       ret = replmd_update_la_val(old_el->values, p->v,
+                                                  p->dsdb_dn, p->dsdb_dn,
+                                                  invocation_id, seq_num,
+                                                  seq_num, now, true);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+               }
+
+               if (vanish_links) {
+                       el->flags = LDB_FLAG_MOD_REPLACE;
+                       talloc_free(tmp_ctx);
+                       return LDB_SUCCESS;
+               }
+       }
+
+
+       for (i = 0; i < num_to_delete; i++) {
                struct parsed_dn *p = &dns[i];
-               struct parsed_dn *p2;
+               struct parsed_dn *exact = NULL;
                struct parsed_dn *next = NULL;
-               uint32_t rmd_flags;
                ret = parsed_dn_find(ldb, old_dns, old_el->num_values,
                                     &p->guid,
                                     NULL,
-                                    &p2, &next,
-                                    schema_attr->syntax->ldap_oid);
+                                    p->dsdb_dn->extra_part, 0,
+                                    &exact, &next,
+                                    schema_attr->syntax->ldap_oid,
+                                    true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
-
-               if (!p2) {
+               if (exact == NULL) {
                        struct GUID_txt_buf buf;
-                       ldb_asprintf_errstring(ldb, "Attribute %s doesn't exist for target GUID %s",
-                                              el->name, GUID_buf_string(&p->guid, &buf));
+                       ldb_asprintf_errstring(ldb, "Attribute %s doesn't "
+                                              "exist for target GUID %s",
+                                              el->name,
+                                              GUID_buf_string(&p->guid, &buf));
                        if (ldb_attr_cmp(el->name, "member") == 0) {
                                talloc_free(tmp_ctx);
                                return LDB_ERR_UNWILLING_TO_PERFORM;
@@ -2492,136 +2777,98 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                                return LDB_ERR_NO_SUCH_ATTRIBUTE;
                        }
                }
-               rmd_flags = dsdb_dn_rmd_flags(p2->dsdb_dn->dn);
-               if (rmd_flags & DSDB_RMD_FLAG_DELETED) {
-                       struct GUID_txt_buf buf;
-                       const char *guid_str = GUID_buf_string(&p->guid, &buf);
-                       if (vanish_links) {
-                               DEBUG(0, ("Deleting deleted linked attribute %s to %s, "
-                                         "because vanish_links control is set\n",
-                                         el->name, guid_str));
-                               continue;
+
+               if (vanish_links) {
+                       if (CHECK_DEBUGLVL(5)) {
+                               rmd_flags = dsdb_dn_rmd_flags(exact->dsdb_dn->dn);
+                               if ((rmd_flags & DSDB_RMD_FLAG_DELETED)) {
+                                       struct GUID_txt_buf buf;
+                                       const char *guid_str = \
+                                               GUID_buf_string(&p->guid, &buf);
+                                       DEBUG(5, ("Deleting deleted linked "
+                                                 "attribute %s to %s, because "
+                                                 "vanish_links control is set\n",
+                                                 el->name, guid_str));
+                               }
                        }
-                       ldb_asprintf_errstring(ldb, "Attribute %s already deleted for target GUID %s",
-                                              el->name, guid_str);
-                       if (ldb_attr_cmp(el->name, "member") == 0) {
-                               talloc_free(tmp_ctx);
+
+                       /* remove the backlink */
+                       ret = replmd_add_backlink(module,
+                                                 replmd_private,
+                                                 schema, 
+                                                 msg_dn,
+                                                 &p->guid,
+                                                 false, schema_attr,
+                                                 parent);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+
+                       /* We flag the deletion and tidy it up later. */
+                       exact->v = NULL;
+                       continue;
+               }
+
+               rmd_flags = dsdb_dn_rmd_flags(exact->dsdb_dn->dn);
+
+               if (rmd_flags & DSDB_RMD_FLAG_DELETED) {
+                       struct GUID_txt_buf buf;
+                       const char *guid_str = GUID_buf_string(&p->guid, &buf);
+                       ldb_asprintf_errstring(ldb, "Attribute %s already "
+                                              "deleted for target GUID %s",
+                                              el->name, guid_str);
+                       if (ldb_attr_cmp(el->name, "member") == 0) {
+                               talloc_free(tmp_ctx);
                                return LDB_ERR_UNWILLING_TO_PERFORM;
                        } else {
                                talloc_free(tmp_ctx);
                                return LDB_ERR_NO_SUCH_ATTRIBUTE;
                        }
                }
+
+               ret = replmd_update_la_val(old_el->values, exact->v,
+                                          exact->dsdb_dn, exact->dsdb_dn,
+                                          invocation_id, seq_num, seq_num,
+                                          now, true);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+               ret = replmd_add_backlink(module, replmd_private,
+                                         schema, msg_dn,
+                                         &p->guid,
+                                         false, schema_attr,
+                                         parent);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
        }
 
        if (vanish_links) {
-               if (num_to_delete == old_el->num_values || num_to_delete == 0) {
-                       el->flags = LDB_FLAG_MOD_REPLACE;
+               unsigned j = 0;
+               struct ldb_val *tmp_vals = NULL;
 
-                       for (i = 0; i < old_el->num_values; i++) {
-                               ret = replmd_add_backlink(module,
-                                                         replmd_private,
-                                                         schema, msg_guid,
-                                                         &old_dns[i].guid,
-                                                         false, schema_attr,
-                                                         true);
-                               if (ret != LDB_SUCCESS) {
-                                       talloc_free(tmp_ctx);
-                                       return ret;
-                               }
-                       }
+               tmp_vals = talloc_array(tmp_ctx, struct ldb_val,
+                                       old_el->num_values);
+               if (tmp_vals == NULL) {
                        talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               } else {
-                       unsigned int num_values = 0;
-                       unsigned int j = 0;
-                       struct parsed_dn *exact = NULL, *next = NULL;
-
-                       for (i = 0; i < old_el->num_values; i++) {
-                               ret = parsed_dn_find(ldb, dns, num_to_delete,
-                                                    &old_dns[i].guid,
-                                                    NULL,
-                                                    &exact, &next,
-                                                    schema_attr->syntax->ldap_oid);
-                               if (ret != LDB_SUCCESS) {
-                                       talloc_free(tmp_ctx);
-                                       return ret;
-                               }
-
-                               if (exact != NULL) {
-                                       /* The element is in the delete list.
-                                          mark it dead. */
-                                       ret = replmd_add_backlink(module,
-                                                                 replmd_private,
-                                                                 schema,
-                                                                 msg_guid,
-                                                                 &old_dns[i].guid,
-                                                                 false,
-                                                                 schema_attr,
-                                                                 true);
-                                       if (ret != LDB_SUCCESS) {
-                                               talloc_free(tmp_ctx);
-                                               return ret;
-                                       }
-                                       old_dns[i].v->length = 0;
-                               } else {
-                                       num_values++;
-                               }
-                       }
-                       for (i = 0; i < old_el->num_values; i++) {
-                               if (old_el->values[i].length != 0) {
-                                       old_el->values[j] = old_el->values[i];
-                                       j++;
-                                       if (j == num_values) {
-                                               break;
-                                       }
-                               }
-                       }
-                       old_el->num_values = num_values;
+                       return ldb_module_oom(module);
                }
-       } else {
-
-               /* for each new value, see if it exists already with the same GUID
-                  if it is not already deleted and matches the delete list then delete it
-               */
-               for (i=0; i<old_el->num_values; i++) {
-                       struct parsed_dn *p = &old_dns[i];
-                       struct parsed_dn *exact = NULL, *next = NULL;
-                       uint32_t rmd_flags;
-
-                       if (num_to_delete != 0) {
-                               ret = parsed_dn_find(ldb, dns, num_to_delete,
-                                                    &p->guid,
-                                                    NULL,
-                                                    &exact, &next,
-                                                    schema_attr->syntax->ldap_oid);
-                               if (ret != LDB_SUCCESS) {
-                                       talloc_free(tmp_ctx);
-                                       return ret;
-                               }
-                               if (exact == NULL) {
-                                       continue;
-                               }
-                       }
-
-                       rmd_flags = dsdb_dn_rmd_flags(p->dsdb_dn->dn);
-                       if (rmd_flags & DSDB_RMD_FLAG_DELETED) continue;
-
-                       ret = replmd_update_la_val(old_el->values, p->v, p->dsdb_dn, p->dsdb_dn,
-                                                  invocation_id, seq_num, seq_num, now, 0, true);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, msg_guid, &p->guid,
-                                                 false, schema_attr, true);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
+               for (i = 0; i < old_el->num_values; i++) {
+                       if (old_dns[i].v == NULL) {
+                               continue;
                        }
+                       tmp_vals[j] = *old_dns[i].v;
+                       j++;
+               }
+               for (i = 0; i < j; i++) {
+                       old_el->values[i] = tmp_vals[i];
                }
+               old_el->num_values = j;
        }
+
        el->values = talloc_steal(msg->elements, old_el->values);
        el->num_values = old_el->num_values;
 
@@ -2646,168 +2893,222 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                    const struct dsdb_attribute *schema_attr,
                                    uint64_t seq_num,
                                    time_t t,
-                                   struct GUID *msg_guid,
+                                   struct ldb_dn *msg_dn,
                                    struct ldb_request *parent)
 {
-       unsigned int i;
+       unsigned int i, old_i, new_i;
        struct parsed_dn *dns, *old_dns;
        TALLOC_CTX *tmp_ctx = talloc_new(msg);
        int ret;
        const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_val *new_values = NULL;
-       unsigned int num_new_values = 0;
-       unsigned int old_num_values = old_el?old_el->num_values:0;
+       const char *ldap_oid = schema_attr->syntax->ldap_oid;
+       unsigned int old_num_values;
+       unsigned int repl_num_values;
+       unsigned int max_num_values;
        NTTIME now;
 
        unix_to_nt_time(&now, t);
 
-       /* check if there is nothing to replace */
-       if ((!old_el || old_el->num_values == 0) &&
-           el->num_values == 0) {
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (!invocation_id) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * The replace operation is unlike the replace and delete cases in that
+        * we need to look at every existing link to see whether it is being
+        * retained or deleted. In other words, we can't avoid parsing the GUIDs.
+        *
+        * As we are trying to combine two sorted lists, the algorithm we use
+        * is akin to the merge phase of a merge sort. We interleave the two
+        * lists, doing different things depending on which side the current
+        * item came from.
+        *
+        * There are three main cases, with some sub-cases.
+        *
+        *  - a DN is in the old list but not the new one. It needs to be
+        *    marked as deleted (but left in the list).
+        *     - maybe it is already deleted, and we have less to do.
+        *
+        *  - a DN is in both lists. The old data gets replaced by the new,
+        *    and the list doesn't grow. The old link may have been marked as
+        *    deleted, in which case we undelete it.
+        *
+        *  - a DN is in the new list only. We add it in the right place.
+        */
+
+       old_num_values = old_el ? old_el->num_values : 0;
+       repl_num_values = el->num_values;
+       max_num_values = old_num_values + repl_num_values;
+
+       if (max_num_values == 0) {
+               /* There is nothing to do! */
                return LDB_SUCCESS;
        }
 
-       ret = get_parsed_dns(module, tmp_ctx, el, &dns, schema_attr->syntax->ldap_oid, parent);
+       ret = get_parsed_dns(module, tmp_ctx, el, &dns, ldap_oid, parent);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns, schema_attr->syntax->ldap_oid, parent);
+       ret = check_parsed_dn_duplicates(module, el, dns);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               return LDB_ERR_OPERATIONS_ERROR;
+       ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns,
+                            ldap_oid, parent);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
        }
 
        ret = replmd_check_upgrade_links(ldb, old_dns, old_num_values,
-                                        old_el, invocation_id,
-                                        schema_attr->syntax->ldap_oid);
+                                        old_el, ldap_oid);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       /* mark all the old ones as deleted */
-       for (i=0; i<old_num_values; i++) {
-               struct parsed_dn *old_p = &old_dns[i];
-               struct parsed_dn *exact = NULL, *next = NULL;
-               uint32_t rmd_flags = dsdb_dn_rmd_flags(old_p->dsdb_dn->dn);
-
-               if (rmd_flags & DSDB_RMD_FLAG_DELETED) continue;
+       new_values = talloc_array(tmp_ctx, struct ldb_val, max_num_values);
+       if (new_values == NULL) {
+               ldb_module_oom(module);
+               talloc_free(tmp_ctx);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
 
-               ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_guid, &old_dns[i].guid,
-                                         false, schema_attr, false);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
+       old_i = 0;
+       new_i = 0;
+       for (i = 0; i < max_num_values; i++) {
+               int cmp;
+               struct parsed_dn *old_p, *new_p;
+               if (old_i < old_num_values && new_i < repl_num_values) {
+                       old_p = &old_dns[old_i];
+                       new_p = &dns[new_i];
+                       cmp = parsed_dn_compare(old_p, new_p);
+               } else if (old_i < old_num_values) {
+                       /* the new list is empty, read the old list */
+                       old_p = &old_dns[old_i];
+                       new_p = NULL;
+                       cmp = -1;
+               } else if (new_i < repl_num_values) {
+                       /* the old list is empty, read new list */
+                       old_p = NULL;
+                       new_p = &dns[new_i];
+                       cmp = 1;
+               } else {
+                       break;
                }
 
-               ret = parsed_dn_find(ldb, dns, el->num_values,
-                                    &old_p->guid,
-                                    NULL,
-                                    &exact, &next,
-                                    schema_attr->syntax->ldap_oid);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
-               if (exact) {
-                       /* we don't delete it if we are re-adding it */
-                       continue;
-               }
+               if (cmp < 0) {
+                       /*
+                        * An old ones that come before the next replacement
+                        * (if any). We mark it as deleted and add it to the
+                        * final list.
+                        */
+                       uint32_t rmd_flags = dsdb_dn_rmd_flags(old_p->dsdb_dn->dn);
+                       if ((rmd_flags & DSDB_RMD_FLAG_DELETED) == 0) {
+                               ret = replmd_update_la_val(new_values, old_p->v,
+                                                          old_p->dsdb_dn,
+                                                          old_p->dsdb_dn,
+                                                          invocation_id,
+                                                          seq_num, seq_num,
+                                                          now, true);
+                               if (ret != LDB_SUCCESS) {
+                                       talloc_free(tmp_ctx);
+                                       return ret;
+                               }
 
-               ret = replmd_update_la_val(old_el->values, old_p->v, old_p->dsdb_dn, old_p->dsdb_dn,
-                                          invocation_id, seq_num, seq_num, now, 0, true);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
-       }
+                               ret = replmd_add_backlink(module, replmd_private,
+                                                         schema, 
+                                                         msg_dn,
+                                                         &old_p->guid, false,
+                                                         schema_attr,
+                                                         parent);
+                               if (ret != LDB_SUCCESS) {
+                                       talloc_free(tmp_ctx);
+                                       return ret;
+                               }
+                       }
+                       new_values[i] = *old_p->v;
+                       old_i++;
+               } else if (cmp == 0) {
+                       /*
+                        * We are overwriting one. If it was previously
+                        * deleted, we need to add a backlink.
+                        *
+                        * Note that if any RMD_FLAGs in an extended new DN
+                        * will be ignored.
+                        */
+                       uint32_t rmd_flags;
 
-       /* for each new value, either update its meta-data, or add it
-        * to old_el
-       */
-       for (i=0; i<el->num_values; i++) {
-               struct parsed_dn *p = &dns[i];
-               struct parsed_dn *old_p = NULL, *next = NULL;
-
-               if (old_dns) {
-                       ret = parsed_dn_find(ldb, old_dns, old_num_values,
-                                            &p->guid,
-                                            NULL,
-                                            &old_p, &next,
-                                            schema_attr->syntax->ldap_oid);
+                       ret = replmd_update_la_val(new_values, old_p->v,
+                                                  new_p->dsdb_dn,
+                                                  old_p->dsdb_dn,
+                                                  invocation_id,
+                                                  seq_num, seq_num,
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-               }
 
-               if (old_p != NULL) {
-                       /* update in place */
-                       ret = replmd_update_la_val(old_el->values, old_p->v, p->dsdb_dn,
-                                                  old_p->dsdb_dn, invocation_id,
-                                                  seq_num, seq_num, now, 0, false);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
+                       rmd_flags = dsdb_dn_rmd_flags(old_p->dsdb_dn->dn);
+                       if ((rmd_flags & DSDB_RMD_FLAG_DELETED) != 0) {
+                               ret = replmd_add_backlink(module, replmd_private,
+                                                         schema, 
+                                                         msg_dn,
+                                                         &new_p->guid, true,
+                                                         schema_attr,
+                                                         parent);
+                               if (ret != LDB_SUCCESS) {
+                                       talloc_free(tmp_ctx);
+                                       return ret;
+                               }
                        }
+
+                       new_values[i] = *old_p->v;
+                       old_i++;
+                       new_i++;
                } else {
-                       /* add a new one */
-                       new_values = talloc_realloc(tmp_ctx, new_values, struct ldb_val,
-                                                   num_new_values+1);
-                       if (new_values == NULL) {
-                               ldb_module_oom(module);
+                       /*
+                        * Replacements that don't match an existing one. We
+                        * just add them to the final list.
+                        */
+                       ret = replmd_build_la_val(new_values,
+                                                 new_p->v,
+                                                 new_p->dsdb_dn,
+                                                 invocation_id,
+                                                 seq_num, now);
+                       if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
-                               return LDB_ERR_OPERATIONS_ERROR;
+                               return ret;
                        }
-                       ret = replmd_build_la_val(new_values, &new_values[num_new_values], dns[i].dsdb_dn,
-                                                 invocation_id, seq_num, seq_num, now, 0, false);
+                       ret = replmd_add_backlink(module, replmd_private,
+                                                 schema,
+                                                 msg_dn,
+                                                 &new_p->guid, true,
+                                                 schema_attr,
+                                                 parent);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       num_new_values++;
-               }
-
-               ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_guid, &dns[i].guid,
-                                         true, schema_attr, false);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
+                       new_values[i] = *new_p->v;
+                       new_i++;
                }
        }
-
-       /* add the new values to the end of old_el */
-       if (num_new_values != 0) {
-               el->values = talloc_realloc(msg->elements, old_el?old_el->values:NULL,
-                                           struct ldb_val, old_num_values+num_new_values);
-               if (el->values == NULL) {
-                       ldb_module_oom(module);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               memcpy(&el->values[old_num_values], &new_values[0],
-                      sizeof(struct ldb_val)*num_new_values);
-               el->num_values = old_num_values + num_new_values;
-               talloc_steal(msg->elements, new_values);
-       } else {
-               el->values = old_el->values;
-               el->num_values = old_el->num_values;
-               talloc_steal(msg->elements, el->values);
+       if (old_el != NULL) {
+               talloc_steal(msg->elements, old_el->values);
        }
-
+       el->values = talloc_steal(msg->elements, new_values);
+       el->num_values = i;
        talloc_free(tmp_ctx);
 
-       /* we now tell the backend to replace all existing values
-          with the one we have constructed */
        el->flags = LDB_FLAG_MOD_REPLACE;
 
        return LDB_SUCCESS;
@@ -2830,7 +3131,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        struct ldb_message *old_msg;
 
        const struct dsdb_schema *schema;
-       struct GUID old_guid;
 
        if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
                /*
@@ -2872,11 +3172,10 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
 
        old_msg = res->msgs[0];
 
-       old_guid = samdb_result_guid(old_msg, "objectGUID");
-
        for (i=0; i<msg->num_elements; i++) {
                struct ldb_message_element *el = &msg->elements[i];
                struct ldb_message_element *old_el, *new_el;
+               unsigned int mod_type = LDB_FLAG_MOD_TYPE(el->flags);
                const struct dsdb_attribute *schema_attr
                        = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
                if (!schema_attr) {
@@ -2889,33 +3188,49 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
-                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
-                               continue;
+                       if (parent) {
+                               struct ldb_control *ctrl;
+
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_REPLMD_VANISH_LINKS);
+                               if (ctrl != NULL) {
+                                       ctrl->critical = false;
+                                       continue;
+                               }
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_DBCHECK);
+                               if (ctrl != NULL) {
+                                       continue;
+                               }
                        }
+
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
                        return LDB_ERR_UNWILLING_TO_PERFORM;
                }
                old_el = ldb_msg_find_element(old_msg, el->name);
-               switch (el->flags & LDB_FLAG_MOD_MASK) {
+               switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
                                                       schema, msg, el, old_el,
                                                       schema_attr, seq_num, t,
-                                                      &old_guid, parent);
+                                                      old_msg->dn,
+                                                      parent);
                        break;
                case LDB_FLAG_MOD_DELETE:
                        ret = replmd_modify_la_delete(module, replmd_private,
                                                      schema, msg, el, old_el,
                                                      schema_attr, seq_num, t,
-                                                     &old_guid, parent);
+                                                     old_msg->dn,
+                                                     parent);
                        break;
                case LDB_FLAG_MOD_ADD:
                        ret = replmd_modify_la_add(module, replmd_private,
                                                   schema, msg, el, old_el,
                                                   schema_attr, seq_num, t,
-                                                  &old_guid, parent);
+                                                  old_msg->dn,
+                                                  parent);
                        break;
                default:
                        ldb_asprintf_errstring(ldb,
@@ -2927,13 +3242,16 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        ldb_asprintf_errstring(ldb,
                                               "Attribute %s is single valued but more than one value has been supplied",
                                               el->name);
-                       return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       /* Return codes as found on Windows 2012r2 */
+                       if (mod_type == LDB_FLAG_MOD_REPLACE) {
+                               return LDB_ERR_CONSTRAINT_VIOLATION;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
                } else {
                        el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
                }
 
-
-
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -2959,6 +3277,47 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
 }
 
 
+static int send_rodc_referral(struct ldb_request *req,
+                             struct ldb_context *ldb,
+                             struct ldb_dn *dn)
+{
+       char *referral = NULL;
+       struct loadparm_context *lp_ctx = NULL;
+       struct ldb_dn *fsmo_role_dn = NULL;
+       struct ldb_dn *role_owner_dn = NULL;
+       const char *domain = NULL;
+       WERROR werr;
+
+       lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
+                                struct loadparm_context);
+
+       werr = dsdb_get_fsmo_role_info(req, ldb, DREPL_PDC_MASTER,
+                                      &fsmo_role_dn, &role_owner_dn);
+
+       if (W_ERROR_IS_OK(werr)) {
+               struct ldb_dn *server_dn = ldb_dn_copy(req, role_owner_dn);
+               if (server_dn != NULL) {
+                       ldb_dn_remove_child_components(server_dn, 1);
+                       domain = samdb_dn_to_dnshostname(ldb, req,
+                                                        server_dn);
+               }
+       }
+
+       if (domain == NULL) {
+               domain = lpcfg_dnsdomain(lp_ctx);
+       }
+
+       referral = talloc_asprintf(req, "ldap://%s/%s",
+                                  domain,
+                                  ldb_dn_get_linearized(dn));
+       if (referral == NULL) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       return ldb_module_send_referral(req, referral);
+}
+
 
 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 {
@@ -2973,6 +3332,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        unsigned int functional_level;
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
+       struct ldb_control *fix_links_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -2998,6 +3358,39 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb = ldb_module_get_ctx(module);
 
+       fix_links_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_DUPLICATE_LINKS);
+       if (fix_links_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_REPLACE) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID == 0) {
+                       return ldb_module_operr(module);
+               }
+
+               fix_links_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
        guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
@@ -3031,19 +3424,10 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                                 msg, &ac->seq_num, t, is_schema_nc,
                                 &is_urgent, &rodc);
        if (rodc && (ret == LDB_ERR_REFERRAL)) {
-               struct loadparm_context *lp_ctx;
-               char *referral;
-
-               lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
-                                        struct loadparm_context);
-
-               referral = talloc_asprintf(req,
-                                          "ldap://%s/%s",
-                                          lpcfg_dnsdomain(lp_ctx),
-                                          ldb_dn_get_linearized(msg->dn));
-               ret = ldb_module_send_referral(req, referral);
+               ret = send_rodc_referral(req, ldb, msg->dn);
                talloc_free(ac);
                return ret;
+
        }
 
        if (ret != LDB_SUCCESS) {
@@ -3303,18 +3687,7 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
                                 msg, &ac->seq_num, t,
                                 is_schema_nc, &is_urgent, &rodc);
        if (rodc && (ret == LDB_ERR_REFERRAL)) {
-               struct ldb_dn *olddn = ac->req->op.rename.olddn;
-               struct loadparm_context *lp_ctx;
-               char *referral;
-
-               lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
-                                        struct loadparm_context);
-
-               referral = talloc_asprintf(req,
-                                          "ldap://%s/%s",
-                                          lpcfg_dnsdomain(lp_ctx),
-                                          ldb_dn_get_linearized(olddn));
-               ret = ldb_module_send_referral(req, referral);
+               ret = send_rodc_referral(req, ldb, ac->req->op.rename.olddn);
                talloc_free(ares);
                return ldb_module_done(req, NULL, NULL, ret);
        }
@@ -3383,7 +3756,9 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
  */
 static int replmd_delete_remove_link(struct ldb_module *module,
                                     const struct dsdb_schema *schema,
+                                    struct replmd_private *replmd_private,
                                     struct ldb_dn *dn,
+                                    struct GUID *guid,
                                     struct ldb_message_element *el,
                                     const struct dsdb_attribute *sa,
                                     struct ldb_request *parent)
@@ -3394,14 +3769,19 @@ static int replmd_delete_remove_link(struct ldb_module *module,
 
        for (i=0; i<el->num_values; i++) {
                struct dsdb_dn *dsdb_dn;
-               NTSTATUS status;
                int ret;
-               struct GUID guid2;
                struct ldb_message *msg;
                const struct dsdb_attribute *target_attr;
                struct ldb_message_element *el2;
+               const char *dn_str;
                struct ldb_val dn_val;
                uint32_t dsdb_flags = 0;
+               const char *attrs[] = { NULL, NULL };
+               struct ldb_result *link_res;
+               struct ldb_message *link_msg;
+               struct ldb_message_element *link_el;
+               struct parsed_dn *link_dns;
+               struct parsed_dn *p = NULL, *unused = NULL;
 
                if (dsdb_dn_is_deleted_val(&el->values[i])) {
                        continue;
@@ -3413,12 +3793,6 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
-               status = dsdb_get_extended_dn_guid(dsdb_dn->dn, &guid2, "GUID");
-               if (!NT_STATUS_IS_OK(status)) {
-                       talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-
                /* remove the link */
                msg = ldb_msg_new(tmp_ctx);
                if (!msg) {
@@ -3434,41 +3808,105 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                if (target_attr == NULL) {
                        continue;
                }
+               attrs[0] = target_attr->lDAPDisplayName;
 
-               ret = ldb_msg_add_empty(msg, target_attr->lDAPDisplayName, LDB_FLAG_MOD_DELETE, &el2);
+               ret = ldb_msg_add_empty(msg, target_attr->lDAPDisplayName,
+                                       LDB_FLAG_MOD_DELETE, &el2);
                if (ret != LDB_SUCCESS) {
                        ldb_module_oom(module);
                        talloc_free(tmp_ctx);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
-               dn_val = data_blob_string_const(ldb_dn_get_linearized(dn));
-               el2->values = &dn_val;
-               el2->num_values = 1;
 
-               /*
-                * Ensure that we tell the modification to vanish any linked
-                * attributes (not simply mark them as isDeleted = TRUE)
-                */
-               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+               ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
+                                           msg->dn, attrs,
+                                           DSDB_FLAG_NEXT_MODULE |
+                                           DSDB_SEARCH_SHOW_EXTENDED_DN |
+                                           DSDB_SEARCH_SHOW_RECYCLED,
+                                           parent);
 
-               ret = dsdb_module_modify(module, msg, dsdb_flags|DSDB_FLAG_OWN_MODULE, parent);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
-       }
-       talloc_free(tmp_ctx);
-       return LDB_SUCCESS;
-}
 
+               link_msg = link_res->msgs[0];
+               link_el = ldb_msg_find_element(link_msg,
+                                              target_attr->lDAPDisplayName);
+               if (link_el == NULL) {
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_NO_SUCH_ATTRIBUTE;
+               }
 
-/*
-  handle update of replication meta data for deletion of objects
+               /*
+                * This call 'upgrades' the links in link_dns, but we
+                * do not commit the result back into the database, so
+                * this is safe to call in FL2000 or on databases that
+                * have been run at that level in the past.
+                */
+               ret = get_parsed_dns_trusted(module, replmd_private, tmp_ctx,
+                                            link_el, &link_dns,
+                                            target_attr->syntax->ldap_oid, parent);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
 
-  This also handles the mapping of delete to a rename operation
-  to allow deletes to be replicated.
+               ret = parsed_dn_find(ldb, link_dns, link_el->num_values,
+                                    guid, dn,
+                                    data_blob_null, 0,
+                                    &p, &unused,
+                                    target_attr->syntax->ldap_oid, false);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
 
-  It also handles the incoming deleted objects, to ensure they are
+               if (p == NULL) {
+                       ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                              "Failed to find forward link on %s "
+                                              "as %s to remove backlink %s on %s",
+                                              ldb_dn_get_linearized(msg->dn),
+                                              target_attr->lDAPDisplayName,
+                                              sa->lDAPDisplayName,
+                                              ldb_dn_get_linearized(dn));
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_NO_SUCH_ATTRIBUTE;
+               }
+
+
+               /* This needs to get the Binary DN, by first searching */
+               dn_str = dsdb_dn_get_linearized(tmp_ctx,
+                                               p->dsdb_dn);
+
+               dn_val = data_blob_string_const(dn_str);
+               el2->values = &dn_val;
+               el2->num_values = 1;
+
+               /*
+                * Ensure that we tell the modification to vanish any linked
+                * attributes (not simply mark them as isDeleted = TRUE)
+                */
+               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
+               ret = dsdb_module_modify(module, msg, dsdb_flags|DSDB_FLAG_OWN_MODULE, parent);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+       }
+       talloc_free(tmp_ctx);
+       return LDB_SUCCESS;
+}
+
+
+/*
+  handle update of replication meta data for deletion of objects
+
+  This also handles the mapping of delete to a rename operation
+  to allow deletes to be replicated.
+
+  It also handles the incoming deleted objects, to ensure they are
   fully deleted here.  In that case re_delete is true, and we do not
   use this as a signal to change the deleted state, just reinforce it.
 
@@ -3477,7 +3915,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
 {
        int ret = LDB_ERR_OTHER;
        bool retb, disallow_move_on_delete;
-       struct ldb_dn *old_dn, *new_dn;
+       struct ldb_dn *old_dn = NULL, *new_dn = NULL;
        const char *rdn_name;
        const struct ldb_val *rdn_value, *new_rdn_value;
        struct GUID guid;
@@ -3538,8 +3976,14 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                "*",
                NULL
        };
-       unsigned int i, el_count = 0;
+       static const struct ldb_val true_val = {
+               .data = discard_const_p(uint8_t, "TRUE"),
+               .length = 4
+       };
+       
+       unsigned int i;
        uint32_t dsdb_flags = 0;
+       struct replmd_private *replmd_private;
        enum deletion_state deletion_state, next_deletion_state;
 
        if (ldb_dn_is_special(req->op.del.dn)) {
@@ -3684,31 +4128,32 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                }
        }
 
+       /* get the objects GUID from the search we just did */
+       guid = samdb_result_guid(old_msg, "objectGUID");
+
        if (deletion_state == OBJECT_NOT_DELETED) {
-               /* get the objects GUID from the search we just did */
-               guid = samdb_result_guid(old_msg, "objectGUID");
-
-               /* Add a formatted child */
-               retb = ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ADEL:%s",
-                                           rdn_name,
-                                           ldb_dn_escape_value(tmp_ctx, *rdn_value),
-                                           GUID_string(tmp_ctx, &guid));
-               if (!retb) {
-                       ldb_asprintf_errstring(ldb, __location__
-                                              ": Unable to add a formatted child to dn: %s",
-                                              ldb_dn_get_linearized(new_dn));
+               struct ldb_message_element *is_deleted_el;
+
+               ret = replmd_make_deleted_child_dn(tmp_ctx,
+                                                  ldb,
+                                                  new_dn,
+                                                  rdn_name, rdn_value,
+                                                  guid);
+
+               if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
 
-               ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
+               ret = ldb_msg_add_value(msg, "isDeleted", &true_val,
+                                       &is_deleted_el);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, __location__
                                               ": Failed to add isDeleted string to the msg");
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               is_deleted_el->flags = LDB_FLAG_MOD_REPLACE;
        } else {
                /*
                 * No matter what has happened with other renames etc, try again to
@@ -3757,6 +4202,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        if (deletion_state == OBJECT_NOT_DELETED) {
                struct ldb_dn *parent_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
                char *parent_dn_str = NULL;
+               struct ldb_message_element *p_el;
 
                /* we need the storage form of the parent GUID */
                ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
@@ -3800,7 +4246,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               p_el = ldb_msg_find_element(msg,
+                                           "lastKnownParent");
+               if (p_el == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_operr(module);
+               }
+               p_el->flags = LDB_FLAG_MOD_REPLACE;
 
                if (next_deletion_state == OBJECT_DELETED) {
                        ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
@@ -3812,7 +4264,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
+                       p_el = ldb_msg_find_element(msg,
+                                                   "msDS-LastKnownRDN");
+                       if (p_el == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_module_operr(module);
+                       }
+                       p_el->flags = LDB_FLAG_MOD_ADD;
                }
        }
 
@@ -3841,16 +4299,21 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                 * not activated and what ever the forest level is.
                 */
                if (dsdb_attribute_by_lDAPDisplayName(schema, "isRecycled") != NULL) {
-                       ret = ldb_msg_add_string(msg, "isRecycled", "TRUE");
+                       struct ldb_message_element *is_recycled_el;
+
+                       ret = ldb_msg_add_value(msg, "isRecycled", &true_val,
+                                               &is_recycled_el);
                        if (ret != LDB_SUCCESS) {
                                DEBUG(0,(__location__ ": Failed to add isRecycled string to the msg\n"));
                                ldb_module_oom(module);
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+                       is_recycled_el->flags = LDB_FLAG_MOD_REPLACE;
                }
 
+               replmd_private = talloc_get_type(ldb_module_get_private(module),
+                                                struct replmd_private);
                /* work out which of the old attributes we will be removing */
                for (i=0; i<old_msg->num_elements; i++) {
                        const struct dsdb_attribute *sa;
@@ -3864,7 +4327,8 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                /* don't remove the rDN */
                                continue;
                        }
-                       if (sa->linkID && (sa->linkID & 1)) {
+
+                       if (sa->linkID & 1) {
                                /*
                                  we have a backlink in this object
                                  that needs to be removed. We're not
@@ -3873,8 +4337,20 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                  modify to delete the corresponding
                                  forward link
                                 */
-                               ret = replmd_delete_remove_link(module, schema, old_dn, el, sa, req);
-                               if (ret != LDB_SUCCESS) {
+                               ret = replmd_delete_remove_link(module, schema,
+                                                               replmd_private,
+                                                               old_dn, &guid,
+                                                               el, sa, req);
+                               if (ret == LDB_SUCCESS) {
+                                       /*
+                                        * now we continue, which means we
+                                        * won't remove this backlink
+                                        * directly
+                                        */
+                                       continue;
+                               }
+
+                               if (ret != LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
@@ -3887,13 +4363,17 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
-                               /* now we continue, which means we
-                                  won't remove this backlink
-                                  directly
-                               */
-                               continue;
-                       }
-                       if (!sa->linkID) {
+
+                               /*
+                                * Otherwise vanish the link, we are
+                                * out of sync and the controlling
+                                * object does not have the source
+                                * link any more
+                                */
+
+                               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
+                       } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
                                }
@@ -4128,14 +4608,104 @@ static bool replmd_replPropertyMetaData1_new_should_be_taken(uint32_t dsdb_repl_
 }
 
 
+/*
+  form a DN for a deleted (DEL:) or conflict (CNF:) DN
+ */
+static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
+                                      struct ldb_context *ldb,
+                                      struct ldb_dn *dn,
+                                      const char *four_char_prefix,
+                                      const char *rdn_name,
+                                      const struct ldb_val *rdn_value,
+                                      struct GUID guid)
+{
+       struct ldb_val deleted_child_rdn_val;
+       struct GUID_txt_buf guid_str;
+       bool retb;
+
+       GUID_buf_string(&guid, &guid_str);
+
+       retb = ldb_dn_add_child_fmt(dn, "X=Y");
+       if (!retb) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * TODO: Per MS-ADTS 3.1.1.5.5 Delete Operation
+        * we should truncate this value to ensure the RDN is not more than 255 chars.
+        *
+        * However we MS-ADTS 3.1.1.5.1.2 Naming Constraints indicates that:
+        *
+        * "Naming constraints are not enforced for replicated
+        * updates." so this is safe and we don't have to work out not
+        * splitting a UTF8 char right now.
+        */
+       deleted_child_rdn_val = ldb_val_dup(tmp_ctx, rdn_value);
+
+       /*
+        * sizeof(guid_str.buf) will always be longer than
+        * strlen(guid_str.buf) but we allocate using this and
+        * waste the trailing bytes to avoid scaring folks
+        * with memcpy() using strlen() below
+        */
+
+       deleted_child_rdn_val.data
+               = talloc_realloc(tmp_ctx, deleted_child_rdn_val.data,
+                                uint8_t,
+                                rdn_value->length + 5
+                                + sizeof(guid_str.buf));
+       if (!deleted_child_rdn_val.data) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.length =
+               rdn_value->length + 5
+               + strlen(guid_str.buf);
+
+       SMB_ASSERT(deleted_child_rdn_val.length <
+                  talloc_get_size(deleted_child_rdn_val.data));
+
+       /*
+        * talloc won't allocate more than 256MB so we can't
+        * overflow but just to be sure
+        */
+       if (deleted_child_rdn_val.length < rdn_value->length) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.data[rdn_value->length] = 0x0a;
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 1],
+              four_char_prefix, 4);
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 5],
+              guid_str.buf,
+              sizeof(guid_str.buf));
+
+       /* Now set the value into the RDN, without parsing it */
+       ldb_dn_set_component(dn, 0, rdn_name,
+                            deleted_child_rdn_val);
+
+       return LDB_SUCCESS;
+}
+
+
 /*
   form a conflict DN
  */
-static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn, struct GUID *guid)
+static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx,
+                                        struct ldb_context *ldb,
+                                        struct ldb_dn *dn,
+                                        struct GUID *guid)
 {
        const struct ldb_val *rdn_val;
        const char *rdn_name;
        struct ldb_dn *new_dn;
+       int ret;
 
        rdn_val = ldb_dn_get_rdn_val(dn);
        rdn_name = ldb_dn_get_rdn_name(dn);
@@ -4143,32 +4713,48 @@ static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn,
                return NULL;
        }
 
-       new_dn = ldb_dn_copy(mem_ctx, dn);
+       new_dn = ldb_dn_get_parent(mem_ctx, dn);
        if (!new_dn) {
                return NULL;
        }
 
-       if (!ldb_dn_remove_child_components(new_dn, 1)) {
-               return NULL;
-       }
-
-       if (!ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ACNF:%s",
-                                 rdn_name,
-                                 ldb_dn_escape_value(new_dn, *rdn_val),
-                                 GUID_string(new_dn, guid))) {
+       ret = replmd_make_prefix_child_dn(mem_ctx,
+                                         ldb, new_dn,
+                                         "CNF:",
+                                         rdn_name,
+                                         rdn_val,
+                                         *guid);
+       if (ret != LDB_SUCCESS) {
                return NULL;
        }
-
        return new_dn;
 }
 
+/*
+  form a deleted DN
+ */
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid)
+{
+       return replmd_make_prefix_child_dn(tmp_ctx,
+                                          ldb, dn,
+                                          "DEL:",
+                                          rdn_name,
+                                          rdn_value,
+                                          guid);
+}
+
 
 /*
   perform a modify operation which sets the rDN and name attributes to
   their current values. This has the effect of changing these
   attributes to have been last updated by the current DC. This is
   needed to ensure that renames performed as part of conflict
-  resolution are propogated to other DCs
+  resolution are propagated to other DCs
  */
 static int replmd_name_modify(struct replmd_replicated_request *ar,
                              struct ldb_request *req, struct ldb_dn *dn)
@@ -4215,9 +4801,16 @@ static int replmd_name_modify(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       ret = dsdb_module_modify(ar->module, msg, DSDB_FLAG_OWN_MODULE, req);
+       /*
+        * We have to mark this as a replicated update otherwise
+        * schema_data may reject a rename in the schema partition
+        */
+
+       ret = dsdb_module_modify(ar->module, msg,
+                                DSDB_FLAG_OWN_MODULE|DSDB_FLAG_REPLICATED_UPDATE,
+                                req);
        if (ret != LDB_SUCCESS) {
-               DEBUG(0,(__location__ ": Failed to modify rDN/name of conflict DN '%s' - %s",
+               DEBUG(0,(__location__ ": Failed to modify rDN/name of DN being DRS renamed '%s' - %s",
                         ldb_dn_get_linearized(dn),
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                return ret;
@@ -4229,7 +4822,7 @@ static int replmd_name_modify(struct replmd_replicated_request *ar,
 
 failed:
        talloc_free(msg);
-       DEBUG(0,(__location__ ": Failed to setup modify rDN/name of conflict DN '%s'",
+       DEBUG(0,(__location__ ": Failed to setup modify rDN/name of DN being DRS renamed '%s'",
                 ldb_dn_get_linearized(dn)));
        return LDB_ERR_OPERATIONS_ERROR;
 }
@@ -4366,6 +4959,7 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                       "Conflict adding object '%s' from incoming replication as we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -4437,7 +5031,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                 ldb_dn_get_linearized(conflict_dn)));
                        goto failed;
                }
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4462,7 +5058,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                        goto failed;
                }
 
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4538,6 +5136,9 @@ failed:
         * replication will stop with an error, but there is not much
         * else we can do.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
        return ldb_module_done(ar->req, NULL, NULL,
                               ret);
 }
@@ -4637,16 +5238,22 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(4)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
-               DEBUG(4, ("DRS replication add message of %s:\n%s\n",
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_ADD,
+                                                          msg);
+               DEBUG(8, ("DRS replication add message of %s:\n%s\n",
                          GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
                          s));
                talloc_free(s);
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+               DEBUG(4, ("DRS replication add DN of %s is %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
        }
-
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
                                                     "isDeleted", false);
 
@@ -4658,7 +5265,8 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
 
        rdn_val = ldb_dn_get_rdn_val(msg->dn);
        ret = replmd_update_rpmd_rdn_attr(ldb, msg, rdn_val, NULL,
-                                    md, ar, now, is_schema_nc);
+                                         md, ar, now, is_schema_nc,
+                                         false);
        if (ret != LDB_SUCCESS) {
                ldb_asprintf_errstring(ldb, "%s: error during DRS repl ADD: %s", __func__, ldb_errstring(ldb));
                return replmd_replicated_request_error(ar, ret);
@@ -4754,7 +5362,7 @@ static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request
        {
                struct ldb_message *parent_msg = ares->message;
                struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
-               struct ldb_dn *parent_dn;
+               struct ldb_dn *parent_dn = NULL;
                int comp_num;
 
                if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
@@ -4993,6 +5601,7 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                                       "Conflict adding object '%s' from incoming replication but we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -5057,7 +5666,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
 
        if (rename_incoming_record) {
 
-               new_dn = replmd_conflict_dn(msg, msg->dn,
+               new_dn = replmd_conflict_dn(msg,
+                                           ldb_module_get_ctx(ar->module),
+                                           msg->dn,
                                            &ar->objs->objects[ar->index_current].object_guid);
                if (new_dn == NULL) {
                        ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
@@ -5093,7 +5704,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       new_dn = replmd_conflict_dn(tmp_ctx, conflict_dn, &guid);
+       new_dn = replmd_conflict_dn(tmp_ctx,
+                                   ldb_module_get_ctx(ar->module),
+                                   conflict_dn, &guid);
        if (new_dn == NULL) {
                DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
@@ -5136,8 +5749,10 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                        goto failed;
        }
-failed:
 
+       talloc_free(tmp_ctx);
+       return ret;
+failed:
        /*
         * On failure make the caller get the error
         * This means replication will stop with an error,
@@ -5145,6 +5760,9 @@ failed:
         * LDB_ERR_ENTRY_ALREADY_EXISTS case this is exactly what is
         * needed.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
 
        talloc_free(tmp_ctx);
        return ret;
@@ -5208,11 +5826,12 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(5)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(5, ("Initial DRS replication modify message of %s is:\n%s\n"
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY, msg);
+               DEBUG(8, ("Initial DRS replication modify message of %s is:\n%s\n"
                          "%s\n"
                          "%s\n",
                          GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
@@ -5226,8 +5845,15 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                                  "incoming replPropertyMetaData",
                                                  rmd)));
                talloc_free(s);
-       }
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
 
+               DEBUG(4, ("Initial DRS replication modify DN of %s is: %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
+       }
+               
        local_isDeleted = ldb_msg_find_attr_as_bool(ar->search_msg,
                                                    "isDeleted", false);
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
@@ -5388,7 +6014,8 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
 
        if (renamed) {
                ret = replmd_update_rpmd_rdn_attr(ldb, msg, new_rdn, old_rdn,
-                                                 &nmd, ar, now, is_schema_nc);
+                                                 &nmd, ar, now, is_schema_nc,
+                                                 false);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, "%s: error during DRS repl merge: %s", __func__, ldb_errstring(ldb));
                        return replmd_replicated_request_error(ar, ret);
@@ -5483,14 +6110,24 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(4)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(4, ("Final DRS replication modify message of %s:\n%s\n",
-                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg);
+               DEBUG(8, ("Final DRS replication modify message of %s:\n%s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
                          s));
                talloc_free(s);
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+
+               DEBUG(4, ("Final DRS replication modify DN of %s is %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
        }
 
        ret = ldb_build_mod_req(&change_req,
@@ -5698,6 +6335,62 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
        return LDB_SUCCESS;
 }
 
+/**
+ * Stores the linked attributes received in the replication chunk - these get
+ * applied at the end of the transaction. We also check that each linked
+ * attribute is valid, i.e. source and target objects are known.
+ */
+static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
+{
+       int ret = LDB_SUCCESS;
+       uint32_t i;
+       struct ldb_module *module = ar->module;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct ldb_context *ldb;
+
+       ldb = ldb_module_get_ctx(module);
+
+       DEBUG(4,("linked_attributes_count=%u\n", ar->objs->linked_attributes_count));
+
+       /* save away the linked attributes for the end of the transaction */
+       for (i = 0; i < ar->objs->linked_attributes_count; i++) {
+               struct la_entry *la_entry;
+
+               if (replmd_private->la_ctx == NULL) {
+                       replmd_private->la_ctx = talloc_new(replmd_private);
+               }
+               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
+               if (la_entry == NULL) {
+                       ldb_oom(ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
+               if (la_entry->la == NULL) {
+                       talloc_free(la_entry);
+                       ldb_oom(ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               *la_entry->la = ar->objs->linked_attributes[i];
+               la_entry->dsdb_repl_flags = ar->objs->dsdb_repl_flags;
+
+               /* we need to steal the non-scalars so they stay
+                  around until the end of the transaction */
+               talloc_steal(la_entry->la, la_entry->la->identifier);
+               talloc_steal(la_entry->la, la_entry->la->value.blob);
+
+               ret = replmd_verify_linked_attribute(ar, la_entry);
+
+               if (ret != LDB_SUCCESS) {
+                       break;
+               }
+
+               DLIST_ADD(replmd_private->la_list, la_entry);
+       }
+
+       return ret;
+}
+
 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
 
 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
@@ -5714,7 +6407,18 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        struct GUID_txt_buf guid_str_buf;
 
        if (ar->index_current >= ar->objs->num_objects) {
-               /* done with it, go to next stage */
+
+               /*
+                * Now that we've applied all the objects, check the new linked
+                * attributes and store them (we apply them in .prepare_commit)
+                */
+               ret = replmd_store_linked_attributes(ar);
+
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+
+               /* done applying objects, move on to the next stage */
                return replmd_replicated_uptodate_vector(ar);
        }
 
@@ -6086,7 +6790,9 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
        nrf_el->flags = LDB_FLAG_MOD_REPLACE;
 
        if (CHECK_DEBUGLVL(4)) {
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg);
                DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
                talloc_free(s);
        }
@@ -6192,9 +6898,6 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        struct replmd_replicated_request *ar;
        struct ldb_control **ctrls;
        int ret;
-       uint32_t i;
-       struct replmd_private *replmd_private =
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
        ldb = ldb_module_get_ctx(module);
 
@@ -6251,73 +6954,254 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        ar->controls = req->controls;
        req->controls = ctrls;
 
-       DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
+       return replmd_replicated_apply_next(ar);
+}
 
-       /* save away the linked attributes for the end of the
-          transaction */
-       for (i=0; i<ar->objs->linked_attributes_count; i++) {
-               struct la_entry *la_entry;
+/**
+ * Checks how to handle an missing target - either we need to fail the
+ * replication and retry with GET_TGT, ignore the link and continue, or try to
+ * add a partial link to an unknown target.
+ */
+static int replmd_allow_missing_target(struct ldb_module *module,
+                                      TALLOC_CTX *mem_ctx,
+                                      struct ldb_dn *target_dn,
+                                      struct ldb_dn *source_dn,
+                                      bool is_obj_commit,
+                                      struct GUID *guid,
+                                      uint32_t dsdb_repl_flags,
+                                      bool *ignore_link,
+                                      const char * missing_str)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       bool is_in_same_nc;
 
-               if (replmd_private->la_ctx == NULL) {
-                       replmd_private->la_ctx = talloc_new(replmd_private);
-               }
-               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
-               if (la_entry == NULL) {
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
-               if (la_entry->la == NULL) {
-                       talloc_free(la_entry);
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               *la_entry->la = ar->objs->linked_attributes[i];
+       /*
+        * we may not be able to resolve link targets properly when
+        * dealing with subsets of objects, e.g. the source is a
+        * critical object and the target isn't
+        *
+        * TODO:
+        * When we implement Trusted Domains we need to consider
+        * whether they get treated as an incomplete replica here or not
+        */
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_OBJECT_SUBSET) {
 
-               /* we need to steal the non-scalars so they stay
-                  around until the end of the transaction */
-               talloc_steal(la_entry->la, la_entry->la->identifier);
-               talloc_steal(la_entry->la, la_entry->la->value.blob);
+               /*
+                * Ignore the link. We don't increase the highwater-mark in
+                * the object subset cases, so subsequent replications should
+                * resolve any missing links
+                */
+               DEBUG(2, ("%s target %s linked from %s\n", missing_str,
+                         ldb_dn_get_linearized(target_dn),
+                         ldb_dn_get_linearized(source_dn)));
+               *ignore_link = true;
+               return LDB_SUCCESS;
+       }
 
-               DLIST_ADD(replmd_private->la_list, la_entry);
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_TARGETS_UPTODATE) {
+
+               /*
+                * target should already be up-to-date so there's no point in
+                * retrying. This could be due to bad timing, or if a target
+                * on a one-way link was deleted. We ignore the link rather
+                * than failing the replication cycle completely
+                */
+               *ignore_link = true;
+               DBG_WARNING("%s is %s but up to date. Ignoring link from %s\n",
+                           ldb_dn_get_linearized(target_dn), missing_str,
+                           ldb_dn_get_linearized(source_dn));
+               return LDB_SUCCESS;
+       }
+       
+       is_in_same_nc = dsdb_objects_have_same_nc(ldb,
+                                                 mem_ctx,
+                                                 source_dn,
+                                                 target_dn);
+       if (is_in_same_nc) {
+               /* fail the replication and retry with GET_TGT */
+               ldb_asprintf_errstring(ldb, "%s target %s GUID %s linked from %s\n",
+                                      missing_str,
+                                      ldb_dn_get_linearized(target_dn),
+                                      GUID_string(mem_ctx, guid),
+                                      ldb_dn_get_linearized(source_dn));
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       return replmd_replicated_apply_next(ar);
+       /*
+        * The target of the cross-partition link is missing. Continue
+        * and try to at least add the forward-link. This isn't great,
+        * but a partial link can be fixed by dbcheck, so it's better
+        * than dropping the link completely.
+        */
+       *ignore_link = false;
+
+       if (is_obj_commit) {
+
+               /*
+                * Only log this when we're actually committing the objects.
+                * This avoids spurious logs, i.e. if we're just verifying the
+                * received link during a join.
+                */
+               DBG_WARNING("%s cross-partition target %s linked from %s\n",
+                           missing_str, ldb_dn_get_linearized(target_dn),
+                           ldb_dn_get_linearized(source_dn));
+       }
+       
+       return LDB_SUCCESS;
 }
 
-/*
-  process one linked attribute structure
+/**
+ * Checks that the target object for a linked attribute exists.
+ * @param guid returns the target object's GUID (is returned)if it exists)
+ * @param ignore_link set to true if the linked attribute should be ignored
+ * (i.e. the target doesn't exist, but that it's OK to skip the link)
  */
-static int replmd_process_linked_attribute(struct ldb_module *module,
-                                          struct replmd_private *replmd_private,
-                                          struct la_entry *la_entry,
-                                          struct ldb_request *parent)
+static int replmd_check_target_exists(struct ldb_module *module,
+                                     struct dsdb_dn *dsdb_dn,
+                                     struct la_entry *la_entry,
+                                     struct ldb_dn *source_dn,
+                                     bool is_obj_commit,
+                                     struct GUID *guid,
+                                     bool *ignore_link)
 {
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
-       struct ldb_message *msg;
-       struct ldb_message *target_msg = NULL;
+       struct ldb_result *target_res;
        TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
-       const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
+       const char *attrs[] = { "isDeleted", "isRecycled", NULL };
+       NTSTATUS ntstatus;
        int ret;
-       const struct dsdb_attribute *attr;
-       struct dsdb_dn *dsdb_dn;
-       uint64_t seq_num = 0;
-       struct ldb_message_element *old_el;
-       WERROR status;
-       time_t t = time(NULL);
-       struct ldb_result *res;
-       struct ldb_result *target_res;
-       const char *attrs[4];
-       const char *attrs2[] = { "isDeleted", "isRecycled", NULL };
-       struct parsed_dn *pdn_list, *pdn, *next;
-       struct GUID guid = GUID_zero();
-       NTSTATUS ntstatus;
-       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
-       const struct GUID *our_invocation_id;
+       enum deletion_state target_deletion_state = OBJECT_REMOVED;
+       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) ? true : false;
 
-       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
-       enum deletion_state target_deletion_state = OBJECT_NOT_DELETED;
+       *ignore_link = false;
+       ntstatus = dsdb_get_extended_dn_guid(dsdb_dn->dn, guid, "GUID");
+
+       if (!NT_STATUS_IS_OK(ntstatus) && !active) {
+
+               /*
+                * This strange behaviour (allowing a NULL/missing
+                * GUID) originally comes from:
+                *
+                * commit e3054ce0fe0f8f62d2f5b2a77893e7a1479128bd
+                * Author: Andrew Tridgell <tridge@samba.org>
+                * Date:   Mon Dec 21 21:21:55 2009 +1100
+                *
+                *  s4-drs: cope better with NULL GUIDS from DRS
+                *
+                *  It is valid to get a NULL GUID over DRS for a deleted forward link. We
+                *  need to match by DN if possible when seeing if we should update an
+                *  existing link.
+                *
+                *  Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
+                */
+               ret = dsdb_module_search_dn(module, tmp_ctx, &target_res,
+                                           dsdb_dn->dn, attrs,
+                                           DSDB_FLAG_NEXT_MODULE |
+                                           DSDB_SEARCH_SHOW_RECYCLED |
+                                           DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                           NULL);
+       } else if (!NT_STATUS_IS_OK(ntstatus)) {
+               ldb_asprintf_errstring(ldb, "Failed to find GUID in linked attribute 0x%x blob for %s from %s",
+                                      la->attid,
+                                      ldb_dn_get_linearized(dsdb_dn->dn),
+                                      ldb_dn_get_linearized(source_dn));
+               talloc_free(tmp_ctx);
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               ret = dsdb_module_search(module, tmp_ctx, &target_res,
+                                        NULL, LDB_SCOPE_SUBTREE,
+                                        attrs,
+                                        DSDB_FLAG_NEXT_MODULE |
+                                        DSDB_SEARCH_SHOW_RECYCLED |
+                                        DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                        DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                        NULL,
+                                        "objectGUID=%s",
+                                        GUID_string(tmp_ctx, guid));
+       }
+
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "Failed to re-resolve GUID %s: %s\n",
+                                      GUID_string(tmp_ctx, guid),
+                                      ldb_errstring(ldb));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (target_res->count == 0) {
+
+               /*
+                * target object is unknown. Check whether to ignore the link,
+                * fail the replication, or add a partial link
+                */
+               ret = replmd_allow_missing_target(module, tmp_ctx, dsdb_dn->dn,
+                                                 source_dn, is_obj_commit, guid,
+                                                 la_entry->dsdb_repl_flags,
+                                                 ignore_link, "Unknown");
+
+       } else if (target_res->count != 1) {
+               ldb_asprintf_errstring(ldb, "More than one object found matching objectGUID %s\n",
+                                      GUID_string(tmp_ctx, guid));
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               struct ldb_message *target_msg = target_res->msgs[0];
+
+               dsdb_dn->dn = talloc_steal(dsdb_dn, target_msg->dn);
+
+               /* Get the object's state (i.e. Not Deleted, Tombstone, etc) */
+               replmd_deletion_state(module, target_msg,
+                                     &target_deletion_state, NULL);
+
+               /*
+                * Check for deleted objects as per MS-DRSR 4.1.10.6.14
+                * ProcessLinkValue(). Link updates should not be sent for
+                * recycled and tombstone objects (deleting the links should
+                * happen when we delete the object). This probably means our
+                * copy of the target object isn't up to date.
+                */
+               if (target_deletion_state >= OBJECT_RECYCLED) {
+
+                       /*
+                        * target object is deleted. Check whether to ignore the
+                        * link, fail the replication, or add a partial link
+                        */
+                       ret = replmd_allow_missing_target(module, tmp_ctx,
+                                                         dsdb_dn->dn, source_dn,
+                                                         is_obj_commit, guid,
+                                                         la_entry->dsdb_repl_flags,
+                                                         ignore_link, "Deleted");
+               }
+       }
+
+       talloc_free(tmp_ctx);
+       return ret;
+}
+
+/**
+ * Extracts the key details about the source/target object for a
+ * linked-attribute entry.
+ * This returns the following details:
+ * @param ret_attr the schema details for the linked attribute
+ * @param source_msg the search result for the source object
+ * @param target_dsdb_dn the unpacked DN info for the target object
+ */
+static int replmd_extract_la_entry_details(struct ldb_module *module,
+                                          struct la_entry *la_entry,
+                                          TALLOC_CTX *mem_ctx,
+                                          const struct dsdb_attribute **ret_attr,
+                                          struct ldb_message **source_msg,
+                                          struct dsdb_dn **target_dsdb_dn)
+{
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
+       int ret;
+       const struct dsdb_attribute *attr;
+       WERROR status;
+       struct ldb_result *res;
+       const char *attrs[4];
 
 /*
 linked_attributes[0]:
@@ -6362,263 +7246,556 @@ linked_attributes[0]:
                                       la->attid,
                                       GUID_buf_string(&la->identifier->guid,
                                                       &guid_str));
-               talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       /*
+        * All attributes listed here must be dealt with in some way
+        * by replmd_process_linked_attribute() otherwise in the case
+        * of isDeleted: FALSE the modify will fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
        attrs[0] = attr->lDAPDisplayName;
        attrs[1] = "isDeleted";
        attrs[2] = "isRecycled";
        attrs[3] = NULL;
 
-       /* get the existing message from the db for the object with
-          this GUID, returning attribute being modified. We will then
-          use this msg as the basis for a modify call */
-       ret = dsdb_module_search(module, tmp_ctx, &res, NULL, LDB_SCOPE_SUBTREE, attrs,
+       /*
+        * get the existing message from the db for the object with
+        * this GUID, returning attribute being modified. We will then
+        * use this msg as the basis for a modify call
+        */
+       ret = dsdb_module_search(module, mem_ctx, &res, NULL, LDB_SCOPE_SUBTREE, attrs,
                                 DSDB_FLAG_NEXT_MODULE |
                                 DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
                                 DSDB_SEARCH_SHOW_RECYCLED |
                                 DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
                                 DSDB_SEARCH_REVEAL_INTERNALS,
-                                parent,
-                                "objectGUID=%s", GUID_string(tmp_ctx, &la->identifier->guid));
+                                NULL,
+                                "objectGUID=%s", GUID_string(mem_ctx, &la->identifier->guid));
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
        if (res->count != 1) {
                ldb_asprintf_errstring(ldb, "DRS linked attribute for GUID %s - DN not found",
-                                      GUID_string(tmp_ctx, &la->identifier->guid));
-               talloc_free(tmp_ctx);
+                                      GUID_string(mem_ctx, &la->identifier->guid));
                return LDB_ERR_NO_SUCH_OBJECT;
        }
-       msg = res->msgs[0];
+
+       *source_msg = res->msgs[0];
+
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx, la->value.blob, target_dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(res->msgs[0]->dn),
+                                      win_errstr(status));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       *ret_attr = attr;
+
+       return LDB_SUCCESS;
+}
+
+/**
+ * Verifies the source and target objects are known for a linked attribute
+ */
+static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
+                                         struct la_entry *la)
+{
+       int ret = LDB_SUCCESS;
+       TALLOC_CTX *tmp_ctx = talloc_new(la);
+       struct ldb_module *module = ar->module;
+       struct ldb_message *src_msg;
+       const struct dsdb_attribute *attr;
+       struct dsdb_dn *tgt_dsdb_dn;
+       struct GUID guid = GUID_zero();
+       bool dummy;
+
+       ret = replmd_extract_la_entry_details(module, la, tmp_ctx, &attr,
+                                             &src_msg, &tgt_dsdb_dn);
 
        /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.13
-        * ProcessLinkValue, because link updates are not applied to
-        * recycled and tombstone objects.  We don't have to delete
-        * any existing link, that should have happened when the
-        * object deletion was replicated or initiated.
+        * When we fail to find the source object, the error code we pass
+        * back here is really important. It flags back to the callers to
+        * retry this request with DRSUAPI_DRS_GET_ANC. This case should
+        * never happen if we're replicating from a Samba DC, but it is
+        * needed to talk to a Windows DC
         */
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+               ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_MISSING_PARENT);
+       }
 
-       replmd_deletion_state(module, msg, &deletion_state, NULL);
-
-       if (deletion_state >= OBJECT_RECYCLED) {
+       if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       /*
+        * We can skip the target object checks if we're only syncing critical
+        * objects, or we know the target is up-to-date. If either case, we
+        * still continue even if the target doesn't exist
+        */
+       if ((la->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
+                                   DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
+
+               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
+                                                src_msg->dn, false, &guid,
+                                                &dummy);
+       }
+
+       /*
+        * When we fail to find the target object, the error code we pass
+        * back here is really important. It flags back to the callers to
+        * retry this request with DRSUAPI_DRS_GET_TGT
+        */
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+               ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_RECYCLED_TARGET);
+       }
+
+       talloc_free(tmp_ctx);
+       return ret;
+}
+
+/**
+ * Finds the current active Parsed-DN value for a single-valued linked
+ * attribute, if one exists.
+ * @param ret_pdn assigned the active Parsed-DN, or NULL if none was found
+ * @returns LDB_SUCCESS (regardless of whether a match was found), unless
+ * an error occurred
+ */
+static int replmd_get_active_singleval_link(struct ldb_module *module,
+                                           TALLOC_CTX *mem_ctx,
+                                           struct parsed_dn pdn_list[],
+                                           unsigned int count,
+                                           const struct dsdb_attribute *attr,
+                                           struct parsed_dn **ret_pdn)
+{
+       unsigned int i;
+
+       *ret_pdn = NULL;
+
+       if (!(attr->ldb_schema_attribute->flags & LDB_ATTR_FLAG_SINGLE_VALUE)) {
+
+               /* nothing to do for multi-valued linked attributes */
                return LDB_SUCCESS;
        }
 
-       old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
-       if (old_el == NULL) {
-               ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
-               if (ret != LDB_SUCCESS) {
-                       ldb_module_oom(module);
-                       talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
+       for (i = 0; i < count; i++) {
+               int ret = LDB_SUCCESS;
+               struct parsed_dn *pdn = &pdn_list[i];
+
+               /* skip any inactive links */
+               if (dsdb_dn_is_deleted_val(pdn->v)) {
+                       continue;
+               }
+
+               /* we've found an active value for this attribute */
+               *ret_pdn = pdn;
+
+               if (pdn->dsdb_dn == NULL) {
+                       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+                       ret = really_parse_trusted_dn(mem_ctx, ldb, pdn,
+                                                     attr->syntax->ldap_oid);
                }
-       } else {
-               old_el->flags = LDB_FLAG_MOD_REPLACE;
-       }
 
-       /* parse the existing links */
-       ret = get_parsed_dns(module, tmp_ctx, old_el, &pdn_list, attr->syntax->ldap_oid, parent);
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
-       /* get our invocationId */
-       our_invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!our_invocation_id) {
-               ldb_debug_set(ldb, LDB_DEBUG_ERROR, __location__ ": unable to find invocationId\n");
-               talloc_free(tmp_ctx);
+       /* no active link found */
+       return LDB_SUCCESS;
+}
+
+/**
+ * @returns true if the replication linked attribute info is newer than we
+ * already have in our DB
+ * @param pdn the existing linked attribute info in our DB
+ * @param la the new linked attribute info received during replication
+ */
+static bool replmd_link_update_is_newer(struct parsed_dn *pdn,
+                                       struct drsuapi_DsReplicaLinkedAttribute *la)
+{
+       /* see if this update is newer than what we have already */
+       struct GUID invocation_id = GUID_zero();
+       uint32_t version = 0;
+       NTTIME change_time = 0;
+
+       if (pdn == NULL) {
+
+               /* no existing info so update is newer */
+               return true;
+       }
+
+       dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
+       dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
+       dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
+
+       return replmd_update_is_newer(&invocation_id,
+                                     &la->meta_data.originating_invocation_id,
+                                     version,
+                                     la->meta_data.version,
+                                     change_time,
+                                     la->meta_data.originating_change_time);
+}
+
+/**
+ * Marks an existing linked attribute value as deleted in the DB
+ * @param pdn the parsed-DN of the target-value to delete
+ */
+static int replmd_delete_link_value(struct ldb_module *module,
+                                   struct replmd_private *replmd_private,
+                                   TALLOC_CTX *mem_ctx,
+                                   struct ldb_dn *src_obj_dn,
+                                   const struct dsdb_schema *schema,
+                                   const struct dsdb_attribute *attr,
+                                   uint64_t seq_num,
+                                   bool is_active,
+                                   struct GUID *target_guid,
+                                   struct dsdb_dn *target_dsdb_dn,
+                                   struct ldb_val *output_val)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       time_t t;
+       NTTIME now;
+       const struct GUID *invocation_id = NULL;
+       int ret;
+
+       t = time(NULL);
+       unix_to_nt_time(&now, t);
+
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (invocation_id == NULL) {
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = replmd_check_upgrade_links(ldb, pdn_list, old_el->num_values,
-                                        old_el, our_invocation_id,
-                                        attr->syntax->ldap_oid);
+       /* if the existing link is active, remove its backlink */
+       if (is_active) {
+
+               ret = replmd_add_backlink(module, replmd_private, schema,
+                                         src_obj_dn, target_guid, false,
+                                         attr, NULL);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       /* mark the existing value as deleted */
+       ret = replmd_update_la_val(mem_ctx, output_val, target_dsdb_dn,
+                                  target_dsdb_dn, invocation_id, seq_num,
+                                  seq_num, now, true);
+       return ret;
+}
+
+/**
+ * Checks for a conflict in single-valued link attributes, and tries to
+ * resolve the problem if possible.
+ *
+ * Single-valued links should only ever have one active value. If we already
+ * have an active link value, and during replication we receive an active link
+ * value for a different target DN, then we need to resolve this inconsistency
+ * and determine which value should be active. If the received info is better/
+ * newer than the existing link attribute, then we need to set our existing
+ * link as deleted. If the received info is worse/older, then we should continue
+ * to add it, but set it as an inactive link.
+ *
+ * Note that this is a corner-case that is unlikely to happen (but if it does
+ * happen, we don't want it to break replication completely).
+ *
+ * @param pdn_being_modified the parsed DN corresponding to the received link
+ * target (note this is NULL if the link does not already exist in our DB)
+ * @param pdn_list all the source object's Parsed-DNs for this attribute, i.e.
+ * any existing active or inactive values for the attribute in our DB.
+ * @param dsdb_dn the target DN for the received link attribute
+ * @param add_as_inactive gets set to true if the received link is worse than
+ * the existing link - it should still be added, but as an inactive link.
+ */
+static int replmd_check_singleval_la_conflict(struct ldb_module *module,
+                                             struct replmd_private *replmd_private,
+                                             TALLOC_CTX *mem_ctx,
+                                             struct ldb_dn *src_obj_dn,
+                                             struct drsuapi_DsReplicaLinkedAttribute *la,
+                                             struct dsdb_dn *dsdb_dn,
+                                             struct parsed_dn *pdn_being_modified,
+                                             struct parsed_dn *pdn_list,
+                                             struct ldb_message_element *old_el,
+                                             const struct dsdb_schema *schema,
+                                             const struct dsdb_attribute *attr,
+                                             uint64_t seq_num,
+                                             bool *add_as_inactive)
+{
+       struct parsed_dn *active_pdn = NULL;
+       bool update_is_newer = false;
+       int ret;
+
+       /*
+        * check if there's a conflict for single-valued links, i.e. an active
+        * linked attribute already exists, but it has a different target value
+        */
+       ret = replmd_get_active_singleval_link(module, mem_ctx, pdn_list,
+                                              old_el->num_values, attr,
+                                              &active_pdn);
+
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
-       status = dsdb_dn_la_from_blob(ldb, attr, schema, tmp_ctx, la->value.blob, &dsdb_dn);
-       if (!W_ERROR_IS_OK(status)) {
-               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
-                                      old_el->name, ldb_dn_get_linearized(msg->dn), win_errstr(status));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
+       /*
+        * If no active value exists (or the received info is for the currently
+        * active value), then no conflict exists
+        */
+       if (active_pdn == NULL || active_pdn == pdn_being_modified) {
+               return LDB_SUCCESS;
        }
 
-       ntstatus = dsdb_get_extended_dn_guid(dsdb_dn->dn, &guid, "GUID");
-       if (!NT_STATUS_IS_OK(ntstatus) && !active) {
+       DBG_WARNING("Link conflict for %s attribute on %s\n",
+                   attr->lDAPDisplayName, ldb_dn_get_linearized(src_obj_dn));
+
+       /* Work out how to resolve the conflict based on which info is better */
+       update_is_newer = replmd_link_update_is_newer(active_pdn, la);
+
+       if (update_is_newer) {
+               DBG_WARNING("Using received value %s, over existing target %s\n",
+                           ldb_dn_get_linearized(dsdb_dn->dn),
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn));
+
                /*
-                * This strange behaviour (allowing a NULL/missing
-                * GUID) originally comes from:
-                *
-                * commit e3054ce0fe0f8f62d2f5b2a77893e7a1479128bd
-                * Author: Andrew Tridgell <tridge@samba.org>
-                * Date:   Mon Dec 21 21:21:55 2009 +1100
-                *
-                *  s4-drs: cope better with NULL GUIDS from DRS
-                *
-                *  It is valid to get a NULL GUID over DRS for a deleted forward link. We
-                *  need to match by DN if possible when seeing if we should update an
-                *  existing link.
-                *
-                *  Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
+                * Delete our existing active link. The received info will then
+                * be added (through normal link processing) as the active value
                 */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              src_obj_dn, schema, attr,
+                                              seq_num, true, &active_pdn->guid,
+                                              active_pdn->dsdb_dn,
+                                              active_pdn->v);
 
-               ret = dsdb_module_search_dn(module, tmp_ctx, &target_res,
-                                           dsdb_dn->dn, attrs2,
-                                           DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_RECYCLED |
-                                           DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
-                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
-                                           parent);
-       } else if (!NT_STATUS_IS_OK(ntstatus)) {
-               ldb_asprintf_errstring(ldb, "Failed to find GUID in linked attribute blob for %s on %s from %s",
-                                      old_el->name,
-                                      ldb_dn_get_linearized(dsdb_dn->dn),
-                                      ldb_dn_get_linearized(msg->dn));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
        } else {
-               ret = dsdb_module_search(module, tmp_ctx, &target_res,
-                                        NULL, LDB_SCOPE_SUBTREE,
-                                        attrs2,
-                                        DSDB_FLAG_NEXT_MODULE |
-                                        DSDB_SEARCH_SHOW_RECYCLED |
-                                        DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
-                                        DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
-                                        parent,
-                                        "objectGUID=%s",
-                                        GUID_string(tmp_ctx, &guid));
+               DBG_WARNING("Using existing target %s, over received value %s\n",
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn),
+                           ldb_dn_get_linearized(dsdb_dn->dn));
+
+               /*
+                * we want to keep our existing active link and add the
+                * received link as inactive
+                */
+               *add_as_inactive = true;
        }
 
+       return LDB_SUCCESS;
+}
+
+/*
+  process one linked attribute structure
+ */
+static int replmd_process_linked_attribute(struct ldb_module *module,
+                                          struct replmd_private *replmd_private,
+                                          struct la_entry *la_entry,
+                                          struct ldb_request *parent)
+{
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct ldb_message *msg;
+       TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
+       int ret;
+       const struct dsdb_attribute *attr;
+       struct dsdb_dn *dsdb_dn;
+       uint64_t seq_num = 0;
+       struct ldb_message_element *old_el;
+       time_t t = time(NULL);
+       struct parsed_dn *pdn_list, *pdn, *next;
+       struct GUID guid = GUID_zero();
+       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
+       bool ignore_link;
+       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       struct dsdb_dn *old_dsdb_dn = NULL;
+       struct ldb_val *val_to_update = NULL;
+       bool add_as_inactive = false;
+
+       /*
+        * get the attribute being modified, the search result for the source object,
+        * and the target object's DN details
+        */
+       ret = replmd_extract_la_entry_details(module, la_entry, tmp_ctx, &attr,
+                                             &msg, &dsdb_dn);
+
        if (ret != LDB_SUCCESS) {
-               ldb_asprintf_errstring(ldb_module_get_ctx(module), "Failed to re-resolve GUID %s: %s\n",
-                                      GUID_string(tmp_ctx, &guid),
-                                      ldb_errstring(ldb_module_get_ctx(module)));
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       if (target_res->count == 0) {
-               DEBUG(2,(__location__ ": WARNING: Failed to re-resolve GUID %s - using %s\n",
-                        GUID_string(tmp_ctx, &guid),
-                        ldb_dn_get_linearized(dsdb_dn->dn)));
-       } else if (target_res->count != 1) {
-               ldb_asprintf_errstring(ldb_module_get_ctx(module), "More than one object found matching objectGUID %s\n",
-                                      GUID_string(tmp_ctx, &guid));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       } else {
-               target_msg = target_res->msgs[0];
-               dsdb_dn->dn = talloc_steal(dsdb_dn, target_msg->dn);
-       }
-
        /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.13
+        * Check for deleted objects per MS-DRSR 4.1.10.6.14
         * ProcessLinkValue, because link updates are not applied to
         * recycled and tombstone objects.  We don't have to delete
         * any existing link, that should have happened when the
         * object deletion was replicated or initiated.
+        *
+        * This needs isDeleted and isRecycled to be included as
+        * attributes in the search and so in msg if set.
         */
-       replmd_deletion_state(module, target_msg,
-                             &target_deletion_state, NULL);
+       replmd_deletion_state(module, msg, &deletion_state, NULL);
 
-       if (target_deletion_state >= OBJECT_RECYCLED) {
+       if (deletion_state >= OBJECT_RECYCLED) {
                talloc_free(tmp_ctx);
                return LDB_SUCCESS;
        }
 
+       /*
+        * Now that we know the deletion_state, remove the extra
+        * attributes added for that purpose.  We need to do this
+        * otherwise in the case of isDeleted: FALSE the modify will
+        * fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
+
+       ldb_msg_remove_attr(msg, "isDeleted");
+       ldb_msg_remove_attr(msg, "isRecycled");
+
+       old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
+       if (old_el == NULL) {
+               ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
+               if (ret != LDB_SUCCESS) {
+                       ldb_module_oom(module);
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+       } else {
+               old_el->flags = LDB_FLAG_MOD_REPLACE;
+       }
+
+       /* parse the existing links */
+       ret = get_parsed_dns_trusted(module, replmd_private, tmp_ctx, old_el, &pdn_list,
+                                    attr->syntax->ldap_oid, parent);
+
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       ret = replmd_check_target_exists(module, dsdb_dn, la_entry, msg->dn,
+                                        true, &guid, &ignore_link);
+
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       /*
+        * there are some cases where the target object doesn't exist, but it's
+        * OK to ignore the linked attribute
+        */
+       if (ignore_link) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
        /* see if this link already exists */
        ret = parsed_dn_find(ldb, pdn_list, old_el->num_values,
                             &guid,
                             dsdb_dn->dn,
+                            dsdb_dn->extra_part, 0,
                             &pdn, &next,
-                            attr->syntax->ldap_oid);
+                            attr->syntax->ldap_oid,
+                            true);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
+       if (!replmd_link_update_is_newer(pdn, la)) {
+               DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
+                        old_el->name, ldb_dn_get_linearized(msg->dn),
+                        GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
+               talloc_free(tmp_ctx);
+               return LDB_SUCCESS;
+       }
 
-       if (pdn != NULL) {
-               /* see if this update is newer than what we have already */
-               struct GUID invocation_id = GUID_zero();
-               uint32_t version = 0;
-               uint32_t originating_usn = 0;
-               NTTIME change_time = 0;
-               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
-
-               dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &originating_usn, "RMD_ORIGINATING_USN");
-               dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
-
-               if (!replmd_update_is_newer(&invocation_id,
-                                           &la->meta_data.originating_invocation_id,
-                                           version,
-                                           la->meta_data.version,
-                                           change_time,
-                                           la->meta_data.originating_change_time)) {
-                       DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
-                                old_el->name, ldb_dn_get_linearized(msg->dn),
-                                GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
-                       talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               }
+       /* get a seq_num for this change */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
 
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       /*
+        * check for single-valued link conflicts, i.e. an active linked
+        * attribute already exists, but it has a different target value
+        */
+       if (active) {
+               ret = replmd_check_singleval_la_conflict(module, replmd_private,
+                                                        tmp_ctx, msg->dn, la,
+                                                        dsdb_dn, pdn, pdn_list,
+                                                        old_el, schema, attr,
+                                                        seq_num,
+                                                        &add_as_inactive);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
+       }
+
+       if (pdn != NULL) {
+               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
 
                if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                        /* remove the existing backlink */
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, &la->identifier->guid,
-                                                 &guid, false, attr, true);
+                                                 schema, 
+                                                 msg->dn,
+                                                 &pdn->guid, false, attr,
+                                                 parent);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
                }
 
-               ret = replmd_update_la_val(tmp_ctx, pdn->v, dsdb_dn, pdn->dsdb_dn,
-                                          &la->meta_data.originating_invocation_id,
-                                          la->meta_data.originating_usn, seq_num,
-                                          la->meta_data.originating_change_time,
-                                          la->meta_data.version,
-                                          !active);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+               val_to_update = pdn->v;
+               old_dsdb_dn = pdn->dsdb_dn;
 
-               if (active) {
-                       /* add the new backlink */
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, &la->identifier->guid,
-                                                 &guid, true, attr, true);
-                       if (ret != LDB_SUCCESS) {
+       } else {
+               unsigned offset;
+
+               /*
+                * We know where the new one needs to be, from the *next
+                * pointer into pdn_list.
+                */
+               if (next == NULL) {
+                       offset = old_el->num_values;
+               } else {
+                       if (next->dsdb_dn == NULL) {
+                               ret = really_parse_trusted_dn(tmp_ctx, ldb, next,
+                                                             attr->syntax->ldap_oid);
+                               if (ret != LDB_SUCCESS) {
+                                       return ret;
+                               }
+                       }
+                       offset = next - pdn_list;
+                       if (offset > old_el->num_values) {
                                talloc_free(tmp_ctx);
-                               return ret;
+                               return LDB_ERR_OPERATIONS_ERROR;
                        }
                }
-       } else {
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
 
                old_el->values = talloc_realloc(msg->elements, old_el->values,
                                                struct ldb_val, old_el->num_values+1);
@@ -6626,27 +7803,54 @@ linked_attributes[0]:
                        ldb_module_oom(module);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
+
+               if (offset != old_el->num_values) {
+                       memmove(&old_el->values[offset + 1], &old_el->values[offset],
+                               (old_el->num_values - offset) * sizeof(old_el->values[0]));
+               }
+
                old_el->num_values++;
 
-               ret = replmd_build_la_val(tmp_ctx, &old_el->values[old_el->num_values-1], dsdb_dn,
-                                         &la->meta_data.originating_invocation_id,
-                                         la->meta_data.originating_usn, seq_num,
-                                         la->meta_data.originating_change_time,
-                                         la->meta_data.version,
-                                         !active);
+               val_to_update = &old_el->values[offset];
+               old_dsdb_dn = NULL;
+       }
+
+       /* set the link attribute's value to the info that was received */
+       ret = replmd_set_la_val(tmp_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
+                               &la->meta_data.originating_invocation_id,
+                               la->meta_data.originating_usn, seq_num,
+                               la->meta_data.originating_change_time,
+                               la->meta_data.version,
+                               !active);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (add_as_inactive) {
+
+               /* Set the new link as inactive/deleted to avoid conflicts */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              msg->dn, schema, attr, seq_num,
+                                              false, &guid, dsdb_dn,
+                                              val_to_update);
+
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
 
-               if (active) {
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, &la->identifier->guid,
-                                                 &guid, true, attr, true);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
+       } else if (active) {
+
+               /* if the new link is active, then add the new backlink */
+               ret = replmd_add_backlink(module, replmd_private,
+                                         schema,
+                                         msg->dn,
+                                         &guid, true, attr,
+                                         parent);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
                }
        }
 
@@ -6684,7 +7888,10 @@ linked_attributes[0]:
        if (ret != LDB_SUCCESS) {
                ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s'\n%s\n",
                          ldb_errstring(ldb),
-                         ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
+                         ldb_ldif_message_redacted_string(ldb,
+                                                          tmp_ctx,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg));
                talloc_free(tmp_ctx);
                return ret;
        }
@@ -6740,12 +7947,15 @@ static int replmd_prepare_commit(struct ldb_module *module)
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
        struct la_entry *la, *prev;
-       struct la_backlink *bl;
        int ret;
 
-       /* walk the list backwards, to do the first entry first, as we
+       /*
+        * Walk the list of linked attributes from DRS replication.
+        *
+        * We walk backwards, to do the first entry first, as we
         * added the entries with DLIST_ADD() which puts them at the
-        * start of the list */
+        * start of the list
+        */
        for (la = DLIST_TAIL(replmd_private->la_list); la; la=prev) {
                prev = DLIST_PREV(la);
                DLIST_REMOVE(replmd_private->la_list, la);
@@ -6757,16 +7967,6 @@ static int replmd_prepare_commit(struct ldb_module *module)
                }
        }
 
-       /* process our backlink list, creating and deleting backlinks
-          as necessary */
-       for (bl=replmd_private->la_backlinks; bl; bl=bl->next) {
-               ret = replmd_process_backlink(module, bl, NULL);
-               if (ret != LDB_SUCCESS) {
-                       replmd_txn_cleanup(replmd_private);
-                       return ret;
-               }
-       }
-
        replmd_txn_cleanup(replmd_private);
 
        /* possibly change @REPLCHANGED */