s4:repl_meta_data: pass down struct replmd_replicated_request to replmd_modify_la_add()
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index 8f123de7dbe35cc6b181211d9458ff299fab7915..d5d9c4f8eeb171cafc5ca4a37bbc2f80b5bc9cb3 100644 (file)
@@ -54,6 +54,9 @@
 #undef DBGC_CLASS
 #define DBGC_CLASS            DBGC_DRS_REPL
 
+/* the RMD_VERSION for linked attributes starts from 1 */
+#define RMD_VERSION_INITIAL   1
+
 /*
  * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
  * Deleted Objects Container
@@ -119,6 +122,17 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                                      const char *ldap_oid);
 static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
                                          struct la_entry *la);
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted);
+
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid);
 
 enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1,
@@ -563,9 +577,41 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
        }
 
        if (ares->error != LDB_SUCCESS) {
-               DEBUG(5,("%s failure. Error is: %s\n", __FUNCTION__, ldb_strerror(ares->error)));
+               struct GUID_txt_buf guid_txt;
+               struct ldb_message *msg = NULL;
+               char *s = NULL;
+
+               if (ac->apply_mode == false) {
+                       DBG_NOTICE("Originating update failure. Error is: %s\n",
+                                  ldb_strerror(ares->error));
+                       return ldb_module_done(ac->req, controls,
+                                              ares->response, ares->error);
+               }
+
+               msg = ac->objs->objects[ac->index_current].msg;
+               /*
+                * Set at DBG_NOTICE as once these start to happe, they
+                * will happen a lot until resolved, due to repeated
+                * replication.  The caller will probably print the
+                * ldb error string anyway.
+                */
+               DBG_NOTICE("DRS replication apply failure for %s. Error is: %s\n",
+                          ldb_dn_get_linearized(msg->dn),
+                          ldb_strerror(ares->error));
+
+               s = ldb_ldif_message_redacted_string(ldb_module_get_ctx(ac->module),
+                                                    ac,
+                                                    LDB_CHANGETYPE_ADD,
+                                                    msg);
+
+               DBG_INFO("Failing DRS %s replication message was %s:\n%s\n",
+                        ac->search_msg == NULL ? "ADD" : "MODIFY",
+                        GUID_buf_string(&ac->objs->objects[ac->index_current].object_guid,
+                                        &guid_txt),
+                        s);
+               talloc_free(s);
                return ldb_module_done(ac->req, controls,
-                                       ares->response, ares->error);
+                                      ares->response, ares->error);
        }
 
        if (ares->type != LDB_REPLY_DONE) {
@@ -890,8 +936,8 @@ static void replmd_ldb_message_sort(struct ldb_message *msg,
 }
 
 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted);
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime);
 
 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2);
 
@@ -899,6 +945,10 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                          struct ldb_message_element *el, struct parsed_dn **pdn,
                          const char *ldap_oid, struct ldb_request *parent);
 
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn);
+
 /*
   fix up linked attributes in replmd_add.
   This involves setting up the right meta-data in extended DN
@@ -940,6 +990,12 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                return ret;
        }
 
+       ret = check_parsed_dn_duplicates(module, el, pdn);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
        new_values = talloc_array(tmp_ctx, struct ldb_val, el->num_values);
        if (new_values == NULL) {
                ldb_module_oom(module);
@@ -949,20 +1005,9 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
 
        for (i = 0; i < el->num_values; i++) {
                struct parsed_dn *p = &pdn[i];
-               if (i > 0 && parsed_dn_compare(p, &pdn[i - 1]) == 0) {
-                       ldb_asprintf_errstring(ldb,
-                                       "Linked attribute %s has "
-                                       "multiple identical values", el->name);
-                       talloc_free(tmp_ctx);
-                       if (ldb_attr_cmp(el->name, "member") == 0) {
-                               return LDB_ERR_ENTRY_ALREADY_EXISTS;
-                       } else {
-                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
-                       }
-               }
                ret = replmd_build_la_val(el->values, p->v, p->dsdb_dn,
                                          &ac->our_invocation_id,
-                                         ac->seq_num, ac->seq_num, now, 0, false);
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -1994,8 +2039,12 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                        /* we got a DN without a GUID - go find the GUID */
                        int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
-                               ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
-                                                      ldb_dn_get_linearized(dn));
+                               char *dn_str = NULL;
+                               dn_str = ldb_dn_get_extended_linearized(mem_ctx,
+                                                                       (dn), 1);
+                               ldb_asprintf_errstring(ldb,
+                                               "Unable to find GUID for DN %s\n",
+                                               dn_str);
                                if (ret == LDB_ERR_NO_SUCH_OBJECT &&
                                    LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
                                    ldb_attr_cmp(el->name, "member") == 0) {
@@ -2093,6 +2142,37 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
+/*
+   Return LDB_SUCCESS if a parsed_dn list contains no duplicate values,
+   otherwise an error code. For compatibility the error code differs depending
+   on whether or not the attribute is "member".
+
+   As always, the parsed_dn list is assumed to be sorted.
+ */
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn)
+{
+       unsigned int i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       for (i = 1; i < el->num_values; i++) {
+               struct parsed_dn *p = &pdn[i];
+               if (parsed_dn_compare(p, &pdn[i - 1]) == 0) {
+                       ldb_asprintf_errstring(ldb,
+                                              "Linked attribute %s has "
+                                              "multiple identical values",
+                                              el->name);
+                       if (ldb_attr_cmp(el->name, "member") == 0) {
+                               return LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+               }
+       }
+       return LDB_SUCCESS;
+}
+
 /*
   build a new extended DN, including all meta data fields
 
@@ -2104,85 +2184,20 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
   RMD_LOCAL_USN       = local_usn
   RMD_VERSION         = version
  */
-static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted)
+static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v,
+                              struct dsdb_dn *dsdb_dn,
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime)
 {
-       struct ldb_dn *dn = dsdb_dn->dn;
-       const char *tstring, *usn_string, *flags_string;
-       struct ldb_val tval;
-       struct ldb_val iid;
-       struct ldb_val usnv, local_usnv;
-       struct ldb_val vers, flagsv;
-       NTSTATUS status;
-       int ret;
-       const char *dnstring;
-       char *vstring;
-       uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
-
-       tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
-       if (!tstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       tval = data_blob_string_const(tstring);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       usnv = data_blob_string_const(usn_string);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       local_usnv = data_blob_string_const(usn_string);
-
-       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
-       if (!vstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       vers = data_blob_string_const(vstring);
-
-       status = GUID_to_ndr_blob(invocation_id, dn, &iid);
-       if (!NT_STATUS_IS_OK(status)) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
-       if (!flags_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       flagsv = data_blob_string_const(flags_string);
-
-       ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
-       if (ret != LDB_SUCCESS) return ret;
-
-       dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
-       if (dnstring == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       *v = data_blob_string_const(dnstring);
-
-       return LDB_SUCCESS;
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, NULL, invocation_id,
+                                local_usn, local_usn, nttime,
+                                RMD_VERSION_INITIAL, false);
 }
 
 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                                struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
                                uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted);
+                               bool deleted);
 
 /*
   check if any links need upgrading from w2k format
@@ -2236,7 +2251,7 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                /* it's an old one that needs upgrading */
                ret = replmd_update_la_val(el->values, dns[i].v,
                                           dns[i].dsdb_dn, dns[i].dsdb_dn,
-                                          invocation_id, 1, 1, 0, 0, false);
+                                          invocation_id, 1, 1, 0, false);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -2258,14 +2273,14 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
 }
 
 /*
-  update an extended DN, including all meta data fields
+  Sets the value for a linked attribute, including all meta data fields
 
   see replmd_build_la_val for value names
  */
