repl_meta_data: fix linked attribute corruption on databases with unsorted links...
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index 8f123de7dbe35cc6b181211d9458ff299fab7915..7646f942fcaec837e50d9a187bc7c7bf7020c625 100644 (file)
@@ -54,6 +54,9 @@
 #undef DBGC_CLASS
 #define DBGC_CLASS            DBGC_DRS_REPL
 
+/* the RMD_VERSION for linked attributes starts from 1 */
+#define RMD_VERSION_INITIAL   1
+
 /*
  * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
  * Deleted Objects Container
@@ -119,6 +122,17 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                                      const char *ldap_oid);
 static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
                                          struct la_entry *la);
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted);
+
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid);
 
 enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1,
@@ -563,9 +577,41 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
        }
 
        if (ares->error != LDB_SUCCESS) {
-               DEBUG(5,("%s failure. Error is: %s\n", __FUNCTION__, ldb_strerror(ares->error)));
+               struct GUID_txt_buf guid_txt;
+               struct ldb_message *msg = NULL;
+               char *s = NULL;
+
+               if (ac->apply_mode == false) {
+                       DBG_NOTICE("Originating update failure. Error is: %s\n",
+                                  ldb_strerror(ares->error));
+                       return ldb_module_done(ac->req, controls,
+                                              ares->response, ares->error);
+               }
+
+               msg = ac->objs->objects[ac->index_current].msg;
+               /*
+                * Set at DBG_NOTICE as once these start to happe, they
+                * will happen a lot until resolved, due to repeated
+                * replication.  The caller will probably print the
+                * ldb error string anyway.
+                */
+               DBG_NOTICE("DRS replication apply failure for %s. Error is: %s\n",
+                          ldb_dn_get_linearized(msg->dn),
+                          ldb_strerror(ares->error));
+
+               s = ldb_ldif_message_redacted_string(ldb_module_get_ctx(ac->module),
+                                                    ac,
+                                                    LDB_CHANGETYPE_ADD,
+                                                    msg);
+
+               DBG_INFO("Failing DRS %s replication message was %s:\n%s\n",
+                        ac->search_msg == NULL ? "ADD" : "MODIFY",
+                        GUID_buf_string(&ac->objs->objects[ac->index_current].object_guid,
+                                        &guid_txt),
+                        s);
+               talloc_free(s);
                return ldb_module_done(ac->req, controls,
-                                       ares->response, ares->error);
+                                      ares->response, ares->error);
        }
 
        if (ares->type != LDB_REPLY_DONE) {
@@ -890,8 +936,8 @@ static void replmd_ldb_message_sort(struct ldb_message *msg,
 }
 
 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted);
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime);
 
 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2);
 
@@ -899,6 +945,10 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                          struct ldb_message_element *el, struct parsed_dn **pdn,
                          const char *ldap_oid, struct ldb_request *parent);
 
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn);
+
 /*
   fix up linked attributes in replmd_add.
   This involves setting up the right meta-data in extended DN
@@ -940,6 +990,12 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                return ret;
        }
 
+       ret = check_parsed_dn_duplicates(module, el, pdn);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
        new_values = talloc_array(tmp_ctx, struct ldb_val, el->num_values);
        if (new_values == NULL) {
                ldb_module_oom(module);
@@ -949,20 +1005,9 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
 
        for (i = 0; i < el->num_values; i++) {
                struct parsed_dn *p = &pdn[i];
-               if (i > 0 && parsed_dn_compare(p, &pdn[i - 1]) == 0) {
-                       ldb_asprintf_errstring(ldb,
-                                       "Linked attribute %s has "
-                                       "multiple identical values", el->name);
-                       talloc_free(tmp_ctx);
-                       if (ldb_attr_cmp(el->name, "member") == 0) {
-                               return LDB_ERR_ENTRY_ALREADY_EXISTS;
-                       } else {
-                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
-                       }
-               }
                ret = replmd_build_la_val(el->values, p->v, p->dsdb_dn,
                                          &ac->our_invocation_id,
-                                         ac->seq_num, ac->seq_num, now, 0, false);
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2093,6 +2138,37 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
+/*
+   Return LDB_SUCCESS if a parsed_dn list contains no duplicate values,
+   otherwise an error code. For compatibility the error code differs depending
+   on whether or not the attribute is "member".
+
+   As always, the parsed_dn list is assumed to be sorted.
+ */
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn)
+{
+       unsigned int i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       for (i = 1; i < el->num_values; i++) {
+               struct parsed_dn *p = &pdn[i];
+               if (parsed_dn_compare(p, &pdn[i - 1]) == 0) {
+                       ldb_asprintf_errstring(ldb,
+                                              "Linked attribute %s has "
+                                              "multiple identical values",
+                                              el->name);
+                       if (ldb_attr_cmp(el->name, "member") == 0) {
+                               return LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+               }
+       }
+       return LDB_SUCCESS;
+}
+
 /*
   build a new extended DN, including all meta data fields
 
@@ -2104,85 +2180,20 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
   RMD_LOCAL_USN       = local_usn
   RMD_VERSION         = version
  */
-static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted)
+static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v,
+                              struct dsdb_dn *dsdb_dn,
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime)
 {
-       struct ldb_dn *dn = dsdb_dn->dn;
-       const char *tstring, *usn_string, *flags_string;
-       struct ldb_val tval;
-       struct ldb_val iid;
-       struct ldb_val usnv, local_usnv;
-       struct ldb_val vers, flagsv;
-       NTSTATUS status;
-       int ret;
-       const char *dnstring;
-       char *vstring;
-       uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
-
-       tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
-       if (!tstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       tval = data_blob_string_const(tstring);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       usnv = data_blob_string_const(usn_string);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       local_usnv = data_blob_string_const(usn_string);
-
-       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
-       if (!vstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       vers = data_blob_string_const(vstring);
-
-       status = GUID_to_ndr_blob(invocation_id, dn, &iid);
-       if (!NT_STATUS_IS_OK(status)) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
-       if (!flags_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       flagsv = data_blob_string_const(flags_string);
-
-       ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
-       if (ret != LDB_SUCCESS) return ret;
-
-       dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
-       if (dnstring == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       *v = data_blob_string_const(dnstring);
-
-       return LDB_SUCCESS;
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, NULL, invocation_id,
+                                local_usn, local_usn, nttime,
+                                RMD_VERSION_INITIAL, false);
 }
 
 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                                struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
                                uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted);
+                               bool deleted);
 
 /*
   check if any links need upgrading from w2k format
@@ -2236,7 +2247,7 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                /* it's an old one that needs upgrading */
                ret = replmd_update_la_val(el->values, dns[i].v,
                                           dns[i].dsdb_dn, dns[i].dsdb_dn,
-                                          invocation_id, 1, 1, 0, 0, false);
+                                          invocation_id, 1, 1, 0, false);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -2258,14 +2269,14 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
 }
 
 /*
-  update an extended DN, including all meta data fields
+  Sets the value for a linked attribute, including all meta data fields
 
   see replmd_build_la_val for value names
  */