-static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
-                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted)
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted)
 {
        struct ldb_dn *dn = dsdb_dn->dn;
        const char *tstring, *usn_string, *flags_string;
@@ -2273,8 +2288,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        struct ldb_val iid;
        struct ldb_val usnv, local_usnv;
        struct ldb_val vers, flagsv;
-       const struct ldb_val *old_addtime;
-       uint32_t old_version;
+       const struct ldb_val *old_addtime = NULL;
        NTSTATUS status;
        int ret;
        const char *dnstring;
@@ -2314,7 +2328,10 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        if (ret != LDB_SUCCESS) return ret;
 
        /* get the ADDTIME from the original */
-       old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_ADDTIME");
+       if (old_dsdb_dn != NULL) {
+               old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn,
+                                                           "RMD_ADDTIME");
+       }
        if (old_addtime == NULL) {
                old_addtime = &tval;
        }
@@ -2339,12 +2356,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
        if (ret != LDB_SUCCESS) return ret;
 
-       /* increase the version by 1 */
-       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version, "RMD_VERSION");
-       if (NT_STATUS_IS_OK(status) && old_version >= version) {
-               version = old_version+1;
-       }
-       vstring = talloc_asprintf(dn, "%lu", (unsigned long)version);
+       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
        vers = data_blob_string_const(vstring);
        ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
        if (ret != LDB_SUCCESS) return ret;
@@ -2358,17 +2370,43 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        return LDB_SUCCESS;
 }
 
+/**
+ * Updates the value for a linked attribute, including all meta data fields
+ */
+static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                               bool deleted)
+{
+       uint32_t old_version;
+       uint32_t version = RMD_VERSION_INITIAL;
+       NTSTATUS status;
+
+       /*
+        * We're updating the linked attribute locally, so increase the version
+        * by 1 so that other DCs will see the change when it gets replicated out
+        */
+       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version,
+                                            "RMD_VERSION");
+
+       if (NT_STATUS_IS_OK(status)) {
+               version = old_version + 1;
+       }
+
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, old_dsdb_dn, invocation_id,
+                                usn, local_usn, nttime, version, deleted);
+}
+
 /*
   handle adding a linked attribute
  */
 static int replmd_modify_la_add(struct ldb_module *module,
                                struct replmd_private *replmd_private,
-                               const struct dsdb_schema *schema,
+                               struct replmd_replicated_request *ac,
                                struct ldb_message *msg,
                                struct ldb_message_element *el,
                                struct ldb_message_element *old_el,
                                const struct dsdb_attribute *schema_attr,
-                               uint64_t seq_num,
                                time_t t,
                                struct ldb_dn *msg_dn,
                                struct ldb_request *parent)
@@ -2381,17 +2419,10 @@ static int replmd_modify_la_add(struct ldb_module *module,
        unsigned old_num_values = old_el ? old_el->num_values : 0;
        unsigned num_values = 0;
        unsigned max_num_values;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        NTTIME now;
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /* get the DNs to be added, fully parsed.
         *
         * We need full parsing because they came off the wire and we don't
@@ -2487,15 +2518,16 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        ret = replmd_update_la_val(new_values, exact->v,
                                                   dns[i].dsdb_dn,
                                                   exact->dsdb_dn,
-                                                  invocation_id, seq_num,
-                                                  seq_num, now, 0, false);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
 
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema,
+                                                 ac->schema,
                                                  msg_dn,
                                                  &dns[i].guid, 
                                                  true,
@@ -2537,15 +2569,14 @@ static int replmd_modify_la_add(struct ldb_module *module,
                }
 
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_dn,
+                                         ac->schema, msg_dn,
                                          &dns[i].guid,
                                          true, schema_attr,
                                          parent);
                /* Make the new linked attribute ldb_val. */
                ret = replmd_build_la_val(new_values, &new_values[num_values],
-                                         dns[i].dsdb_dn, invocation_id,
-                                         seq_num, seq_num,
-                                         now, 0, false);
+                                         dns[i].dsdb_dn, &ac->our_invocation_id,
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2695,7 +2726,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        ret = replmd_update_la_val(old_el->values, p->v,
                                                   p->dsdb_dn, p->dsdb_dn,
                                                   invocation_id, seq_num,
-                                                  seq_num, now, 0, true);
+                                                  seq_num, now, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2792,7 +2823,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                ret = replmd_update_la_val(old_el->values, exact->v,
                                           exact->dsdb_dn, exact->dsdb_dn,
                                           invocation_id, seq_num, seq_num,
-                                          now, 0, true);
+                                          now, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2810,11 +2841,23 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
        if (vanish_links) {
                unsigned j = 0;
+               struct ldb_val *tmp_vals = NULL;
+
+               tmp_vals = talloc_array(tmp_ctx, struct ldb_val,
+                                       old_el->num_values);
+               if (tmp_vals == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_oom(module);
+               }
                for (i = 0; i < old_el->num_values; i++) {
-                       if (old_dns[i].v != NULL) {
-                               old_el->values[j] = *old_dns[i].v;
-                               j++;
+                       if (old_dns[i].v == NULL) {
+                               continue;
                        }
+                       tmp_vals[j] = *old_dns[i].v;
+                       j++;
+               }
+               for (i = 0; i < j; i++) {
+                       old_el->values[i] = tmp_vals[i];
                }
                old_el->num_values = j;
        }
@@ -2904,6 +2947,12 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                return ret;
        }
 
+       ret = check_parsed_dn_duplicates(module, el, dns);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
        ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns,
                             ldap_oid, parent);
        if (ret != LDB_SUCCESS) {
@@ -2961,7 +3010,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                           old_p->dsdb_dn,
                                                           invocation_id,
                                                           seq_num, seq_num,
-                                                          now, 0, true);
+                                                          now, true);
                                if (ret != LDB_SUCCESS) {
                                        talloc_free(tmp_ctx);
                                        return ret;
@@ -2995,7 +3044,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                   old_p->dsdb_dn,
                                                   invocation_id,
                                                   seq_num, seq_num,
-                                                  now, 0, false);
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -3027,8 +3076,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                  new_p->v,
                                                  new_p->dsdb_dn,
                                                  invocation_id,
-                                                 seq_num, seq_num,
-                                                 now, 0, false);
+                                                 seq_num, now);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -3065,8 +3113,9 @@ static int replmd_modify_la_replace(struct ldb_module *module,
  */
 static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                                               struct replmd_private *replmd_private,
+                                              struct replmd_replicated_request *ac,
                                               struct ldb_message *msg,
-                                              uint64_t seq_num, time_t t,
+                                              time_t t,
                                               struct ldb_request *parent)
 {
        struct ldb_result *res;
@@ -3075,8 +3124,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_message *old_msg;
 
-       const struct dsdb_schema *schema;
-
        if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
                /*
                 * Nothing special is required for modifying or vanishing links
@@ -3110,10 +3157,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        if (ret != LDB_SUCCESS) {
                return ret;
        }
-       schema = dsdb_get_schema(ldb, res);
-       if (!schema) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
 
        old_msg = res->msgs[0];
 
@@ -3122,7 +3165,7 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                struct ldb_message_element *old_el, *new_el;
                unsigned int mod_type = LDB_FLAG_MOD_TYPE(el->flags);
                const struct dsdb_attribute *schema_attr
-                       = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
+                       = dsdb_attribute_by_lDAPDisplayName(ac->schema, el->name);
                if (!schema_attr) {
                        ldb_asprintf_errstring(ldb,
                                               "%s: attribute %s is not a valid attribute in schema",
@@ -3133,9 +3176,22 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
-                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
-                               continue;
+                       if (parent) {
+                               struct ldb_control *ctrl;
+
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_REPLMD_VANISH_LINKS);
+                               if (ctrl != NULL) {
+                                       ctrl->critical = false;
+                                       continue;
+                               }
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_DBCHECK);
+                               if (ctrl != NULL) {
+                                       continue;
+                               }
                        }
+
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
@@ -3145,22 +3201,22 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
-                                                      schema, msg, el, old_el,
-                                                      schema_attr, seq_num, t,
+                                                      ac->schema, msg, el, old_el,
+                                                      schema_attr, ac->seq_num, t,
                                                       old_msg->dn,
                                                       parent);
                        break;
                case LDB_FLAG_MOD_DELETE:
                        ret = replmd_modify_la_delete(module, replmd_private,
-                                                     schema, msg, el, old_el,
-                                                     schema_attr, seq_num, t,
+                                                     ac->schema, msg, el, old_el,
+                                                     schema_attr, ac->seq_num, t,
                                                      old_msg->dn,
                                                      parent);
                        break;
                case LDB_FLAG_MOD_ADD:
                        ret = replmd_modify_la_add(module, replmd_private,
-                                                  schema, msg, el, old_el,
-                                                  schema_attr, seq_num, t,
+                                                  ac, msg, el, old_el,
+                                                  schema_attr, t,
                                                   old_msg->dn,
                                                   parent);
                        break;
@@ -3264,6 +3320,8 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        unsigned int functional_level;
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
+       struct ldb_control *fix_links_control = NULL;
+       struct ldb_control *fix_dn_name_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -3289,6 +3347,102 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb = ldb_module_get_ctx(module);
 
+       fix_links_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_DUPLICATE_LINKS);
+       if (fix_links_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_REPLACE) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID == 0) {
+                       return ldb_module_operr(module);
+               }
+
+               fix_links_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
+       fix_dn_name_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_LINK_DN_NAME);
+       if (fix_dn_name_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 2) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_DELETE) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].flags != LDB_FLAG_MOD_ADD) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (ldb_attr_cmp(req->op.mod.message->elements[0].name,
+                                req->op.mod.message->elements[1].name) != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->dn_format == DSDB_INVALID_DN) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               /*
+                * If we are run from dbcheck and we are not updating
+                * a link (as these would need to be sorted and so
+                * can't go via such a simple update, then do not
+                * trigger replicated updates and a new USN from this
+                * change, it wasn't a real change, just a new
+                * (correct) string DN
+                */
+
+               fix_dn_name_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
        guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
@@ -3334,7 +3488,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        }
 
        ret = replmd_modify_handle_linked_attribs(module, replmd_private,
-                                                 msg, ac->seq_num, t, req);
+                                                 ac, msg, t, req);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
@@ -3719,7 +3873,8 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
                                            msg->dn, attrs,
                                            DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_EXTENDED_DN,