-static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
-                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted)
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted)
 {
        struct ldb_dn *dn = dsdb_dn->dn;
        const char *tstring, *usn_string, *flags_string;
@@ -2273,8 +2284,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        struct ldb_val iid;
        struct ldb_val usnv, local_usnv;
        struct ldb_val vers, flagsv;
-       const struct ldb_val *old_addtime;
-       uint32_t old_version;
+       const struct ldb_val *old_addtime = NULL;
        NTSTATUS status;
        int ret;
        const char *dnstring;
@@ -2314,7 +2324,10 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        if (ret != LDB_SUCCESS) return ret;
 
        /* get the ADDTIME from the original */
-       old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_ADDTIME");
+       if (old_dsdb_dn != NULL) {
+               old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn,
+                                                           "RMD_ADDTIME");
+       }
        if (old_addtime == NULL) {
                old_addtime = &tval;
        }
@@ -2339,12 +2352,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
        if (ret != LDB_SUCCESS) return ret;
 
-       /* increase the version by 1 */
-       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version, "RMD_VERSION");
-       if (NT_STATUS_IS_OK(status) && old_version >= version) {
-               version = old_version+1;
-       }
-       vstring = talloc_asprintf(dn, "%lu", (unsigned long)version);
+       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
        vers = data_blob_string_const(vstring);
        ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
        if (ret != LDB_SUCCESS) return ret;
@@ -2358,6 +2366,33 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        return LDB_SUCCESS;
 }
 
+/**
+ * Updates the value for a linked attribute, including all meta data fields
+ */
+static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                               bool deleted)
+{
+       uint32_t old_version;
+       uint32_t version = RMD_VERSION_INITIAL;
+       NTSTATUS status;
+
+       /*
+        * We're updating the linked attribute locally, so increase the version
+        * by 1 so that other DCs will see the change when it gets replicated out
+        */
+       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version,
+                                            "RMD_VERSION");
+
+       if (NT_STATUS_IS_OK(status)) {
+               version = old_version + 1;
+       }
+
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, old_dsdb_dn, invocation_id,
+                                usn, local_usn, nttime, version, deleted);
+}
+
 /*
   handle adding a linked attribute
  */
@@ -2488,7 +2523,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
                                                   dns[i].dsdb_dn,
                                                   exact->dsdb_dn,
                                                   invocation_id, seq_num,
-                                                  seq_num, now, 0, false);
+                                                  seq_num, now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2544,8 +2579,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
                /* Make the new linked attribute ldb_val. */
                ret = replmd_build_la_val(new_values, &new_values[num_values],
                                          dns[i].dsdb_dn, invocation_id,
-                                         seq_num, seq_num,
-                                         now, 0, false);
+                                         seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2695,7 +2729,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        ret = replmd_update_la_val(old_el->values, p->v,
                                                   p->dsdb_dn, p->dsdb_dn,
                                                   invocation_id, seq_num,
-                                                  seq_num, now, 0, true);
+                                                  seq_num, now, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2792,7 +2826,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                ret = replmd_update_la_val(old_el->values, exact->v,
                                           exact->dsdb_dn, exact->dsdb_dn,
                                           invocation_id, seq_num, seq_num,
-                                          now, 0, true);
+                                          now, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2810,11 +2844,23 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
        if (vanish_links) {
                unsigned j = 0;
+               struct ldb_val *tmp_vals = NULL;
+
+               tmp_vals = talloc_array(tmp_ctx, struct ldb_val,
+                                       old_el->num_values);
+               if (tmp_vals == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_oom(module);
+               }
                for (i = 0; i < old_el->num_values; i++) {
-                       if (old_dns[i].v != NULL) {
-                               old_el->values[j] = *old_dns[i].v;
-                               j++;
+                       if (old_dns[i].v == NULL) {
+                               continue;
                        }
+                       tmp_vals[j] = *old_dns[i].v;
+                       j++;
+               }
+               for (i = 0; i < j; i++) {
+                       old_el->values[i] = tmp_vals[i];
                }
                old_el->num_values = j;
        }
@@ -2904,6 +2950,12 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                return ret;
        }
 