+                                           DSDB_SEARCH_SHOW_EXTENDED_DN |
+                                           DSDB_SEARCH_SHOW_RECYCLED,
                                            parent);
 
                if (ret != LDB_SUCCESS) {
@@ -3812,7 +3967,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
 {
        int ret = LDB_ERR_OTHER;
        bool retb, disallow_move_on_delete;
-       struct ldb_dn *old_dn, *new_dn;
+       struct ldb_dn *old_dn = NULL, *new_dn = NULL;
        const char *rdn_name;
        const struct ldb_val *rdn_value, *new_rdn_value;
        struct GUID guid;
@@ -3873,7 +4028,12 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                "*",
                NULL
        };
-       unsigned int i, el_count = 0;
+       static const struct ldb_val true_val = {
+               .data = discard_const_p(uint8_t, "TRUE"),
+               .length = 4
+       };
+       
+       unsigned int i;
        uint32_t dsdb_flags = 0;
        struct replmd_private *replmd_private;
        enum deletion_state deletion_state, next_deletion_state;
@@ -4024,27 +4184,28 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        guid = samdb_result_guid(old_msg, "objectGUID");
 
        if (deletion_state == OBJECT_NOT_DELETED) {
-               /* Add a formatted child */
-               retb = ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ADEL:%s",
-                                           rdn_name,
-                                           ldb_dn_escape_value(tmp_ctx, *rdn_value),
-                                           GUID_string(tmp_ctx, &guid));
-               if (!retb) {
-                       ldb_asprintf_errstring(ldb, __location__
-                                              ": Unable to add a formatted child to dn: %s",
-                                              ldb_dn_get_linearized(new_dn));
+               struct ldb_message_element *is_deleted_el;
+
+               ret = replmd_make_deleted_child_dn(tmp_ctx,
+                                                  ldb,
+                                                  new_dn,
+                                                  rdn_name, rdn_value,
+                                                  guid);
+
+               if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
 
-               ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
+               ret = ldb_msg_add_value(msg, "isDeleted", &true_val,
+                                       &is_deleted_el);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, __location__
                                               ": Failed to add isDeleted string to the msg");
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               is_deleted_el->flags = LDB_FLAG_MOD_REPLACE;
        } else {
                /*
                 * No matter what has happened with other renames etc, try again to
@@ -4093,6 +4254,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        if (deletion_state == OBJECT_NOT_DELETED) {
                struct ldb_dn *parent_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
                char *parent_dn_str = NULL;
+               struct ldb_message_element *p_el;
 
                /* we need the storage form of the parent GUID */
                ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
@@ -4136,7 +4298,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               p_el = ldb_msg_find_element(msg,
+                                           "lastKnownParent");
+               if (p_el == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_operr(module);
+               }
+               p_el->flags = LDB_FLAG_MOD_REPLACE;
 
                if (next_deletion_state == OBJECT_DELETED) {
                        ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
@@ -4148,7 +4316,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
+                       p_el = ldb_msg_find_element(msg,
+                                                   "msDS-LastKnownRDN");
+                       if (p_el == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_module_operr(module);
+                       }
+                       p_el->flags = LDB_FLAG_MOD_ADD;
                }
        }
 
@@ -4177,14 +4351,17 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                 * not activated and what ever the forest level is.
                 */
                if (dsdb_attribute_by_lDAPDisplayName(schema, "isRecycled") != NULL) {
-                       ret = ldb_msg_add_string(msg, "isRecycled", "TRUE");
+                       struct ldb_message_element *is_recycled_el;
+
+                       ret = ldb_msg_add_value(msg, "isRecycled", &true_val,
+                                               &is_recycled_el);
                        if (ret != LDB_SUCCESS) {
                                DEBUG(0,(__location__ ": Failed to add isRecycled string to the msg\n"));
                                ldb_module_oom(module);
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+                       is_recycled_el->flags = LDB_FLAG_MOD_REPLACE;
                }
 
                replmd_private = talloc_get_type(ldb_module_get_private(module),
@@ -4202,6 +4379,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                /* don't remove the rDN */
                                continue;
                        }
+
                        if (sa->linkID & 1) {
                                /*
                                  we have a backlink in this object
@@ -4215,7 +4393,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                                                replmd_private,
                                                                old_dn, &guid,
                                                                el, sa, req);
-                               if (ret != LDB_SUCCESS) {
+                               if (ret == LDB_SUCCESS) {
+                                       /*
+                                        * now we continue, which means we
+                                        * won't remove this backlink
+                                        * directly
+                                        */
+                                       continue;
+                               }
+
+                               if (ret != LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
@@ -4228,11 +4415,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
-                               /* now we continue, which means we
-                                  won't remove this backlink
-                                  directly
-                               */
-                               continue;
+
+                               /*
+                                * Otherwise vanish the link, we are
+                                * out of sync and the controlling
+                                * object does not have the source
+                                * link any more
+                                */
+
+                               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
                        } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
@@ -4468,14 +4660,108 @@ static bool replmd_replPropertyMetaData1_new_should_be_taken(uint32_t dsdb_repl_
 }
 
 
+/*
+  form a DN for a deleted (DEL:) or conflict (CNF:) DN
+ */
+static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
+                                      struct ldb_context *ldb,
+                                      struct ldb_dn *dn,
+                                      const char *four_char_prefix,
+                                      const char *rdn_name,
+                                      const struct ldb_val *rdn_value,
+                                      struct GUID guid)
+{
+       struct ldb_val deleted_child_rdn_val;
+       struct GUID_txt_buf guid_str;
+       int ret;
+       bool retb;
+
+       GUID_buf_string(&guid, &guid_str);
+
+       retb = ldb_dn_add_child_fmt(dn, "X=Y");
+       if (!retb) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * TODO: Per MS-ADTS 3.1.1.5.5 Delete Operation
+        * we should truncate this value to ensure the RDN is not more than 255 chars.
+        *
+        * However we MS-ADTS 3.1.1.5.1.2 Naming Constraints indicates that:
+        *
+        * "Naming constraints are not enforced for replicated
+        * updates." so this is safe and we don't have to work out not
+        * splitting a UTF8 char right now.
+        */
+       deleted_child_rdn_val = ldb_val_dup(tmp_ctx, rdn_value);
+
+       /*
+        * sizeof(guid_str.buf) will always be longer than
+        * strlen(guid_str.buf) but we allocate using this and
+        * waste the trailing bytes to avoid scaring folks
+        * with memcpy() using strlen() below
+        */
+
+       deleted_child_rdn_val.data
+               = talloc_realloc(tmp_ctx, deleted_child_rdn_val.data,
+                                uint8_t,
+                                rdn_value->length + 5
+                                + sizeof(guid_str.buf));
+       if (!deleted_child_rdn_val.data) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.length =
+               rdn_value->length + 5
+               + strlen(guid_str.buf);
+
+       SMB_ASSERT(deleted_child_rdn_val.length <
+                  talloc_get_size(deleted_child_rdn_val.data));
+
+       /*
+        * talloc won't allocate more than 256MB so we can't
+        * overflow but just to be sure
+        */
+       if (deleted_child_rdn_val.length < rdn_value->length) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.data[rdn_value->length] = 0x0a;
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 1],
+              four_char_prefix, 4);
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 5],
+              guid_str.buf,
+              sizeof(guid_str.buf));
+
+       /* Now set the value into the RDN, without parsing it */
+       ret = ldb_dn_set_component(
+               dn,
+               0,
+               rdn_name,
+               deleted_child_rdn_val);
+
+       return ret;
+}
+
+
 /*
   form a conflict DN
  */
-static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn, struct GUID *guid)
+static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx,
+                                        struct ldb_context *ldb,
+                                        struct ldb_dn *dn,
+                                        struct GUID *guid)
 {
        const struct ldb_val *rdn_val;
        const char *rdn_name;
        struct ldb_dn *new_dn;
+       int ret;
 
        rdn_val = ldb_dn_get_rdn_val(dn);
        rdn_name = ldb_dn_get_rdn_name(dn);
@@ -4483,32 +4769,48 @@ static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn,
                return NULL;
        }
 
-       new_dn = ldb_dn_copy(mem_ctx, dn);
+       new_dn = ldb_dn_get_parent(mem_ctx, dn);
        if (!new_dn) {
                return NULL;
        }
 
-       if (!ldb_dn_remove_child_components(new_dn, 1)) {
-               return NULL;
-       }
-
-       if (!ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ACNF:%s",
-                                 rdn_name,
-                                 ldb_dn_escape_value(new_dn, *rdn_val),
-                                 GUID_string(new_dn, guid))) {
+       ret = replmd_make_prefix_child_dn(mem_ctx,
+                                         ldb, new_dn,
+                                         "CNF:",
+                                         rdn_name,
+                                         rdn_val,
+                                         *guid);
+       if (ret != LDB_SUCCESS) {
                return NULL;
        }
-
        return new_dn;
 }
 
+/*
+  form a deleted DN
+ */
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid)
+{
+       return replmd_make_prefix_child_dn(tmp_ctx,
+                                          ldb, dn,
+                                          "DEL:",
+                                          rdn_name,
+                                          rdn_value,
+                                          guid);
+}
+
 
 /*
   perform a modify operation which sets the rDN and name attributes to
   their current values. This has the effect of changing these
   attributes to have been last updated by the current DC. This is
   needed to ensure that renames performed as part of conflict
-  resolution are propogated to other DCs
+  resolution are propagated to other DCs
  */
 static int replmd_name_modify(struct replmd_replicated_request *ar,
                              struct ldb_request *req, struct ldb_dn *dn)
@@ -4713,6 +5015,7 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                       "Conflict adding object '%s' from incoming replication as we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -4784,7 +5087,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                 ldb_dn_get_linearized(conflict_dn)));
                        goto failed;
                }
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4809,7 +5114,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                        goto failed;
                }
 
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4885,6 +5192,9 @@ failed:
         * replication will stop with an error, but there is not much
         * else we can do.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
        return ldb_module_done(ar->req, NULL, NULL,
                               ret);
 }