+       ret = check_parsed_dn_duplicates(module, el, dns);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
        ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns,
                             ldap_oid, parent);
        if (ret != LDB_SUCCESS) {
@@ -2961,7 +3013,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                           old_p->dsdb_dn,
                                                           invocation_id,
                                                           seq_num, seq_num,
-                                                          now, 0, true);
+                                                          now, true);
                                if (ret != LDB_SUCCESS) {
                                        talloc_free(tmp_ctx);
                                        return ret;
@@ -2995,7 +3047,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                   old_p->dsdb_dn,
                                                   invocation_id,
                                                   seq_num, seq_num,
-                                                  now, 0, false);
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -3027,8 +3079,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                  new_p->v,
                                                  new_p->dsdb_dn,
                                                  invocation_id,
-                                                 seq_num, seq_num,
-                                                 now, 0, false);
+                                                 seq_num, now);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -3133,9 +3184,22 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
-                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
-                               continue;
+                       if (parent) {
+                               struct ldb_control *ctrl;
+
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_REPLMD_VANISH_LINKS);
+                               if (ctrl != NULL) {
+                                       ctrl->critical = false;
+                                       continue;
+                               }
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_DBCHECK);
+                               if (ctrl != NULL) {
+                                       continue;
+                               }
                        }
+
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
@@ -3264,6 +3328,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        unsigned int functional_level;
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
+       struct ldb_control *fix_links_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -3289,6 +3354,39 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb = ldb_module_get_ctx(module);
 
+       fix_links_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_DUPLICATE_LINKS);
+       if (fix_links_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_REPLACE) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID == 0) {
+                       return ldb_module_operr(module);
+               }
+
+               fix_links_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
        guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
@@ -3719,7 +3817,8 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
                                            msg->dn, attrs,
                                            DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_EXTENDED_DN,
+                                           DSDB_SEARCH_SHOW_EXTENDED_DN |
+                                           DSDB_SEARCH_SHOW_RECYCLED,
                                            parent);
 
                if (ret != LDB_SUCCESS) {
@@ -3812,7 +3911,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
 {
        int ret = LDB_ERR_OTHER;
        bool retb, disallow_move_on_delete;
-       struct ldb_dn *old_dn, *new_dn;
+       struct ldb_dn *old_dn = NULL, *new_dn = NULL;
        const char *rdn_name;
        const struct ldb_val *rdn_value, *new_rdn_value;
        struct GUID guid;
@@ -4024,17 +4123,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        guid = samdb_result_guid(old_msg, "objectGUID");
 
        if (deletion_state == OBJECT_NOT_DELETED) {
-               /* Add a formatted child */
-               retb = ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ADEL:%s",
-                                           rdn_name,
-                                           ldb_dn_escape_value(tmp_ctx, *rdn_value),
-                                           GUID_string(tmp_ctx, &guid));
-               if (!retb) {
-                       ldb_asprintf_errstring(ldb, __location__
-                                              ": Unable to add a formatted child to dn: %s",
-                                              ldb_dn_get_linearized(new_dn));
+
+               ret = replmd_make_deleted_child_dn(tmp_ctx,
+                                                  ldb,
+                                                  new_dn,
+                                                  rdn_name, rdn_value,
+                                                  guid);
+
+               if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
 
                ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
@@ -4202,6 +4300,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                /* don't remove the rDN */
                                continue;
                        }
+
                        if (sa->linkID & 1) {
                                /*
                                  we have a backlink in this object
@@ -4215,7 +4314,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                                                replmd_private,
                                                                old_dn, &guid,
                                                                el, sa, req);
-                               if (ret != LDB_SUCCESS) {
+                               if (ret == LDB_SUCCESS) {
+                                       /*
+                                        * now we continue, which means we
+                                        * won't remove this backlink
+                                        * directly
+                                        */
+                                       continue;
+                               }
+
+                               if (ret != LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
@@ -4228,11 +4336,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
-                               /* now we continue, which means we
-                                  won't remove this backlink
-                                  directly
-                               */
-                               continue;
+
+                               /*
+                                * Otherwise vanish the link, we are
+                                * out of sync and the controlling
+                                * object does not have the source
+                                * link any more
+                                */
+
+                               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
                        } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
@@ -4468,14 +4581,104 @@ static bool replmd_replPropertyMetaData1_new_should_be_taken(uint32_t dsdb_repl_
 }
 
 
+/*
+  form a DN for a deleted (DEL:) or conflict (CNF:) DN
+ */
+static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
+                                      struct ldb_context *ldb,
+                                      struct ldb_dn *dn,
+                                      const char *four_char_prefix,
+                                      const char *rdn_name,
+                                      const struct ldb_val *rdn_value,
+                                      struct GUID guid)
+{
+       struct ldb_val deleted_child_rdn_val;
+       struct GUID_txt_buf guid_str;
+       bool retb;
+
+       GUID_buf_string(&guid, &guid_str);
+
+       retb = ldb_dn_add_child_fmt(dn, "X=Y");
+       if (!retb) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * TODO: Per MS-ADTS 3.1.1.5.5 Delete Operation
+        * we should truncate this value to ensure the RDN is not more than 255 chars.
+        *
+        * However we MS-ADTS 3.1.1.5.1.2 Naming Constraints indicates that:
+        *
+        * "Naming constraints are not enforced for replicated
+        * updates." so this is safe and we don't have to work out not
+        * splitting a UTF8 char right now.
+        */
+       deleted_child_rdn_val = ldb_val_dup(tmp_ctx, rdn_value);
+
+       /*
+        * sizeof(guid_str.buf) will always be longer than
+        * strlen(guid_str.buf) but we allocate using this and
+        * waste the trailing bytes to avoid scaring folks
+        * with memcpy() using strlen() below
+        */
+
+       deleted_child_rdn_val.data
+               = talloc_realloc(tmp_ctx, deleted_child_rdn_val.data,
+                                uint8_t,
+                                rdn_value->length + 5
+                                + sizeof(guid_str.buf));
+       if (!deleted_child_rdn_val.data) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.length =
+               rdn_value->length + 5
+               + strlen(guid_str.buf);
+
+       SMB_ASSERT(deleted_child_rdn_val.length <
+                  talloc_get_size(deleted_child_rdn_val.data));
+
+       /*
+        * talloc won't allocate more than 256MB so we can't
+        * overflow but just to be sure
+        */
+       if (deleted_child_rdn_val.length < rdn_value->length) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.data[rdn_value->length] = 0x0a;
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 1],
+              four_char_prefix, 4);
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 5],
+              guid_str.buf,
+              sizeof(guid_str.buf));
+
+       /* Now set the value into the RDN, without parsing it */
+       ldb_dn_set_component(dn, 0, rdn_name,
+                            deleted_child_rdn_val);
+
+       return LDB_SUCCESS;
+}
+
+
 /*
   form a conflict DN
  */