@@ -5108,7 +5418,7 @@ static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request
        {
                struct ldb_message *parent_msg = ares->message;
                struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
-               struct ldb_dn *parent_dn;
+               struct ldb_dn *parent_dn = NULL;
                int comp_num;
 
                if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
@@ -5347,6 +5657,7 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                                       "Conflict adding object '%s' from incoming replication but we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -5411,7 +5722,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
 
        if (rename_incoming_record) {
 
-               new_dn = replmd_conflict_dn(msg, msg->dn,
+               new_dn = replmd_conflict_dn(msg,
+                                           ldb_module_get_ctx(ar->module),
+                                           msg->dn,
                                            &ar->objs->objects[ar->index_current].object_guid);
                if (new_dn == NULL) {
                        ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
@@ -5447,7 +5760,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       new_dn = replmd_conflict_dn(tmp_ctx, conflict_dn, &guid);
+       new_dn = replmd_conflict_dn(tmp_ctx,
+                                   ldb_module_get_ctx(ar->module),
+                                   conflict_dn, &guid);
        if (new_dn == NULL) {
                DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
@@ -5490,8 +5805,10 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                        goto failed;
        }
-failed:
 
+       talloc_free(tmp_ctx);
+       return ret;
+failed:
        /*
         * On failure make the caller get the error
         * This means replication will stop with an error,
@@ -5499,6 +5816,9 @@ failed:
         * LDB_ERR_ENTRY_ALREADY_EXISTS case this is exactly what is
         * needed.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
 
        talloc_free(tmp_ctx);
        return ret;
@@ -6694,43 +7014,96 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
 }
 
 /**
- * Returns True if the source and target DNs both have the same naming context,
- * i.e. they're both in the same partition.
+ * Checks how to handle an missing target - either we need to fail the
+ * replication and retry with GET_TGT, ignore the link and continue, or try to
+ * add a partial link to an unknown target.
  */
-static bool replmd_objects_have_same_nc(struct ldb_context *ldb,
-                                       TALLOC_CTX *mem_ctx,
-                                       struct ldb_dn *source_dn,
-                                       struct ldb_dn *target_dn)
+static int replmd_allow_missing_target(struct ldb_module *module,
+                                      TALLOC_CTX *mem_ctx,
+                                      struct ldb_dn *target_dn,
+                                      struct ldb_dn *source_dn,
+                                      bool is_obj_commit,
+                                      struct GUID *guid,
+                                      uint32_t dsdb_repl_flags,
+                                      bool *ignore_link,
+                                      const char * missing_str)
 {
-       TALLOC_CTX *tmp_ctx;
-       struct ldb_dn *source_nc;
-       struct ldb_dn *target_nc;
-       int ret;
-       bool same_nc = true;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       bool is_in_same_nc;
 
-       tmp_ctx = talloc_new(mem_ctx);
+       /*
+        * we may not be able to resolve link targets properly when
+        * dealing with subsets of objects, e.g. the source is a
+        * critical object and the target isn't
+        *
+        * TODO:
+        * When we implement Trusted Domains we need to consider
+        * whether they get treated as an incomplete replica here or not
+        */
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_OBJECT_SUBSET) {
 
-       ret = dsdb_find_nc_root(ldb, tmp_ctx, source_dn, &source_nc);
-       if (ret != LDB_SUCCESS) {
-               DBG_ERR("Failed to find base DN for source %s\n",
-                       ldb_dn_get_linearized(source_dn));
-               talloc_free(tmp_ctx);
-               return true;
+               /*
+                * Ignore the link. We don't increase the highwater-mark in
+                * the object subset cases, so subsequent replications should
+                * resolve any missing links
+                */
+               DEBUG(2, ("%s target %s linked from %s\n", missing_str,
+                         ldb_dn_get_linearized(target_dn),
+                         ldb_dn_get_linearized(source_dn)));
+               *ignore_link = true;
+               return LDB_SUCCESS;
        }
 
-       ret = dsdb_find_nc_root(ldb, tmp_ctx, target_dn, &target_nc);
-       if (ret != LDB_SUCCESS) {
-               DBG_ERR("Failed to find base DN for target %s\n",
-                       ldb_dn_get_linearized(target_dn));
-               talloc_free(tmp_ctx);
-               return true;
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_TARGETS_UPTODATE) {
+
+               /*
+                * target should already be up-to-date so there's no point in
+                * retrying. This could be due to bad timing, or if a target
+                * on a one-way link was deleted. We ignore the link rather
+                * than failing the replication cycle completely
+                */
+               *ignore_link = true;
+               DBG_WARNING("%s is %s but up to date. Ignoring link from %s\n",
+                           ldb_dn_get_linearized(target_dn), missing_str,
+                           ldb_dn_get_linearized(source_dn));
+               return LDB_SUCCESS;
+       }
+       
+       is_in_same_nc = dsdb_objects_have_same_nc(ldb,
+                                                 mem_ctx,
+                                                 source_dn,
+                                                 target_dn);
+       if (is_in_same_nc) {
+               /* fail the replication and retry with GET_TGT */
+               ldb_asprintf_errstring(ldb, "%s target %s GUID %s linked from %s\n",
+                                      missing_str,
+                                      ldb_dn_get_linearized(target_dn),
+                                      GUID_string(mem_ctx, guid),
+                                      ldb_dn_get_linearized(source_dn));
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       same_nc = (ldb_dn_compare(source_nc, target_nc) == 0);
+       /*
+        * The target of the cross-partition link is missing. Continue
+        * and try to at least add the forward-link. This isn't great,
+        * but a partial link can be fixed by dbcheck, so it's better
+        * than dropping the link completely.
+        */
+       *ignore_link = false;
 
-       talloc_free(tmp_ctx);
+       if (is_obj_commit) {
 
-       return same_nc;
+               /*
+                * Only log this when we're actually committing the objects.
+                * This avoids spurious logs, i.e. if we're just verifying the
+                * received link during a join.
+                */
+               DBG_WARNING("%s cross-partition target %s linked from %s\n",
+                           missing_str, ldb_dn_get_linearized(target_dn),
+                           ldb_dn_get_linearized(source_dn));
+       }
+       
+       return LDB_SUCCESS;
 }
 
 /**
@@ -6743,6 +7116,7 @@ static int replmd_check_target_exists(struct ldb_module *module,
                                      struct dsdb_dn *dsdb_dn,
                                      struct la_entry *la_entry,
                                      struct ldb_dn *source_dn,
+                                     bool is_obj_commit,
                                      struct GUID *guid,
                                      bool *ignore_link)
 {
@@ -6815,46 +7189,14 @@ static int replmd_check_target_exists(struct ldb_module *module,
        if (target_res->count == 0) {
 
                /*
-                * we may not be able to resolve link targets properly when
-                * dealing with subsets of objects, e.g. the source is a
-                * critical object and the target isn't
-                *
-                * TODO:
-                * When we implement Trusted Domains we need to consider
-                * whether they get treated as an incomplete replica here or not
+                * target object is unknown. Check whether to ignore the link,
+                * fail the replication, or add a partial link
                 */
-               if (la_entry->dsdb_repl_flags & DSDB_REPL_FLAG_OBJECT_SUBSET) {
+               ret = replmd_allow_missing_target(module, tmp_ctx, dsdb_dn->dn,
+                                                 source_dn, is_obj_commit, guid,
+                                                 la_entry->dsdb_repl_flags,
+                                                 ignore_link, "Unknown");
 
-                       /*
-                        * We don't increase the highwater-mark in the object
-                        * subset cases, so subsequent replications should
-                        * resolve any missing links
-                        */
-                       DEBUG(2,(__location__
-                                ": Failed to find target %s linked from %s\n",
-                                ldb_dn_get_linearized(dsdb_dn->dn),
-                                ldb_dn_get_linearized(source_dn)));
-                       *ignore_link = true;
-
-               } else if (replmd_objects_have_same_nc(ldb, tmp_ctx, source_dn,
-                                                      dsdb_dn->dn)) {
-                       ldb_asprintf_errstring(ldb, "Unknown target %s GUID %s linked from %s\n",
-                                              ldb_dn_get_linearized(dsdb_dn->dn),
-                                              GUID_string(tmp_ctx, guid),
-                                              ldb_dn_get_linearized(source_dn));
-                       ret = LDB_ERR_NO_SUCH_OBJECT;
-               } else {
-
-                       /*
-                        * The target of the cross-partition link is missing.
-                        * Continue and try to at least add the forward-link.
-                        * This isn't great, but if we can add a partial link
-                        * then it's better than nothing.
-                        */
-                       DEBUG(2,("Failed to resolve cross-partition link between %s and %s\n",
-                                ldb_dn_get_linearized(source_dn),
-                                ldb_dn_get_linearized(dsdb_dn->dn)));
-               }
        } else if (target_res->count != 1) {
                ldb_asprintf_errstring(ldb, "More than one object found matching objectGUID %s\n",
                                       GUID_string(tmp_ctx, guid));
@@ -6877,26 +7219,15 @@ static int replmd_check_target_exists(struct ldb_module *module,
                 */
                if (target_deletion_state >= OBJECT_RECYCLED) {
 
-                       if (la_entry->dsdb_repl_flags & DSDB_REPL_FLAG_TARGETS_UPTODATE) {
-
-                               /*
-                                * target should already be uptodate so there's no
-                                * point retrying - it's probably just bad timing
-                                */
-                               *ignore_link = true;
-                               DEBUG(0, ("%s is deleted but up to date. "
-                                         "Ignoring link from %s\n",
-                                         ldb_dn_get_linearized(dsdb_dn->dn),
-                                         ldb_dn_get_linearized(source_dn)));
-
-                       } else {
-                               ldb_asprintf_errstring(ldb,
-                                                      "Deleted target %s GUID %s linked from %s",
-                                                      ldb_dn_get_linearized(dsdb_dn->dn),
-                                                      GUID_string(tmp_ctx, guid),
-                                                      ldb_dn_get_linearized(source_dn));
-                               ret = LDB_ERR_NO_SUCH_OBJECT;
-                       }
+                       /*
+                        * target object is deleted. Check whether to ignore the
+                        * link, fail the replication, or add a partial link
+                        */
+                       ret = replmd_allow_missing_target(module, tmp_ctx,
+                                                         dsdb_dn->dn, source_dn,
+                                                         is_obj_commit, guid,
+                                                         la_entry->dsdb_repl_flags,
+                                                         ignore_link, "Deleted");
                }
        }
 
@@ -6974,6 +7305,19 @@ linked_attributes[0]:
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       /*
+        * All attributes listed here must be dealt with in some way
+        * by replmd_process_linked_attribute() otherwise in the case
+        * of isDeleted: FALSE the modify will fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
        attrs[0] = attr->lDAPDisplayName;
        attrs[1] = "isDeleted";
        attrs[2] = "isRecycled";
@@ -7052,8 +7396,18 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
                return ret;
        }
 
-       ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
-                                        src_msg->dn, &guid, &dummy);
+       /*
+        * We can skip the target object checks if we're only syncing critical
+        * objects, or we know the target is up-to-date. If either case, we
+        * still continue even if the target doesn't exist
+        */
+       if ((la->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
+                                   DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
+
+               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
+                                                src_msg->dn, false, &guid,
+                                                &dummy);
+       }
 
        /*
         * When we fail to find the target object, the error code we pass
@@ -7068,6 +7422,236 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
        return ret;
 }
 
+/**
+ * Finds the current active Parsed-DN value for a single-valued linked
+ * attribute, if one exists.
+ * @param ret_pdn assigned the active Parsed-DN, or NULL if none was found
+ * @returns LDB_SUCCESS (regardless of whether a match was found), unless
+ * an error occurred
+ */
+static int replmd_get_active_singleval_link(struct ldb_module *module,
+                                           TALLOC_CTX *mem_ctx,
+                                           struct parsed_dn pdn_list[],
+                                           unsigned int count,
+                                           const struct dsdb_attribute *attr,
+                                           struct parsed_dn **ret_pdn)
+{
+       unsigned int i;
+
+       *ret_pdn = NULL;
+
+       if (!(attr->ldb_schema_attribute->flags & LDB_ATTR_FLAG_SINGLE_VALUE)) {
+
+               /* nothing to do for multi-valued linked attributes */
+               return LDB_SUCCESS;
+       }
+
+       for (i = 0; i < count; i++) {
+               int ret = LDB_SUCCESS;
+               struct parsed_dn *pdn = &pdn_list[i];
+
+               /* skip any inactive links */
+               if (dsdb_dn_is_deleted_val(pdn->v)) {
+                       continue;
+               }
+
+               /* we've found an active value for this attribute */
+               *ret_pdn = pdn;
+
+               if (pdn->dsdb_dn == NULL) {
+                       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+                       ret = really_parse_trusted_dn(mem_ctx, ldb, pdn,
+                                                     attr->syntax->ldap_oid);
+               }
+
+               return ret;
+       }
+
+       /* no active link found */
+       return LDB_SUCCESS;
+}
+
+/**
+ * @returns true if the replication linked attribute info is newer than we
+ * already have in our DB
+ * @param pdn the existing linked attribute info in our DB
+ * @param la the new linked attribute info received during replication
+ */
+static bool replmd_link_update_is_newer(struct parsed_dn *pdn,
+                                       struct drsuapi_DsReplicaLinkedAttribute *la)
+{
+       /* see if this update is newer than what we have already */
+       struct GUID invocation_id = GUID_zero();
+       uint32_t version = 0;
+       NTTIME change_time = 0;
+
+       if (pdn == NULL) {
+
+               /* no existing info so update is newer */
+               return true;
+       }
+
+       dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
+       dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
+       dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
+
+       return replmd_update_is_newer(&invocation_id,
+                                     &la->meta_data.originating_invocation_id,
+                                     version,
+                                     la->meta_data.version,
+                                     change_time,
+                                     la->meta_data.originating_change_time);
+}
+
+/**
+ * Marks an existing linked attribute value as deleted in the DB
+ * @param pdn the parsed-DN of the target-value to delete
+ */
+static int replmd_delete_link_value(struct ldb_module *module,
+                                   struct replmd_private *replmd_private,
+                                   TALLOC_CTX *mem_ctx,
+                                   struct ldb_dn *src_obj_dn,
+                                   const struct dsdb_schema *schema,
+                                   const struct dsdb_attribute *attr,
+                                   uint64_t seq_num,
+                                   bool is_active,
+                                   struct GUID *target_guid,
+                                   struct dsdb_dn *target_dsdb_dn,
+                                   struct ldb_val *output_val)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       time_t t;
+       NTTIME now;
+       const struct GUID *invocation_id = NULL;
+       int ret;
+
+       t = time(NULL);
+       unix_to_nt_time(&now, t);
+
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (invocation_id == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* if the existing link is active, remove its backlink */
+       if (is_active) {
+
+               ret = replmd_add_backlink(module, replmd_private, schema,
+                                         src_obj_dn, target_guid, false,
+                                         attr, NULL);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       /* mark the existing value as deleted */
+       ret = replmd_update_la_val(mem_ctx, output_val, target_dsdb_dn,
+                                  target_dsdb_dn, invocation_id, seq_num,
+                                  seq_num, now, true);
+       return ret;
+}
+
+/**
+ * Checks for a conflict in single-valued link attributes, and tries to
+ * resolve the problem if possible.
+ *
+ * Single-valued links should only ever have one active value. If we already
+ * have an active link value, and during replication we receive an active link
+ * value for a different target DN, then we need to resolve this inconsistency
+ * and determine which value should be active. If the received info is better/
+ * newer than the existing link attribute, then we need to set our existing
+ * link as deleted. If the received info is worse/older, then we should continue
+ * to add it, but set it as an inactive link.
+ *
+ * Note that this is a corner-case that is unlikely to happen (but if it does
+ * happen, we don't want it to break replication completely).
+ *
+ * @param pdn_being_modified the parsed DN corresponding to the received link
+ * target (note this is NULL if the link does not already exist in our DB)
+ * @param pdn_list all the source object's Parsed-DNs for this attribute, i.e.
+ * any existing active or inactive values for the attribute in our DB.
+ * @param dsdb_dn the target DN for the received link attribute
+ * @param add_as_inactive gets set to true if the received link is worse than
+ * the existing link - it should still be added, but as an inactive link.
+ */
+static int replmd_check_singleval_la_conflict(struct ldb_module *module,
+                                             struct replmd_private *replmd_private,
+                                             TALLOC_CTX *mem_ctx,
+                                             struct ldb_dn *src_obj_dn,
+                                             struct drsuapi_DsReplicaLinkedAttribute *la,
+                                             struct dsdb_dn *dsdb_dn,
+                                             struct parsed_dn *pdn_being_modified,
+                                             struct parsed_dn *pdn_list,
+                                             struct ldb_message_element *old_el,
+                                             const struct dsdb_schema *schema,
+                                             const struct dsdb_attribute *attr,
+                                             uint64_t seq_num,
+                                             bool *add_as_inactive)
+{
+       struct parsed_dn *active_pdn = NULL;
+       bool update_is_newer = false;
+       int ret;
+
+       /*
+        * check if there's a conflict for single-valued links, i.e. an active
+        * linked attribute already exists, but it has a different target value
+        */
+       ret = replmd_get_active_singleval_link(module, mem_ctx, pdn_list,
+                                              old_el->num_values, attr,
+                                              &active_pdn);
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * If no active value exists (or the received info is for the currently
+        * active value), then no conflict exists
+        */
+       if (active_pdn == NULL || active_pdn == pdn_being_modified) {
+               return LDB_SUCCESS;
+       }
+
+       DBG_WARNING("Link conflict for %s attribute on %s\n",
+                   attr->lDAPDisplayName, ldb_dn_get_linearized(src_obj_dn));
+
+       /* Work out how to resolve the conflict based on which info is better */
+       update_is_newer = replmd_link_update_is_newer(active_pdn, la);
+
+       if (update_is_newer) {
+               DBG_WARNING("Using received value %s, over existing target %s\n",
+                           ldb_dn_get_linearized(dsdb_dn->dn),
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn));
+
+               /*
+                * Delete our existing active link. The received info will then
+                * be added (through normal link processing) as the active value
+                */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              src_obj_dn, schema, attr,
+                                              seq_num, true, &active_pdn->guid,
+                                              active_pdn->dsdb_dn,
+                                              active_pdn->v);
+
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       } else {
+               DBG_WARNING("Using existing target %s, over received value %s\n",
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn),
+                           ldb_dn_get_linearized(dsdb_dn->dn));
+
+               /*
+                * we want to keep our existing active link and add the
+                * received link as inactive
+                */
+               *add_as_inactive = true;
+       }
+
+       return LDB_SUCCESS;
+}
+
 /*
   process one linked attribute structure
  */
@@ -7092,6 +7676,9 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
        bool ignore_link;
        enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       struct dsdb_dn *old_dsdb_dn = NULL;
+       struct ldb_val *val_to_update = NULL;
+       bool add_as_inactive = false;
 
        /*
         * get the attribute being modified, the search result for the source object,
@@ -7111,6 +7698,9 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
         * recycled and tombstone objects.  We don't have to delete
         * any existing link, that should have happened when the
         * object deletion was replicated or initiated.
+        *
+        * This needs isDeleted and isRecycled to be included as
+        * attributes in the search and so in msg if set.
         */
        replmd_deletion_state(module, msg, &deletion_state, NULL);
 
@@ -7119,6 +7709,24 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                return LDB_SUCCESS;
        }
 
+       /*
+        * Now that we know the deletion_state, remove the extra
+        * attributes added for that purpose.  We need to do this
+        * otherwise in the case of isDeleted: FALSE the modify will
+        * fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
+
+       ldb_msg_remove_attr(msg, "isDeleted");
+       ldb_msg_remove_attr(msg, "isRecycled");
+
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
        if (old_el == NULL) {
                ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
@@ -7141,7 +7749,7 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        }
 
        ret = replmd_check_target_exists(module, dsdb_dn, la_entry, msg->dn,
-                                        &guid, &ignore_link);
+                                        true, &guid, &ignore_link);
 
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
@@ -7170,46 +7778,47 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                return ret;
        }
 
+       if (!replmd_link_update_is_newer(pdn, la)) {
+               DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
+                        old_el->name, ldb_dn_get_linearized(msg->dn),
+                        GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
+               talloc_free(tmp_ctx);
+               return LDB_SUCCESS;
+       }
 
-       if (pdn != NULL) {
-               /* see if this update is newer than what we have already */
-               struct GUID invocation_id = GUID_zero();
-               uint32_t version = 0;
-               uint32_t originating_usn = 0;
-               NTTIME change_time = 0;
-               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
-
-               dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &originating_usn, "RMD_ORIGINATING_USN");
-               dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
-
-               if (!replmd_update_is_newer(&invocation_id,
-                                           &la->meta_data.originating_invocation_id,
-                                           version,
-                                           la->meta_data.version,
-                                           change_time,
-                                           la->meta_data.originating_change_time)) {
-                       DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
-                                old_el->name, ldb_dn_get_linearized(msg->dn),
-                                GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
-                       talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               }
+       /* get a seq_num for this change */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
 
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       /*
+        * check for single-valued link conflicts, i.e. an active linked
+        * attribute already exists, but it has a different target value
+        */
+       if (active) {
+               ret = replmd_check_singleval_la_conflict(module, replmd_private,
+                                                        tmp_ctx, msg->dn, la,
+                                                        dsdb_dn, pdn, pdn_list,
+                                                        old_el, schema, attr,
+                                                        seq_num,
+                                                        &add_as_inactive);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
+       }
+
+       if (pdn != NULL) {
+               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
 
                if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                        /* remove the existing backlink */
                        ret = replmd_add_backlink(module, replmd_private,
                                                  schema, 
                                                  msg->dn,
-                                                 &guid, false, attr,
+                                                 &pdn->guid, false, attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
@@ -7217,37 +7826,12 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                        }
                }
 
-               ret = replmd_update_la_val(tmp_ctx, pdn->v, dsdb_dn, pdn->dsdb_dn,
-                                          &la->meta_data.originating_invocation_id,
-                                          la->meta_data.originating_usn, seq_num,
-                                          la->meta_data.originating_change_time,
-                                          la->meta_data.version,
-                                          !active);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+               val_to_update = pdn->v;
+               old_dsdb_dn = pdn->dsdb_dn;
 
-               if (active) {
-                       /* add the new backlink */
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-               }
        } else {
                unsigned offset;
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+
                /*
                 * We know where the new one needs to be, from the *next
                 * pointer into pdn_list.
@@ -7283,27 +7867,46 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
 
                old_el->num_values++;
 
-               ret = replmd_build_la_val(tmp_ctx, &old_el->values[offset], dsdb_dn,
-                                         &la->meta_data.originating_invocation_id,
-                                         la->meta_data.originating_usn, seq_num,
-                                         la->meta_data.originating_change_time,
-                                         la->meta_data.version,
-                                         !active);
+               val_to_update = &old_el->values[offset];
+               old_dsdb_dn = NULL;
+       }
+
+       /* set the link attribute's value to the info that was received */
+       ret = replmd_set_la_val(tmp_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
+                               &la->meta_data.originating_invocation_id,
+                               la->meta_data.originating_usn, seq_num,
+                               la->meta_data.originating_change_time,
+                               la->meta_data.version,
+                               !active);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (add_as_inactive) {
+
+               /* Set the new link as inactive/deleted to avoid conflicts */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              msg->dn, schema, attr, seq_num,
+                                              false, &guid, dsdb_dn,
+                                              val_to_update);
+
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
 
-               if (active) {
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
+       } else if (active) {
+
+               /* if the new link is active, then add the new backlink */
+               ret = replmd_add_backlink(module, replmd_private,
+                                         schema,
+                                         msg->dn,
+                                         &guid, true, attr,
+                                         parent);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
                }
        }