-static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn, struct GUID *guid)
+static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx,
+                                        struct ldb_context *ldb,
+                                        struct ldb_dn *dn,
+                                        struct GUID *guid)
 {
        const struct ldb_val *rdn_val;
        const char *rdn_name;
        struct ldb_dn *new_dn;
+       int ret;
 
        rdn_val = ldb_dn_get_rdn_val(dn);
        rdn_name = ldb_dn_get_rdn_name(dn);
@@ -4483,25 +4686,41 @@ static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn,
                return NULL;
        }
 
-       new_dn = ldb_dn_copy(mem_ctx, dn);
+       new_dn = ldb_dn_get_parent(mem_ctx, dn);
        if (!new_dn) {
                return NULL;
        }
 
-       if (!ldb_dn_remove_child_components(new_dn, 1)) {
-               return NULL;
-       }
-
-       if (!ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ACNF:%s",
-                                 rdn_name,
-                                 ldb_dn_escape_value(new_dn, *rdn_val),
-                                 GUID_string(new_dn, guid))) {
+       ret = replmd_make_prefix_child_dn(mem_ctx,
+                                         ldb, new_dn,
+                                         "CNF:",
+                                         rdn_name,
+                                         rdn_val,
+                                         *guid);
+       if (ret != LDB_SUCCESS) {
                return NULL;
        }
-
        return new_dn;
 }
 
+/*
+  form a deleted DN
+ */
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid)
+{
+       return replmd_make_prefix_child_dn(tmp_ctx,
+                                          ldb, dn,
+                                          "DEL:",
+                                          rdn_name,
+                                          rdn_value,
+                                          guid);
+}
+
 
 /*
   perform a modify operation which sets the rDN and name attributes to
@@ -4784,7 +5003,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                 ldb_dn_get_linearized(conflict_dn)));
                        goto failed;
                }
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4809,7 +5030,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                        goto failed;
                }
 
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -5108,7 +5331,7 @@ static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request
        {
                struct ldb_message *parent_msg = ares->message;
                struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
-               struct ldb_dn *parent_dn;
+               struct ldb_dn *parent_dn = NULL;
                int comp_num;
 
                if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
@@ -5411,7 +5634,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
 
        if (rename_incoming_record) {
 
-               new_dn = replmd_conflict_dn(msg, msg->dn,
+               new_dn = replmd_conflict_dn(msg,
+                                           ldb_module_get_ctx(ar->module),
+                                           msg->dn,
                                            &ar->objs->objects[ar->index_current].object_guid);
                if (new_dn == NULL) {
                        ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
@@ -5447,7 +5672,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       new_dn = replmd_conflict_dn(tmp_ctx, conflict_dn, &guid);
+       new_dn = replmd_conflict_dn(tmp_ctx,
+                                   ldb_module_get_ctx(ar->module),
+                                   conflict_dn, &guid);
        if (new_dn == NULL) {
                DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
@@ -6694,43 +6921,96 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
 }
 
 /**
- * Returns True if the source and target DNs both have the same naming context,
- * i.e. they're both in the same partition.
+ * Checks how to handle an missing target - either we need to fail the
+ * replication and retry with GET_TGT, ignore the link and continue, or try to
+ * add a partial link to an unknown target.
  */
-static bool replmd_objects_have_same_nc(struct ldb_context *ldb,
-                                       TALLOC_CTX *mem_ctx,
-                                       struct ldb_dn *source_dn,
-                                       struct ldb_dn *target_dn)
+static int replmd_allow_missing_target(struct ldb_module *module,
+                                      TALLOC_CTX *mem_ctx,
+                                      struct ldb_dn *target_dn,
+                                      struct ldb_dn *source_dn,
+                                      bool is_obj_commit,
+                                      struct GUID *guid,
+                                      uint32_t dsdb_repl_flags,
+                                      bool *ignore_link,
+                                      const char * missing_str)
 {
-       TALLOC_CTX *tmp_ctx;
-       struct ldb_dn *source_nc;
-       struct ldb_dn *target_nc;
-       int ret;
-       bool same_nc = true;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       bool is_in_same_nc;
 
-       tmp_ctx = talloc_new(mem_ctx);
+       /*
+        * we may not be able to resolve link targets properly when
+        * dealing with subsets of objects, e.g. the source is a
+        * critical object and the target isn't
+        *
+        * TODO:
+        * When we implement Trusted Domains we need to consider
+        * whether they get treated as an incomplete replica here or not
+        */
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_OBJECT_SUBSET) {
 
-       ret = dsdb_find_nc_root(ldb, tmp_ctx, source_dn, &source_nc);
-       if (ret != LDB_SUCCESS) {
-               DBG_ERR("Failed to find base DN for source %s\n",
-                       ldb_dn_get_linearized(source_dn));
-               talloc_free(tmp_ctx);
-               return true;
+               /*
+                * Ignore the link. We don't increase the highwater-mark in
+                * the object subset cases, so subsequent replications should
+                * resolve any missing links
+                */
+               DEBUG(2, ("%s target %s linked from %s\n", missing_str,
+                         ldb_dn_get_linearized(target_dn),
+                         ldb_dn_get_linearized(source_dn)));
+               *ignore_link = true;
+               return LDB_SUCCESS;
        }
 
-       ret = dsdb_find_nc_root(ldb, tmp_ctx, target_dn, &target_nc);
-       if (ret != LDB_SUCCESS) {
-               DBG_ERR("Failed to find base DN for target %s\n",
-                       ldb_dn_get_linearized(target_dn));
-               talloc_free(tmp_ctx);
-               return true;
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_TARGETS_UPTODATE) {
+
+               /*
+                * target should already be up-to-date so there's no point in
+                * retrying. This could be due to bad timing, or if a target
+                * on a one-way link was deleted. We ignore the link rather
+                * than failing the replication cycle completely
+                */
+               *ignore_link = true;
+               DBG_WARNING("%s is %s but up to date. Ignoring link from %s\n",
+                           ldb_dn_get_linearized(target_dn), missing_str,
+                           ldb_dn_get_linearized(source_dn));
+               return LDB_SUCCESS;
+       }
+       
+       is_in_same_nc = dsdb_objects_have_same_nc(ldb,
+                                                 mem_ctx,
+                                                 source_dn,
+                                                 target_dn);
+       if (is_in_same_nc) {
+               /* fail the replication and retry with GET_TGT */
+               ldb_asprintf_errstring(ldb, "%s target %s GUID %s linked from %s\n",
+                                      missing_str,
+                                      ldb_dn_get_linearized(target_dn),
+                                      GUID_string(mem_ctx, guid),
+                                      ldb_dn_get_linearized(source_dn));
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       same_nc = (ldb_dn_compare(source_nc, target_nc) == 0);
+       /*
+        * The target of the cross-partition link is missing. Continue
+        * and try to at least add the forward-link. This isn't great,
+        * but a partial link can be fixed by dbcheck, so it's better
+        * than dropping the link completely.
+        */
+       *ignore_link = false;
 
-       talloc_free(tmp_ctx);
+       if (is_obj_commit) {
 
-       return same_nc;
+               /*
+                * Only log this when we're actually committing the objects.
+                * This avoids spurious logs, i.e. if we're just verifying the
+                * received link during a join.
+                */
+               DBG_WARNING("%s cross-partition target %s linked from %s\n",
+                           missing_str, ldb_dn_get_linearized(target_dn),
+                           ldb_dn_get_linearized(source_dn));
+       }
+       
+       return LDB_SUCCESS;
 }
 
 /**
@@ -6743,6 +7023,7 @@ static int replmd_check_target_exists(struct ldb_module *module,
                                      struct dsdb_dn *dsdb_dn,
                                      struct la_entry *la_entry,
                                      struct ldb_dn *source_dn,
+                                     bool is_obj_commit,
                                      struct GUID *guid,
                                      bool *ignore_link)
 {
@@ -6815,46 +7096,14 @@ static int replmd_check_target_exists(struct ldb_module *module,
        if (target_res->count == 0) {
 
                /*
-                * we may not be able to resolve link targets properly when
-                * dealing with subsets of objects, e.g. the source is a
-                * critical object and the target isn't
-                *
-                * TODO:
-                * When we implement Trusted Domains we need to consider
-                * whether they get treated as an incomplete replica here or not
+                * target object is unknown. Check whether to ignore the link,
+                * fail the replication, or add a partial link
                 */
-               if (la_entry->dsdb_repl_flags & DSDB_REPL_FLAG_OBJECT_SUBSET) {
-
-                       /*
-                        * We don't increase the highwater-mark in the object
-                        * subset cases, so subsequent replications should
-                        * resolve any missing links
-                        */
-                       DEBUG(2,(__location__
-                                ": Failed to find target %s linked from %s\n",
-                                ldb_dn_get_linearized(dsdb_dn->dn),
-                                ldb_dn_get_linearized(source_dn)));
-                       *ignore_link = true;
-
-               } else if (replmd_objects_have_same_nc(ldb, tmp_ctx, source_dn,
-                                                      dsdb_dn->dn)) {
-                       ldb_asprintf_errstring(ldb, "Unknown target %s GUID %s linked from %s\n",
-                                              ldb_dn_get_linearized(dsdb_dn->dn),
-                                              GUID_string(tmp_ctx, guid),
-                                              ldb_dn_get_linearized(source_dn));
-                       ret = LDB_ERR_NO_SUCH_OBJECT;
-               } else {
+               ret = replmd_allow_missing_target(module, tmp_ctx, dsdb_dn->dn,
+                                                 source_dn, is_obj_commit, guid,
+                                                 la_entry->dsdb_repl_flags,
+                                                 ignore_link, "Unknown");
 
-                       /*
-                        * The target of the cross-partition link is missing.
-                        * Continue and try to at least add the forward-link.
-                        * This isn't great, but if we can add a partial link
-                        * then it's better than nothing.
-                        */
-                       DEBUG(2,("Failed to resolve cross-partition link between %s and %s\n",
-                                ldb_dn_get_linearized(source_dn),
-                                ldb_dn_get_linearized(dsdb_dn->dn)));
-               }
        } else if (target_res->count != 1) {
                ldb_asprintf_errstring(ldb, "More than one object found matching objectGUID %s\n",
                                       GUID_string(tmp_ctx, guid));
@@ -6877,26 +7126,15 @@ static int replmd_check_target_exists(struct ldb_module *module,
                 */
                if (target_deletion_state >= OBJECT_RECYCLED) {
 
-                       if (la_entry->dsdb_repl_flags & DSDB_REPL_FLAG_TARGETS_UPTODATE) {
-
-                               /*
-                                * target should already be uptodate so there's no
-                                * point retrying - it's probably just bad timing
-                                */
-                               *ignore_link = true;
-                               DEBUG(0, ("%s is deleted but up to date. "
-                                         "Ignoring link from %s\n",
-                                         ldb_dn_get_linearized(dsdb_dn->dn),
-                                         ldb_dn_get_linearized(source_dn)));
-
-                       } else {
-                               ldb_asprintf_errstring(ldb,
-                                                      "Deleted target %s GUID %s linked from %s",
-                                                      ldb_dn_get_linearized(dsdb_dn->dn),
-                                                      GUID_string(tmp_ctx, guid),
-                                                      ldb_dn_get_linearized(source_dn));
-                               ret = LDB_ERR_NO_SUCH_OBJECT;
-                       }
+                       /*
+                        * target object is deleted. Check whether to ignore the
+                        * link, fail the replication, or add a partial link
+                        */
+                       ret = replmd_allow_missing_target(module, tmp_ctx,
+                                                         dsdb_dn->dn, source_dn,
+                                                         is_obj_commit, guid,
+                                                         la_entry->dsdb_repl_flags,
+                                                         ignore_link, "Deleted");
                }
        }
 
@@ -7052,8 +7290,18 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
                return ret;
        }
 
-       ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
-                                        src_msg->dn, &guid, &dummy);
+       /*
+        * We can skip the target object checks if we're only syncing critical
+        * objects, or we know the target is up-to-date. If either case, we
+        * still continue even if the target doesn't exist
+        */
+       if ((la->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
+                                   DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
+
+               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
+                                                src_msg->dn, false, &guid,
+                                                &dummy);
+       }
 
        /*
         * When we fail to find the target object, the error code we pass
@@ -7068,6 +7316,236 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
        return ret;
 }
 
+/**
+ * Finds the current active Parsed-DN value for a single-valued linked
+ * attribute, if one exists.
+ * @param ret_pdn assigned the active Parsed-DN, or NULL if none was found
+ * @returns LDB_SUCCESS (regardless of whether a match was found), unless
+ * an error occurred
+ */
+static int replmd_get_active_singleval_link(struct ldb_module *module,
+                                           TALLOC_CTX *mem_ctx,
+                                           struct parsed_dn pdn_list[],
+                                           unsigned int count,
+                                           const struct dsdb_attribute *attr,
+                                           struct parsed_dn **ret_pdn)
+{
+       unsigned int i;
+
+       *ret_pdn = NULL;
+
+       if (!(attr->ldb_schema_attribute->flags & LDB_ATTR_FLAG_SINGLE_VALUE)) {
+
+               /* nothing to do for multi-valued linked attributes */
+               return LDB_SUCCESS;
+       }
+
+       for (i = 0; i < count; i++) {
+               int ret = LDB_SUCCESS;
+               struct parsed_dn *pdn = &pdn_list[i];
+
+               /* skip any inactive links */
+               if (dsdb_dn_is_deleted_val(pdn->v)) {
+                       continue;
+               }
+
+               /* we've found an active value for this attribute */
+               *ret_pdn = pdn;
+
+               if (pdn->dsdb_dn == NULL) {
+                       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+                       ret = really_parse_trusted_dn(mem_ctx, ldb, pdn,
+                                                     attr->syntax->ldap_oid);
+               }
+
+               return ret;
+       }
+
+       /* no active link found */
+       return LDB_SUCCESS;
+}
+
+/**
+ * @returns true if the replication linked attribute info is newer than we
+ * already have in our DB
+ * @param pdn the existing linked attribute info in our DB
+ * @param la the new linked attribute info received during replication
+ */
+static bool replmd_link_update_is_newer(struct parsed_dn *pdn,
+                                       struct drsuapi_DsReplicaLinkedAttribute *la)
+{
+       /* see if this update is newer than what we have already */
+       struct GUID invocation_id = GUID_zero();
+       uint32_t version = 0;
+       NTTIME change_time = 0;
+
+       if (pdn == NULL) {
+
+               /* no existing info so update is newer */
+               return true;
+       }
+
+       dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
+       dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
+       dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
+
+       return replmd_update_is_newer(&invocation_id,
+                                     &la->meta_data.originating_invocation_id,
+                                     version,
+                                     la->meta_data.version,
+                                     change_time,
+                                     la->meta_data.originating_change_time);
+}
+
+/**
+ * Marks an existing linked attribute value as deleted in the DB
+ * @param pdn the parsed-DN of the target-value to delete
+ */
+static int replmd_delete_link_value(struct ldb_module *module,
+                                   struct replmd_private *replmd_private,
+                                   TALLOC_CTX *mem_ctx,
+                                   struct ldb_dn *src_obj_dn,
+                                   const struct dsdb_schema *schema,
+                                   const struct dsdb_attribute *attr,
+                                   uint64_t seq_num,
+                                   bool is_active,
+                                   struct GUID *target_guid,
+                                   struct dsdb_dn *target_dsdb_dn,
+                                   struct ldb_val *output_val)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       time_t t;
+       NTTIME now;
+       const struct GUID *invocation_id = NULL;
+       int ret;
+
+       t = time(NULL);
+       unix_to_nt_time(&now, t);
+
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (invocation_id == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* if the existing link is active, remove its backlink */
+       if (is_active) {
+
+               ret = replmd_add_backlink(module, replmd_private, schema,
+                                         src_obj_dn, target_guid, false,
+                                         attr, NULL);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       /* mark the existing value as deleted */
+       ret = replmd_update_la_val(mem_ctx, output_val, target_dsdb_dn,
+                                  target_dsdb_dn, invocation_id, seq_num,
+                                  seq_num, now, true);
+       return ret;
+}
+
+/**
+ * Checks for a conflict in single-valued link attributes, and tries to
+ * resolve the problem if possible.
+ *
+ * Single-valued links should only ever have one active value. If we already
+ * have an active link value, and during replication we receive an active link
+ * value for a different target DN, then we need to resolve this inconsistency
+ * and determine which value should be active. If the received info is better/
+ * newer than the existing link attribute, then we need to set our existing
+ * link as deleted. If the received info is worse/older, then we should continue
+ * to add it, but set it as an inactive link.
+ *
+ * Note that this is a corner-case that is unlikely to happen (but if it does
+ * happen, we don't want it to break replication completely).
+ *
+ * @param pdn_being_modified the parsed DN corresponding to the received link
+ * target (note this is NULL if the link does not already exist in our DB)
+ * @param pdn_list all the source object's Parsed-DNs for this attribute, i.e.
+ * any existing active or inactive values for the attribute in our DB.
+ * @param dsdb_dn the target DN for the received link attribute
+ * @param add_as_inactive gets set to true if the received link is worse than
+ * the existing link - it should still be added, but as an inactive link.
+ */
+static int replmd_check_singleval_la_conflict(struct ldb_module *module,
+                                             struct replmd_private *replmd_private,
+                                             TALLOC_CTX *mem_ctx,
+                                             struct ldb_dn *src_obj_dn,
+                                             struct drsuapi_DsReplicaLinkedAttribute *la,
+                                             struct dsdb_dn *dsdb_dn,
+                                             struct parsed_dn *pdn_being_modified,
+                                             struct parsed_dn *pdn_list,
+                                             struct ldb_message_element *old_el,
+                                             const struct dsdb_schema *schema,
+                                             const struct dsdb_attribute *attr,
+                                             uint64_t seq_num,
+                                             bool *add_as_inactive)
+{
+       struct parsed_dn *active_pdn = NULL;
+       bool update_is_newer = false;
+       int ret;
+
+       /*
+        * check if there's a conflict for single-valued links, i.e. an active
+        * linked attribute already exists, but it has a different target value
+        */
+       ret = replmd_get_active_singleval_link(module, mem_ctx, pdn_list,
+                                              old_el->num_values, attr,
+                                              &active_pdn);
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * If no active value exists (or the received info is for the currently
+        * active value), then no conflict exists
+        */
+       if (active_pdn == NULL || active_pdn == pdn_being_modified) {
+               return LDB_SUCCESS;
+       }
+
+       DBG_WARNING("Link conflict for %s attribute on %s\n",
+                   attr->lDAPDisplayName, ldb_dn_get_linearized(src_obj_dn));
+
+       /* Work out how to resolve the conflict based on which info is better */
+       update_is_newer = replmd_link_update_is_newer(active_pdn, la);
+
+       if (update_is_newer) {
+               DBG_WARNING("Using received value %s, over existing target %s\n",
+                           ldb_dn_get_linearized(dsdb_dn->dn),
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn));
+
+               /*
+                * Delete our existing active link. The received info will then
+                * be added (through normal link processing) as the active value
+                */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              src_obj_dn, schema, attr,
+                                              seq_num, true, &active_pdn->guid,
+                                              active_pdn->dsdb_dn,
+                                              active_pdn->v);
+
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       } else {
+               DBG_WARNING("Using existing target %s, over received value %s\n",
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn),
+                           ldb_dn_get_linearized(dsdb_dn->dn));
+
+               /*
+                * we want to keep our existing active link and add the
+                * received link as inactive
+                */
+               *add_as_inactive = true;
+       }
+
+       return LDB_SUCCESS;
+}
+
 /*
   process one linked attribute structure
  */
@@ -7092,6 +7570,9 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
        bool ignore_link;
        enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       struct dsdb_dn *old_dsdb_dn = NULL;
+       struct ldb_val *val_to_update = NULL;
+       bool add_as_inactive = false;
 
        /*
         * get the attribute being modified, the search result for the source object,
@@ -7141,7 +7622,7 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        }
 
        ret = replmd_check_target_exists(module, dsdb_dn, la_entry, msg->dn,
-                                        &guid, &ignore_link);
+                                        true, &guid, &ignore_link);
 
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
@@ -7170,46 +7651,47 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                return ret;
        }
 
+       if (!replmd_link_update_is_newer(pdn, la)) {
+               DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
+                        old_el->name, ldb_dn_get_linearized(msg->dn),
+                        GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
+               talloc_free(tmp_ctx);
+               return LDB_SUCCESS;
+       }
 
-       if (pdn != NULL) {
-               /* see if this update is newer than what we have already */
-               struct GUID invocation_id = GUID_zero();
-               uint32_t version = 0;
-               uint32_t originating_usn = 0;
-               NTTIME change_time = 0;
-               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
-
-               dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &originating_usn, "RMD_ORIGINATING_USN");
-               dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
-
-               if (!replmd_update_is_newer(&invocation_id,
-                                           &la->meta_data.originating_invocation_id,
-                                           version,
-                                           la->meta_data.version,
-                                           change_time,
-                                           la->meta_data.originating_change_time)) {
-                       DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
-                                old_el->name, ldb_dn_get_linearized(msg->dn),
-                                GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
-                       talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               }
+       /* get a seq_num for this change */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
 
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       /*
+        * check for single-valued link conflicts, i.e. an active linked
+        * attribute already exists, but it has a different target value
+        */
+       if (active) {
+               ret = replmd_check_singleval_la_conflict(module, replmd_private,
+                                                        tmp_ctx, msg->dn, la,
+                                                        dsdb_dn, pdn, pdn_list,
+                                                        old_el, schema, attr,
+                                                        seq_num,
+                                                        &add_as_inactive);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
+       }
+
+       if (pdn != NULL) {
+               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
 
                if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                        /* remove the existing backlink */
                        ret = replmd_add_backlink(module, replmd_private,
                                                  schema, 
                                                  msg->dn,
-                                                 &guid, false, attr,
+                                                 &pdn->guid, false, attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
@@ -7217,37 +7699,12 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                        }
                }
 
-               ret = replmd_update_la_val(tmp_ctx, pdn->v, dsdb_dn, pdn->dsdb_dn,
-                                          &la->meta_data.originating_invocation_id,
-                                          la->meta_data.originating_usn, seq_num,
-                                          la->meta_data.originating_change_time,
-                                          la->meta_data.version,
-                                          !active);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+               val_to_update = pdn->v;
+               old_dsdb_dn = pdn->dsdb_dn;
 
-               if (active) {
-                       /* add the new backlink */
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-               }
        } else {
                unsigned offset;
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+
                /*
                 * We know where the new one needs to be, from the *next
                 * pointer into pdn_list.
@@ -7283,27 +7740,46 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
 
                old_el->num_values++;
 
-               ret = replmd_build_la_val(tmp_ctx, &old_el->values[offset], dsdb_dn,
-                                         &la->meta_data.originating_invocation_id,
-                                         la->meta_data.originating_usn, seq_num,
-                                         la->meta_data.originating_change_time,
-                                         la->meta_data.version,
-                                         !active);
+               val_to_update = &old_el->values[offset];
+               old_dsdb_dn = NULL;
+       }
+
+       /* set the link attribute's value to the info that was received */
+       ret = replmd_set_la_val(tmp_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
+                               &la->meta_data.originating_invocation_id,
+                               la->meta_data.originating_usn, seq_num,
+                               la->meta_data.originating_change_time,
+                               la->meta_data.version,
+                               !active);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (add_as_inactive) {
+
+               /* Set the new link as inactive/deleted to avoid conflicts */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              msg->dn, schema, attr, seq_num,
+                                              false, &guid, dsdb_dn,
+                                              val_to_update);
+
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
 
-               if (active) {
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
+       } else if (active) {
+
+               /* if the new link is active, then add the new backlink */
+               ret = replmd_add_backlink(module, replmd_private,
+                                         schema,
+                                         msg->dn,
+                                         &guid, true, attr,
+                                         parent);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
                }
        }