repl_meta_data: fix linked attribute corruption on databases with unsorted links...
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index 9edcc33b08abb0ac164a0c240fc7b98d6b03b85c..7646f942fcaec837e50d9a187bc7c7bf7020c625 100644 (file)
@@ -39,7 +39,9 @@
 #include "ldb_module.h"
 #include "dsdb/samdb/samdb.h"
 #include "dsdb/common/proto.h"
+#include "dsdb/common/util.h"
 #include "../libds/common/flags.h"
+#include "librpc/gen_ndr/irpc.h"
 #include "librpc/gen_ndr/ndr_misc.h"
 #include "librpc/gen_ndr/ndr_drsuapi.h"
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 #include "libcli/security/security.h"
 #include "lib/util/dlinklist.h"
 #include "dsdb/samdb/ldb_modules/util.h"
-#include "lib/util/binsearch.h"
 #include "lib/util/tsort.h"
 
+#undef DBGC_CLASS
+#define DBGC_CLASS            DBGC_DRS_REPL
+
+/* the RMD_VERSION for linked attributes starts from 1 */
+#define RMD_VERSION_INITIAL   1
+
 /*
  * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
  * Deleted Objects Container
@@ -73,6 +80,7 @@ struct replmd_private {
 struct la_entry {
        struct la_entry *next, *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
+       uint32_t dsdb_repl_flags;
 };
 
 struct replmd_replicated_request {
@@ -106,18 +114,25 @@ struct replmd_replicated_request {
        bool isDeleted;
 };
 
-struct parsed_dn {
-       struct dsdb_dn *dsdb_dn;
-       struct GUID guid;
-       struct ldb_val *v;
-};
-
 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
 static int replmd_delete_internals(struct ldb_module *module, struct ldb_request *req, bool re_delete);
 static int replmd_check_upgrade_links(struct ldb_context *ldb,
                                      struct parsed_dn *dns, uint32_t count,
                                      struct ldb_message_element *el,
                                      const char *ldap_oid);
+static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
+                                         struct la_entry *la);
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted);
+
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid);
 
 enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1,
@@ -240,7 +255,6 @@ static bool replmd_check_urgent_attribute(const struct ldb_message_element *el)
        return false;
 }
 
-
 static int replmd_replicated_apply_isDeleted(struct replmd_replicated_request *ar);
 
 /*
@@ -377,8 +391,9 @@ static int replmd_process_backlink(struct ldb_module *module, struct la_backlink
        ret = dsdb_module_dn_by_guid(module, frame, &bl->target_guid, &target_dn, parent);
        if (ret != LDB_SUCCESS) {
                struct GUID_txt_buf guid_str;
-               DEBUG(2,(__location__ ": WARNING: Failed to find target DN for linked attribute with GUID %s\n",
-                        GUID_buf_string(&bl->target_guid, &guid_str)));
+               DBG_WARNING("Failed to find target DN for linked attribute with GUID %s\n",
+                           GUID_buf_string(&bl->target_guid, &guid_str));
+               DBG_WARNING("Please run 'samba-tool dbcheck' to resolve any missing backlinks.\n");
                talloc_free(frame);
                return LDB_SUCCESS;
        }
@@ -562,9 +577,41 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
        }
 
        if (ares->error != LDB_SUCCESS) {
-               DEBUG(5,("%s failure. Error is: %s\n", __FUNCTION__, ldb_strerror(ares->error)));
+               struct GUID_txt_buf guid_txt;
+               struct ldb_message *msg = NULL;
+               char *s = NULL;
+
+               if (ac->apply_mode == false) {
+                       DBG_NOTICE("Originating update failure. Error is: %s\n",
+                                  ldb_strerror(ares->error));
+                       return ldb_module_done(ac->req, controls,
+                                              ares->response, ares->error);
+               }
+
+               msg = ac->objs->objects[ac->index_current].msg;
+               /*
+                * Set at DBG_NOTICE as once these start to happe, they
+                * will happen a lot until resolved, due to repeated
+                * replication.  The caller will probably print the
+                * ldb error string anyway.
+                */
+               DBG_NOTICE("DRS replication apply failure for %s. Error is: %s\n",
+                          ldb_dn_get_linearized(msg->dn),
+                          ldb_strerror(ares->error));
+
+               s = ldb_ldif_message_redacted_string(ldb_module_get_ctx(ac->module),
+                                                    ac,
+                                                    LDB_CHANGETYPE_ADD,
+                                                    msg);
+
+               DBG_INFO("Failing DRS %s replication message was %s:\n%s\n",
+                        ac->search_msg == NULL ? "ADD" : "MODIFY",
+                        GUID_buf_string(&ac->objs->objects[ac->index_current].object_guid,
+                                        &guid_txt),
+                        s);
+               talloc_free(s);
                return ldb_module_done(ac->req, controls,
-                                       ares->response, ares->error);
+                                      ares->response, ares->error);
        }
 
        if (ares->type != LDB_REPLY_DONE) {
@@ -889,13 +936,19 @@ static void replmd_ldb_message_sort(struct ldb_message *msg,
 }
 
 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted);
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime);
+
+static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2);
 
 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                          struct ldb_message_element *el, struct parsed_dn **pdn,
                          const char *ldap_oid, struct ldb_request *parent);
 
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn);
+
 /*
   fix up linked attributes in replmd_add.
   This involves setting up the right meta-data in extended DN
@@ -917,9 +970,27 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
        /* We will take a reference to the schema in replmd_add_backlink */
        const struct dsdb_schema *schema = dsdb_get_schema(ldb, NULL);
        struct ldb_val *new_values = NULL;
+       int ret;
+
+       if (dsdb_check_single_valued_link(sa, el) == LDB_SUCCESS) {
+               el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
+       } else {
+               ldb_asprintf_errstring(ldb,
+                                      "Attribute %s is single valued but "
+                                      "more than one value has been supplied",
+                                      el->name);
+               talloc_free(tmp_ctx);
+               return LDB_ERR_CONSTRAINT_VIOLATION;
+       }
        
-       int ret = get_parsed_dns(module, tmp_ctx, el, &pdn,
-                                sa->syntax->ldap_oid, parent);
+       ret = get_parsed_dns(module, tmp_ctx, el, &pdn,
+                            sa->syntax->ldap_oid, parent);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       ret = check_parsed_dn_duplicates(module, el, pdn);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
@@ -936,7 +1007,7 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                struct parsed_dn *p = &pdn[i];
                ret = replmd_build_la_val(el->values, p->v, p->dsdb_dn,
                                          &ac->our_invocation_id,
-                                         ac->seq_num, ac->seq_num, now, 0, false);
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -1373,6 +1444,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                                      const struct GUID *our_invocation_id,
                                      NTTIME now,
                                      bool is_schema_nc,
+                                     bool is_forced_rodc,
                                      struct ldb_request *req)
 {
        uint32_t i;
@@ -1511,6 +1583,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        md1 = &omd->ctr.ctr1.array[i];
        md1->version++;
        md1->attid = attid;
+
        if (md1->attid == DRSUAPI_ATTID_isDeleted) {
                const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
                const char* rdn;
@@ -1537,6 +1610,11 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        md1->originating_usn           = *seq_num;
        md1->local_usn                 = *seq_num;
 
+       if (is_forced_rodc) {
+               /* Force version to 0 to be overriden later via replication */
+               md1->version = 0;
+       }
+
        return LDB_SUCCESS;
 }
 
@@ -1557,7 +1635,8 @@ static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
                                       struct replPropertyMetaDataBlob *omd,
                                       struct replmd_replicated_request *ar,
                                       NTTIME now,
-                                      bool is_schema_nc)
+                                      bool is_schema_nc,
+                                      bool is_forced_rodc)
 {
        const char *rdn_name = ldb_dn_get_rdn_name(msg->dn);
        const struct dsdb_attribute *rdn_attr =
@@ -1588,7 +1667,8 @@ static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
        return replmd_update_rpmd_element(ldb, msg, &new_el, NULL,
                                          omd, ar->schema, &ar->seq_num,
                                          &ar->our_invocation_id,
-                                         now, is_schema_nc, ar->req);
+                                         now, is_schema_nc, is_forced_rodc,
+                                         ar->req);
 
 }
 
@@ -1635,6 +1715,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
        bool rmd_is_provided;
        bool rmd_is_just_resorted = false;
        const char *not_rename_attrs[4 + msg->num_elements];
+       bool is_forced_rodc = false;
 
        if (rename_attrs) {
                attrs = rename_attrs;
@@ -1651,6 +1732,17 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
        ldb = ldb_module_get_ctx(module);
 
+       ret = samdb_rodc(ldb, rodc);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
+               *rodc = false;
+       }
+
+       if (*rodc &&
+           ldb_request_get_control(req, DSDB_CONTROL_FORCE_RODC_LOCAL_CHANGE)) {
+               is_forced_rodc = true;
+       }
+
        our_invocation_id = samdb_ntds_invocation_id(ldb);
        if (!our_invocation_id) {
                /* this happens during an initial vampire while
@@ -1787,6 +1879,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
                                                         &omd, schema, seq_num,
                                                         our_invocation_id,
                                                         now, is_schema_nc,
+                                                        is_forced_rodc,
                                                         req);
                        if (ret != LDB_SUCCESS) {
                                return ret;
@@ -1842,13 +1935,11 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
                /*if we are RODC and this is a DRSR update then its ok*/
                if (!ldb_request_get_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID)
-                   && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)) {
+                   && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)
+                   && !is_forced_rodc) {
                        unsigned instanceType;
 
-                       ret = samdb_rodc(ldb, rodc);
-                       if (ret != LDB_SUCCESS) {
-                               DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
-                       } else if (*rodc) {
+                       if (*rodc) {
                                ldb_set_errstring(ldb, "RODC modify is forbidden!");
                                return LDB_ERR_REFERRAL;
                        }
@@ -1894,59 +1985,6 @@ static int replmd_update_rpmd(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
-struct compare_ctx {
-       struct GUID *guid;
-       struct ldb_context *ldb;
-       TALLOC_CTX *mem_ctx;
-       const char *ldap_oid;
-       int err;
-       const struct GUID *invocation_id;
-};
-
-/* When a parsed_dn comes from the database, sometimes it is not really parsed. */
-
-static int really_parse_trusted_dn(TALLOC_CTX *mem_ctx, struct ldb_context *ldb,
-                                  struct parsed_dn *pdn, const char *ldap_oid)
-{
-       NTSTATUS status;
-       struct dsdb_dn *dsdb_dn = dsdb_dn_parse_trusted(mem_ctx, ldb, pdn->v,
-                                                       ldap_oid);
-       if (dsdb_dn == NULL) {
-               return LDB_ERR_INVALID_DN_SYNTAX;
-       }
-
-       status = dsdb_get_extended_dn_guid(dsdb_dn->dn, &pdn->guid, "GUID");
-       if (!NT_STATUS_IS_OK(status)) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       pdn->dsdb_dn = dsdb_dn;
-       return LDB_SUCCESS;
-}
-
-/*
- * We choose, as the sort order, the same order as is used in DRS replication,
- * which is the memcmp() order of the NDR GUID, not that obtained from
- * GUID_compare().
- *
- * This means that sorted links will be in the same order as a new DC would
- * see them.
- */
-static int ndr_guid_compare(struct GUID *guid1, struct GUID *guid2)
-{
-       uint8_t v1_data[16];
-       struct ldb_val v1 = data_blob_const(v1_data, sizeof(v1_data));
-       uint8_t v2_data[16];
-       struct ldb_val v2 = data_blob_const(v2_data, sizeof(v2_data));
-
-       /* This can't fail */
-       ndr_push_struct_into_fixed_blob(&v1, guid1,
-                                       (ndr_push_flags_fn_t)ndr_push_GUID);
-       /* This can't fail */
-       ndr_push_struct_into_fixed_blob(&v2, guid2,
-                                       (ndr_push_flags_fn_t)ndr_push_GUID);
-       return data_blob_cmp(&v1, &v2);
-}
-
 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
 {
        int ret = ndr_guid_compare(&pdn1->guid, &pdn2->guid);
@@ -1957,126 +1995,6 @@ static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
        return ret;
 }
 
-static int la_guid_compare_with_trusted_dn(struct compare_ctx *ctx,
-                                          struct parsed_dn *p)
-{
-       /*
-        * This works like a standard compare function in its return values,
-        * but has an extra trick to deal with errors: zero is returned and
-        * ctx->err is set to the ldb error code.
-        *
-        * That is, if (as is expected in most cases) you get a non-zero
-        * result, you don't need to check for errors.
-        *
-        * We assume the second argument refers to a DN is from the database
-        * and has a GUID -- but this GUID might not have been parsed out yet.
-        */
-       if (p->dsdb_dn == NULL) {
-               int ret = really_parse_trusted_dn(ctx->mem_ctx, ctx->ldb, p,
-                                                 ctx->ldap_oid);
-               if (ret != LDB_SUCCESS) {
-                       ctx->err = ret;
-                       return 0;
-               }
-       }
-       return ndr_guid_compare(ctx->guid, &p->guid);
-}
-
-
-
-static int parsed_dn_find(struct ldb_context *ldb, struct parsed_dn *pdn,
-                         unsigned int count,
-                         struct GUID *guid,
-                         struct ldb_dn *target_dn,
-                         struct parsed_dn **exact,
-                         struct parsed_dn **next,
-                         const char *ldap_oid)
-{
-       unsigned int i;
-       struct compare_ctx ctx;
-       if (pdn == NULL) {
-               *exact = NULL;
-               *next = NULL;
-               return LDB_SUCCESS;
-       }
-
-       if (unlikely(GUID_all_zero(guid))) {
-               /*
-                * When updating a link using DRS, we sometimes get a NULL
-                * GUID when a forward link has been deleted and its GUID has
-                * for some reason been forgotten. The best we can do is try
-                * and match by DN via a linear search. Note that this
-                * probably only happens in the ADD case, in which we only
-                * allow modification of link if it is already deleted, so
-                * this seems very close to an elaborate NO-OP, but we are not
-                * quite prepared to declare it so.
-                *
-                * If the DN is not in our list, we have to add it to the
-                * beginning of the list, where it would naturally sort.
-                */
-               struct parsed_dn *p;
-               if (target_dn == NULL) {
-                       /* We don't know the target DN, so we can't search for DN */
-                       DEBUG(1, ("parsed_dn_find has a NULL GUID for a linked "
-                                 "attribute but we don't have a DN to compare "
-                                 "it with\n"));
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               *exact = NULL;
-               *next = NULL;
-
-               DEBUG(3, ("parsed_dn_find has a NULL GUID for a link to DN "
-                         "%s; searching through links for it",
-                         ldb_dn_get_linearized(target_dn)));
-
-               for (i = 0; i < count; i++) {
-                       int cmp;
-                       p = &pdn[i];
-                       if (p->dsdb_dn == NULL) {
-                               int ret = really_parse_trusted_dn(pdn, ldb, p, ldap_oid);
-                               if (ret != LDB_SUCCESS) {
-                                       return LDB_ERR_OPERATIONS_ERROR;
-                               }
-                       }
-
-                       cmp = ldb_dn_compare(p->dsdb_dn->dn, target_dn);
-                       if (cmp == 0) {
-                               *exact = p;
-                               return LDB_SUCCESS;
-                       }
-               }
-               /*
-                * Here we have a null guid which doesn't match any existing
-                * link. This is a bit unexpected because null guids occur
-                * when a forward link has been deleted and we are replicating
-                * that deletion.
-                *
-                * The best thing to do is weep into the logs and add the
-                * offending link to the beginning of the list which is
-                * at least the correct sort position.
-                */
-               DEBUG(1, ("parsed_dn_find has been given a NULL GUID for a "
-                         "link to unknown DN %s\n",
-                         ldb_dn_get_linearized(target_dn)));
-               *next = pdn;
-               return LDB_SUCCESS;
-       }
-
-       ctx.guid = guid;
-       ctx.ldb = ldb;
-       ctx.mem_ctx = pdn;
-       ctx.ldap_oid = ldap_oid;
-       ctx.err = 0;
-
-       BINARY_ARRAY_SEARCH_GTE(pdn, count, &ctx, la_guid_compare_with_trusted_dn,
-                               *exact, *next);
-
-       if (ctx.err != 0) {
-               return ctx.err;
-       }
-       return LDB_SUCCESS;
-}
-
 /*
   get a series of message element values as an array of DNs and GUIDs
   the result is sorted by GUID
@@ -2220,6 +2138,37 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
+/*
+   Return LDB_SUCCESS if a parsed_dn list contains no duplicate values,
+   otherwise an error code. For compatibility the error code differs depending
+   on whether or not the attribute is "member".
+
+   As always, the parsed_dn list is assumed to be sorted.
+ */
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn)
+{
+       unsigned int i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       for (i = 1; i < el->num_values; i++) {
+               struct parsed_dn *p = &pdn[i];
+               if (parsed_dn_compare(p, &pdn[i - 1]) == 0) {
+                       ldb_asprintf_errstring(ldb,
+                                              "Linked attribute %s has "
+                                              "multiple identical values",
+                                              el->name);
+                       if (ldb_attr_cmp(el->name, "member") == 0) {
+                               return LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+               }
+       }
+       return LDB_SUCCESS;
+}
+
 /*
   build a new extended DN, including all meta data fields
 
@@ -2231,85 +2180,20 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
   RMD_LOCAL_USN       = local_usn
   RMD_VERSION         = version
  */
-static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted)
+static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v,
+                              struct dsdb_dn *dsdb_dn,
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime)
 {
-       struct ldb_dn *dn = dsdb_dn->dn;
-       const char *tstring, *usn_string, *flags_string;
-       struct ldb_val tval;
-       struct ldb_val iid;
-       struct ldb_val usnv, local_usnv;
-       struct ldb_val vers, flagsv;
-       NTSTATUS status;
-       int ret;
-       const char *dnstring;
-       char *vstring;
-       uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
-
-       tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
-       if (!tstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       tval = data_blob_string_const(tstring);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       usnv = data_blob_string_const(usn_string);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       local_usnv = data_blob_string_const(usn_string);
-
-       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
-       if (!vstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       vers = data_blob_string_const(vstring);
-
-       status = GUID_to_ndr_blob(invocation_id, dn, &iid);
-       if (!NT_STATUS_IS_OK(status)) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
-       if (!flags_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       flagsv = data_blob_string_const(flags_string);
-
-       ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
-       if (ret != LDB_SUCCESS) return ret;
-
-       dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
-       if (dnstring == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       *v = data_blob_string_const(dnstring);
-
-       return LDB_SUCCESS;
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, NULL, invocation_id,
+                                local_usn, local_usn, nttime,
+                                RMD_VERSION_INITIAL, false);
 }
 
 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                                struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
                                uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted);
+                               bool deleted);
 
 /*
   check if any links need upgrading from w2k format
@@ -2363,7 +2247,7 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                /* it's an old one that needs upgrading */
                ret = replmd_update_la_val(el->values, dns[i].v,
                                           dns[i].dsdb_dn, dns[i].dsdb_dn,
-                                          invocation_id, 1, 1, 0, 0, false);
+                                          invocation_id, 1, 1, 0, false);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -2385,14 +2269,14 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
 }
 
 /*
-  update an extended DN, including all meta data fields
+  Sets the value for a linked attribute, including all meta data fields
 
   see replmd_build_la_val for value names
  */
-static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
-                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted)
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted)
 {
        struct ldb_dn *dn = dsdb_dn->dn;
        const char *tstring, *usn_string, *flags_string;
@@ -2400,8 +2284,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        struct ldb_val iid;
        struct ldb_val usnv, local_usnv;
        struct ldb_val vers, flagsv;
-       const struct ldb_val *old_addtime;
-       uint32_t old_version;
+       const struct ldb_val *old_addtime = NULL;
        NTSTATUS status;
        int ret;
        const char *dnstring;
@@ -2441,7 +2324,10 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        if (ret != LDB_SUCCESS) return ret;
 
        /* get the ADDTIME from the original */
-       old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_ADDTIME");
+       if (old_dsdb_dn != NULL) {
+               old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn,
+                                                           "RMD_ADDTIME");
+       }
        if (old_addtime == NULL) {
                old_addtime = &tval;
        }
@@ -2466,12 +2352,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
        if (ret != LDB_SUCCESS) return ret;
 
-       /* increase the version by 1 */
-       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version, "RMD_VERSION");
-       if (NT_STATUS_IS_OK(status) && old_version >= version) {
-               version = old_version+1;
-       }
-       vstring = talloc_asprintf(dn, "%lu", (unsigned long)version);
+       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
        vers = data_blob_string_const(vstring);
        ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
        if (ret != LDB_SUCCESS) return ret;
@@ -2485,6 +2366,33 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        return LDB_SUCCESS;
 }
 
+/**
+ * Updates the value for a linked attribute, including all meta data fields
+ */
+static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                               bool deleted)
+{
+       uint32_t old_version;
+       uint32_t version = RMD_VERSION_INITIAL;
+       NTSTATUS status;
+
+       /*
+        * We're updating the linked attribute locally, so increase the version
+        * by 1 so that other DCs will see the change when it gets replicated out
+        */
+       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version,
+                                            "RMD_VERSION");
+
+       if (NT_STATUS_IS_OK(status)) {
+               version = old_version + 1;
+       }
+
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, old_dsdb_dn, invocation_id,
+                                usn, local_usn, nttime, version, deleted);
+}
+
 /*
   handle adding a linked attribute
  */
@@ -2572,8 +2480,10 @@ static int replmd_modify_la_add(struct ldb_module *module,
                int err = parsed_dn_find(ldb, old_dns, old_num_values,
                                         &dns[i].guid,
                                         dns[i].dsdb_dn->dn,
+                                        dns[i].dsdb_dn->extra_part, 0,
                                         &exact, &next,
-                                        schema_attr->syntax->ldap_oid);
+                                        schema_attr->syntax->ldap_oid,
+                                        true);
                if (err != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return err;
@@ -2613,7 +2523,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
                                                   dns[i].dsdb_dn,
                                                   exact->dsdb_dn,
                                                   invocation_id, seq_num,
-                                                  seq_num, now, 0, false);
+                                                  seq_num, now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2669,8 +2579,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
                /* Make the new linked attribute ldb_val. */
                ret = replmd_build_la_val(new_values, &new_values[num_values],
                                          dns[i].dsdb_dn, invocation_id,
-                                         seq_num, seq_num,
-                                         now, 0, false);
+                                         seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2820,7 +2729,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        ret = replmd_update_la_val(old_el->values, p->v,
                                                   p->dsdb_dn, p->dsdb_dn,
                                                   invocation_id, seq_num,
-                                                  seq_num, now, 0, true);
+                                                  seq_num, now, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2842,8 +2751,10 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                ret = parsed_dn_find(ldb, old_dns, old_el->num_values,
                                     &p->guid,
                                     NULL,
+                                    p->dsdb_dn->extra_part, 0,
                                     &exact, &next,
-                                    schema_attr->syntax->ldap_oid);
+                                    schema_attr->syntax->ldap_oid,
+                                    true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2915,7 +2826,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                ret = replmd_update_la_val(old_el->values, exact->v,
                                           exact->dsdb_dn, exact->dsdb_dn,
                                           invocation_id, seq_num, seq_num,
-                                          now, 0, true);
+                                          now, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2933,11 +2844,23 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
        if (vanish_links) {
                unsigned j = 0;
+               struct ldb_val *tmp_vals = NULL;
+
+               tmp_vals = talloc_array(tmp_ctx, struct ldb_val,
+                                       old_el->num_values);
+               if (tmp_vals == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_oom(module);
+               }
                for (i = 0; i < old_el->num_values; i++) {
-                       if (old_dns[i].v != NULL) {
-                               old_el->values[j] = *old_dns[i].v;
-                               j++;
+                       if (old_dns[i].v == NULL) {
+                               continue;
                        }
+                       tmp_vals[j] = *old_dns[i].v;
+                       j++;
+               }
+               for (i = 0; i < j; i++) {
+                       old_el->values[i] = tmp_vals[i];
                }
                old_el->num_values = j;
        }
@@ -3027,6 +2950,12 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                return ret;
        }
 
+       ret = check_parsed_dn_duplicates(module, el, dns);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
        ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns,
                             ldap_oid, parent);
        if (ret != LDB_SUCCESS) {
@@ -3084,7 +3013,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                           old_p->dsdb_dn,
                                                           invocation_id,
                                                           seq_num, seq_num,
-                                                          now, 0, true);
+                                                          now, true);
                                if (ret != LDB_SUCCESS) {
                                        talloc_free(tmp_ctx);
                                        return ret;
@@ -3118,7 +3047,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                   old_p->dsdb_dn,
                                                   invocation_id,
                                                   seq_num, seq_num,
-                                                  now, 0, false);
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -3150,8 +3079,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                                  new_p->v,
                                                  new_p->dsdb_dn,
                                                  invocation_id,
-                                                 seq_num, seq_num,
-                                                 now, 0, false);
+                                                 seq_num, now);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -3243,6 +3171,7 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        for (i=0; i<msg->num_elements; i++) {
                struct ldb_message_element *el = &msg->elements[i];
                struct ldb_message_element *old_el, *new_el;
+               unsigned int mod_type = LDB_FLAG_MOD_TYPE(el->flags);
                const struct dsdb_attribute *schema_attr
                        = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
                if (!schema_attr) {
@@ -3255,16 +3184,29 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
-                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
-                               continue;
+                       if (parent) {
+                               struct ldb_control *ctrl;
+
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_REPLMD_VANISH_LINKS);
+                               if (ctrl != NULL) {
+                                       ctrl->critical = false;
+                                       continue;
+                               }
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_DBCHECK);
+                               if (ctrl != NULL) {
+                                       continue;
+                               }
                        }
+
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
                        return LDB_ERR_UNWILLING_TO_PERFORM;
                }
                old_el = ldb_msg_find_element(old_msg, el->name);
-               switch (el->flags & LDB_FLAG_MOD_MASK) {
+               switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
                                                       schema, msg, el, old_el,
@@ -3296,13 +3238,16 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        ldb_asprintf_errstring(ldb,
                                               "Attribute %s is single valued but more than one value has been supplied",
                                               el->name);
-                       return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       /* Return codes as found on Windows 2012r2 */
+                       if (mod_type == LDB_FLAG_MOD_REPLACE) {
+                               return LDB_ERR_CONSTRAINT_VIOLATION;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
                } else {
                        el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
                }
 
-
-
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -3328,6 +3273,47 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
 }
 
 
+static int send_rodc_referral(struct ldb_request *req,
+                             struct ldb_context *ldb,
+                             struct ldb_dn *dn)
+{
+       char *referral = NULL;
+       struct loadparm_context *lp_ctx = NULL;
+       struct ldb_dn *fsmo_role_dn = NULL;
+       struct ldb_dn *role_owner_dn = NULL;
+       const char *domain = NULL;
+       WERROR werr;
+
+       lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
+                                struct loadparm_context);
+
+       werr = dsdb_get_fsmo_role_info(req, ldb, DREPL_PDC_MASTER,
+                                      &fsmo_role_dn, &role_owner_dn);
+
+       if (W_ERROR_IS_OK(werr)) {
+               struct ldb_dn *server_dn = ldb_dn_copy(req, role_owner_dn);
+               if (server_dn != NULL) {
+                       ldb_dn_remove_child_components(server_dn, 1);
+                       domain = samdb_dn_to_dnshostname(ldb, req,
+                                                        server_dn);
+               }
+       }
+
+       if (domain == NULL) {
+               domain = lpcfg_dnsdomain(lp_ctx);
+       }
+
+       referral = talloc_asprintf(req, "ldap://%s/%s",
+                                  domain,
+                                  ldb_dn_get_linearized(dn));
+       if (referral == NULL) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       return ldb_module_send_referral(req, referral);
+}
+
 
 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 {
@@ -3342,6 +3328,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        unsigned int functional_level;
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
+       struct ldb_control *fix_links_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -3367,12 +3354,45 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb = ldb_module_get_ctx(module);
 
-       ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
+       fix_links_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_DUPLICATE_LINKS);
+       if (fix_links_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
 
-       guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
-       if (guid_el != NULL) {
-               ldb_set_errstring(ldb,
-                                 "replmd_modify: it's not allowed to change the objectGUID!");
+               if (req->op.mod.message->num_elements != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_REPLACE) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID == 0) {
+                       return ldb_module_operr(module);
+               }
+
+               fix_links_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
+       ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
+
+       guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
+       if (guid_el != NULL) {
+               ldb_set_errstring(ldb,
+                                 "replmd_modify: it's not allowed to change the objectGUID!");
                return LDB_ERR_CONSTRAINT_VIOLATION;
        }
 
@@ -3400,19 +3420,10 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                                 msg, &ac->seq_num, t, is_schema_nc,
                                 &is_urgent, &rodc);
        if (rodc && (ret == LDB_ERR_REFERRAL)) {
-               struct loadparm_context *lp_ctx;
-               char *referral;
-
-               lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
-                                        struct loadparm_context);
-
-               referral = talloc_asprintf(req,
-                                          "ldap://%s/%s",
-                                          lpcfg_dnsdomain(lp_ctx),
-                                          ldb_dn_get_linearized(msg->dn));
-               ret = ldb_module_send_referral(req, referral);
+               ret = send_rodc_referral(req, ldb, msg->dn);
                talloc_free(ac);
                return ret;
+
        }
 
        if (ret != LDB_SUCCESS) {
@@ -3672,18 +3683,7 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
                                 msg, &ac->seq_num, t,
                                 is_schema_nc, &is_urgent, &rodc);
        if (rodc && (ret == LDB_ERR_REFERRAL)) {
-               struct ldb_dn *olddn = ac->req->op.rename.olddn;
-               struct loadparm_context *lp_ctx;
-               char *referral;
-
-               lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
-                                        struct loadparm_context);
-
-               referral = talloc_asprintf(req,
-                                          "ldap://%s/%s",
-                                          lpcfg_dnsdomain(lp_ctx),
-                                          ldb_dn_get_linearized(olddn));
-               ret = ldb_module_send_referral(req, referral);
+               ret = send_rodc_referral(req, ldb, ac->req->op.rename.olddn);
                talloc_free(ares);
                return ldb_module_done(req, NULL, NULL, ret);
        }
@@ -3817,7 +3817,8 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
                                            msg->dn, attrs,
                                            DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_EXTENDED_DN,
+                                           DSDB_SEARCH_SHOW_EXTENDED_DN |
+                                           DSDB_SEARCH_SHOW_RECYCLED,
                                            parent);
 
                if (ret != LDB_SUCCESS) {
@@ -3848,8 +3849,10 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                }
 
                ret = parsed_dn_find(ldb, link_dns, link_el->num_values,
-                                    guid, dn, &p, &unused,
-                                    target_attr->syntax->ldap_oid);
+                                    guid, dn,
+                                    data_blob_null, 0,
+                                    &p, &unused,
+                                    target_attr->syntax->ldap_oid, false);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -3871,6 +3874,7 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                /* This needs to get the Binary DN, by first searching */
                dn_str = dsdb_dn_get_linearized(tmp_ctx,
                                                p->dsdb_dn);
+
                dn_val = data_blob_string_const(dn_str);
                el2->values = &dn_val;
                el2->num_values = 1;
@@ -3907,7 +3911,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
 {
        int ret = LDB_ERR_OTHER;
        bool retb, disallow_move_on_delete;
-       struct ldb_dn *old_dn, *new_dn;
+       struct ldb_dn *old_dn = NULL, *new_dn = NULL;
        const char *rdn_name;
        const struct ldb_val *rdn_value, *new_rdn_value;
        struct GUID guid;
@@ -4119,17 +4123,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        guid = samdb_result_guid(old_msg, "objectGUID");
 
        if (deletion_state == OBJECT_NOT_DELETED) {
-               /* Add a formatted child */
-               retb = ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ADEL:%s",
-                                           rdn_name,
-                                           ldb_dn_escape_value(tmp_ctx, *rdn_value),
-                                           GUID_string(tmp_ctx, &guid));
-               if (!retb) {
-                       ldb_asprintf_errstring(ldb, __location__
-                                              ": Unable to add a formatted child to dn: %s",
-                                              ldb_dn_get_linearized(new_dn));
+
+               ret = replmd_make_deleted_child_dn(tmp_ctx,
+                                                  ldb,
+                                                  new_dn,
+                                                  rdn_name, rdn_value,
+                                                  guid);
+
+               if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
 
                ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
@@ -4297,6 +4300,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                /* don't remove the rDN */
                                continue;
                        }
+
                        if (sa->linkID & 1) {
                                /*
                                  we have a backlink in this object
@@ -4310,7 +4314,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                                                replmd_private,
                                                                old_dn, &guid,
                                                                el, sa, req);
-                               if (ret != LDB_SUCCESS) {
+                               if (ret == LDB_SUCCESS) {
+                                       /*
+                                        * now we continue, which means we
+                                        * won't remove this backlink
+                                        * directly
+                                        */
+                                       continue;
+                               }
+
+                               if (ret != LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
@@ -4323,11 +4336,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
-                               /* now we continue, which means we
-                                  won't remove this backlink
-                                  directly
-                               */
-                               continue;
+
+                               /*
+                                * Otherwise vanish the link, we are
+                                * out of sync and the controlling
+                                * object does not have the source
+                                * link any more
+                                */
+
+                               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
                        } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
@@ -4563,14 +4581,104 @@ static bool replmd_replPropertyMetaData1_new_should_be_taken(uint32_t dsdb_repl_
 }
 
 
+/*
+  form a DN for a deleted (DEL:) or conflict (CNF:) DN
+ */
+static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
+                                      struct ldb_context *ldb,
+                                      struct ldb_dn *dn,
+                                      const char *four_char_prefix,
+                                      const char *rdn_name,
+                                      const struct ldb_val *rdn_value,
+                                      struct GUID guid)
+{
+       struct ldb_val deleted_child_rdn_val;
+       struct GUID_txt_buf guid_str;
+       bool retb;
+
+       GUID_buf_string(&guid, &guid_str);
+
+       retb = ldb_dn_add_child_fmt(dn, "X=Y");
+       if (!retb) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * TODO: Per MS-ADTS 3.1.1.5.5 Delete Operation
+        * we should truncate this value to ensure the RDN is not more than 255 chars.
+        *
+        * However we MS-ADTS 3.1.1.5.1.2 Naming Constraints indicates that:
+        *
+        * "Naming constraints are not enforced for replicated
+        * updates." so this is safe and we don't have to work out not
+        * splitting a UTF8 char right now.
+        */
+       deleted_child_rdn_val = ldb_val_dup(tmp_ctx, rdn_value);
+
+       /*
+        * sizeof(guid_str.buf) will always be longer than
+        * strlen(guid_str.buf) but we allocate using this and
+        * waste the trailing bytes to avoid scaring folks
+        * with memcpy() using strlen() below
+        */
+
+       deleted_child_rdn_val.data
+               = talloc_realloc(tmp_ctx, deleted_child_rdn_val.data,
+                                uint8_t,
+                                rdn_value->length + 5
+                                + sizeof(guid_str.buf));
+       if (!deleted_child_rdn_val.data) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.length =
+               rdn_value->length + 5
+               + strlen(guid_str.buf);
+
+       SMB_ASSERT(deleted_child_rdn_val.length <
+                  talloc_get_size(deleted_child_rdn_val.data));
+
+       /*
+        * talloc won't allocate more than 256MB so we can't
+        * overflow but just to be sure
+        */
+       if (deleted_child_rdn_val.length < rdn_value->length) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.data[rdn_value->length] = 0x0a;
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 1],
+              four_char_prefix, 4);
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 5],
+              guid_str.buf,
+              sizeof(guid_str.buf));
+
+       /* Now set the value into the RDN, without parsing it */
+       ldb_dn_set_component(dn, 0, rdn_name,
+                            deleted_child_rdn_val);
+
+       return LDB_SUCCESS;
+}
+
+
 /*
   form a conflict DN
  */
-static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn, struct GUID *guid)
+static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx,
+                                        struct ldb_context *ldb,
+                                        struct ldb_dn *dn,
+                                        struct GUID *guid)
 {
        const struct ldb_val *rdn_val;
        const char *rdn_name;
        struct ldb_dn *new_dn;
+       int ret;
 
        rdn_val = ldb_dn_get_rdn_val(dn);
        rdn_name = ldb_dn_get_rdn_name(dn);
@@ -4578,25 +4686,41 @@ static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn,
                return NULL;
        }
 
-       new_dn = ldb_dn_copy(mem_ctx, dn);
+       new_dn = ldb_dn_get_parent(mem_ctx, dn);
        if (!new_dn) {
                return NULL;
        }
 
-       if (!ldb_dn_remove_child_components(new_dn, 1)) {
-               return NULL;
-       }
-
-       if (!ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ACNF:%s",
-                                 rdn_name,
-                                 ldb_dn_escape_value(new_dn, *rdn_val),
-                                 GUID_string(new_dn, guid))) {
+       ret = replmd_make_prefix_child_dn(mem_ctx,
+                                         ldb, new_dn,
+                                         "CNF:",
+                                         rdn_name,
+                                         rdn_val,
+                                         *guid);
+       if (ret != LDB_SUCCESS) {
                return NULL;
        }
-
        return new_dn;
 }
 
+/*
+  form a deleted DN
+ */
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid)
+{
+       return replmd_make_prefix_child_dn(tmp_ctx,
+                                          ldb, dn,
+                                          "DEL:",
+                                          rdn_name,
+                                          rdn_value,
+                                          guid);
+}
+
 
 /*
   perform a modify operation which sets the rDN and name attributes to
@@ -4650,9 +4774,16 @@ static int replmd_name_modify(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       ret = dsdb_module_modify(ar->module, msg, DSDB_FLAG_OWN_MODULE, req);
+       /*
+        * We have to mark this as a replicated update otherwise
+        * schema_data may reject a rename in the schema partition
+        */
+
+       ret = dsdb_module_modify(ar->module, msg,
+                                DSDB_FLAG_OWN_MODULE|DSDB_FLAG_REPLICATED_UPDATE,
+                                req);
        if (ret != LDB_SUCCESS) {
-               DEBUG(0,(__location__ ": Failed to modify rDN/name of conflict DN '%s' - %s",
+               DEBUG(0,(__location__ ": Failed to modify rDN/name of DN being DRS renamed '%s' - %s",
                         ldb_dn_get_linearized(dn),
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                return ret;
@@ -4664,7 +4795,7 @@ static int replmd_name_modify(struct replmd_replicated_request *ar,
 
 failed:
        talloc_free(msg);
-       DEBUG(0,(__location__ ": Failed to setup modify rDN/name of conflict DN '%s'",
+       DEBUG(0,(__location__ ": Failed to setup modify rDN/name of DN being DRS renamed '%s'",
                 ldb_dn_get_linearized(dn)));
        return LDB_ERR_OPERATIONS_ERROR;
 }
@@ -4872,7 +5003,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                 ldb_dn_get_linearized(conflict_dn)));
                        goto failed;
                }
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4897,7 +5030,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                        goto failed;
                }
 
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -5072,16 +5207,22 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(4)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
-               DEBUG(4, ("DRS replication add message of %s:\n%s\n",
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_ADD,
+                                                          msg);
+               DEBUG(8, ("DRS replication add message of %s:\n%s\n",
                          GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
                          s));
                talloc_free(s);
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+               DEBUG(4, ("DRS replication add DN of %s is %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
        }
-
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
                                                     "isDeleted", false);
 
@@ -5093,7 +5234,8 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
 
        rdn_val = ldb_dn_get_rdn_val(msg->dn);
        ret = replmd_update_rpmd_rdn_attr(ldb, msg, rdn_val, NULL,
-                                    md, ar, now, is_schema_nc);
+                                         md, ar, now, is_schema_nc,
+                                         false);
        if (ret != LDB_SUCCESS) {
                ldb_asprintf_errstring(ldb, "%s: error during DRS repl ADD: %s", __func__, ldb_errstring(ldb));
                return replmd_replicated_request_error(ar, ret);
@@ -5189,7 +5331,7 @@ static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request
        {
                struct ldb_message *parent_msg = ares->message;
                struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
-               struct ldb_dn *parent_dn;
+               struct ldb_dn *parent_dn = NULL;
                int comp_num;
 
                if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
@@ -5492,7 +5634,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
 
        if (rename_incoming_record) {
 
-               new_dn = replmd_conflict_dn(msg, msg->dn,
+               new_dn = replmd_conflict_dn(msg,
+                                           ldb_module_get_ctx(ar->module),
+                                           msg->dn,
                                            &ar->objs->objects[ar->index_current].object_guid);
                if (new_dn == NULL) {
                        ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
@@ -5528,7 +5672,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       new_dn = replmd_conflict_dn(tmp_ctx, conflict_dn, &guid);
+       new_dn = replmd_conflict_dn(tmp_ctx,
+                                   ldb_module_get_ctx(ar->module),
+                                   conflict_dn, &guid);
        if (new_dn == NULL) {
                DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
@@ -5643,11 +5789,12 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(5)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(5, ("Initial DRS replication modify message of %s is:\n%s\n"
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY, msg);
+               DEBUG(8, ("Initial DRS replication modify message of %s is:\n%s\n"
                          "%s\n"
                          "%s\n",
                          GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
@@ -5661,8 +5808,15 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                                  "incoming replPropertyMetaData",
                                                  rmd)));
                talloc_free(s);
-       }
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
 
+               DEBUG(4, ("Initial DRS replication modify DN of %s is: %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
+       }
+               
        local_isDeleted = ldb_msg_find_attr_as_bool(ar->search_msg,
                                                    "isDeleted", false);
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
@@ -5823,7 +5977,8 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
 
        if (renamed) {
                ret = replmd_update_rpmd_rdn_attr(ldb, msg, new_rdn, old_rdn,
-                                                 &nmd, ar, now, is_schema_nc);
+                                                 &nmd, ar, now, is_schema_nc,
+                                                 false);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, "%s: error during DRS repl merge: %s", __func__, ldb_errstring(ldb));
                        return replmd_replicated_request_error(ar, ret);
@@ -5918,14 +6073,24 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(4)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(4, ("Final DRS replication modify message of %s:\n%s\n",
-                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg);
+               DEBUG(8, ("Final DRS replication modify message of %s:\n%s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
                          s));
                talloc_free(s);
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+
+               DEBUG(4, ("Final DRS replication modify DN of %s is %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
        }
 
        ret = ldb_build_mod_req(&change_req,
@@ -6133,6 +6298,62 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
        return LDB_SUCCESS;
 }
 
+/**
+ * Stores the linked attributes received in the replication chunk - these get
+ * applied at the end of the transaction. We also check that each linked
+ * attribute is valid, i.e. source and target objects are known.
+ */
+static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
+{
+       int ret = LDB_SUCCESS;
+       uint32_t i;
+       struct ldb_module *module = ar->module;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct ldb_context *ldb;
+
+       ldb = ldb_module_get_ctx(module);
+
+       DEBUG(4,("linked_attributes_count=%u\n", ar->objs->linked_attributes_count));
+
+       /* save away the linked attributes for the end of the transaction */
+       for (i = 0; i < ar->objs->linked_attributes_count; i++) {
+               struct la_entry *la_entry;
+
+               if (replmd_private->la_ctx == NULL) {
+                       replmd_private->la_ctx = talloc_new(replmd_private);
+               }
+               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
+               if (la_entry == NULL) {
+                       ldb_oom(ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
+               if (la_entry->la == NULL) {
+                       talloc_free(la_entry);
+                       ldb_oom(ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               *la_entry->la = ar->objs->linked_attributes[i];
+               la_entry->dsdb_repl_flags = ar->objs->dsdb_repl_flags;
+
+               /* we need to steal the non-scalars so they stay
+                  around until the end of the transaction */
+               talloc_steal(la_entry->la, la_entry->la->identifier);
+               talloc_steal(la_entry->la, la_entry->la->value.blob);
+
+               ret = replmd_verify_linked_attribute(ar, la_entry);
+
+               if (ret != LDB_SUCCESS) {
+                       break;
+               }
+
+               DLIST_ADD(replmd_private->la_list, la_entry);
+       }
+
+       return ret;
+}
+
 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
 
 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
@@ -6149,7 +6370,18 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        struct GUID_txt_buf guid_str_buf;
 
        if (ar->index_current >= ar->objs->num_objects) {
-               /* done with it, go to next stage */
+
+               /*
+                * Now that we've applied all the objects, check the new linked
+                * attributes and store them (we apply them in .prepare_commit)
+                */
+               ret = replmd_store_linked_attributes(ar);
+
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+
+               /* done applying objects, move on to the next stage */
                return replmd_replicated_uptodate_vector(ar);
        }
 
@@ -6521,7 +6753,9 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
        nrf_el->flags = LDB_FLAG_MOD_REPLACE;
 
        if (CHECK_DEBUGLVL(4)) {
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg);
                DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
                talloc_free(s);
        }
@@ -6627,9 +6861,6 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        struct replmd_replicated_request *ar;
        struct ldb_control **ctrls;
        int ret;
-       uint32_t i;
-       struct replmd_private *replmd_private =
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
        ldb = ldb_module_get_ctx(module);
 
@@ -6686,90 +6917,272 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        ar->controls = req->controls;
        req->controls = ctrls;
 
-       DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
+       return replmd_replicated_apply_next(ar);
+}
 
-       /* save away the linked attributes for the end of the
-          transaction */
-       for (i=0; i<ar->objs->linked_attributes_count; i++) {
-               struct la_entry *la_entry;
+/**
+ * Checks how to handle an missing target - either we need to fail the
+ * replication and retry with GET_TGT, ignore the link and continue, or try to
+ * add a partial link to an unknown target.
+ */
+static int replmd_allow_missing_target(struct ldb_module *module,
+                                      TALLOC_CTX *mem_ctx,
+                                      struct ldb_dn *target_dn,
+                                      struct ldb_dn *source_dn,
+                                      bool is_obj_commit,
+                                      struct GUID *guid,
+                                      uint32_t dsdb_repl_flags,
+                                      bool *ignore_link,
+                                      const char * missing_str)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       bool is_in_same_nc;
 
-               if (replmd_private->la_ctx == NULL) {
-                       replmd_private->la_ctx = talloc_new(replmd_private);
-               }
-               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
-               if (la_entry == NULL) {
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
-               if (la_entry->la == NULL) {
-                       talloc_free(la_entry);
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               *la_entry->la = ar->objs->linked_attributes[i];
+       /*
+        * we may not be able to resolve link targets properly when
+        * dealing with subsets of objects, e.g. the source is a
+        * critical object and the target isn't
+        *
+        * TODO:
+        * When we implement Trusted Domains we need to consider
+        * whether they get treated as an incomplete replica here or not
+        */
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_OBJECT_SUBSET) {
 
-               /* we need to steal the non-scalars so they stay
-                  around until the end of the transaction */
-               talloc_steal(la_entry->la, la_entry->la->identifier);
-               talloc_steal(la_entry->la, la_entry->la->value.blob);
+               /*
+                * Ignore the link. We don't increase the highwater-mark in
+                * the object subset cases, so subsequent replications should
+                * resolve any missing links
+                */
+               DEBUG(2, ("%s target %s linked from %s\n", missing_str,
+                         ldb_dn_get_linearized(target_dn),
+                         ldb_dn_get_linearized(source_dn)));
+               *ignore_link = true;
+               return LDB_SUCCESS;
+       }
 
-               DLIST_ADD(replmd_private->la_list, la_entry);
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_TARGETS_UPTODATE) {
+
+               /*
+                * target should already be up-to-date so there's no point in
+                * retrying. This could be due to bad timing, or if a target
+                * on a one-way link was deleted. We ignore the link rather
+                * than failing the replication cycle completely
+                */
+               *ignore_link = true;
+               DBG_WARNING("%s is %s but up to date. Ignoring link from %s\n",
+                           ldb_dn_get_linearized(target_dn), missing_str,
+                           ldb_dn_get_linearized(source_dn));
+               return LDB_SUCCESS;
+       }
+       
+       is_in_same_nc = dsdb_objects_have_same_nc(ldb,
+                                                 mem_ctx,
+                                                 source_dn,
+                                                 target_dn);
+       if (is_in_same_nc) {
+               /* fail the replication and retry with GET_TGT */
+               ldb_asprintf_errstring(ldb, "%s target %s GUID %s linked from %s\n",
+                                      missing_str,
+                                      ldb_dn_get_linearized(target_dn),
+                                      GUID_string(mem_ctx, guid),
+                                      ldb_dn_get_linearized(source_dn));
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       return replmd_replicated_apply_next(ar);
+       /*
+        * The target of the cross-partition link is missing. Continue
+        * and try to at least add the forward-link. This isn't great,
+        * but a partial link can be fixed by dbcheck, so it's better
+        * than dropping the link completely.
+        */
+       *ignore_link = false;
+
+       if (is_obj_commit) {
+
+               /*
+                * Only log this when we're actually committing the objects.
+                * This avoids spurious logs, i.e. if we're just verifying the
+                * received link during a join.
+                */
+               DBG_WARNING("%s cross-partition target %s linked from %s\n",
+                           missing_str, ldb_dn_get_linearized(target_dn),
+                           ldb_dn_get_linearized(source_dn));
+       }
+       
+       return LDB_SUCCESS;
 }
 
-/*
-  process one linked attribute structure
+/**
+ * Checks that the target object for a linked attribute exists.
+ * @param guid returns the target object's GUID (is returned)if it exists)
+ * @param ignore_link set to true if the linked attribute should be ignored
+ * (i.e. the target doesn't exist, but that it's OK to skip the link)
  */
-static int replmd_process_linked_attribute(struct ldb_module *module,
-                                          struct replmd_private *replmd_private,
-                                          struct la_entry *la_entry,
-                                          struct ldb_request *parent)
+static int replmd_check_target_exists(struct ldb_module *module,
+                                     struct dsdb_dn *dsdb_dn,
+                                     struct la_entry *la_entry,
+                                     struct ldb_dn *source_dn,
+                                     bool is_obj_commit,
+                                     struct GUID *guid,
+                                     bool *ignore_link)
 {
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
-       struct ldb_message *msg;
-       struct ldb_message *target_msg = NULL;
-       TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
-       const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
-       int ret;
-       const struct dsdb_attribute *attr;
-       struct dsdb_dn *dsdb_dn;
-       uint64_t seq_num = 0;
-       struct ldb_message_element *old_el;
-       WERROR status;
-       time_t t = time(NULL);
-       struct ldb_result *res;
        struct ldb_result *target_res;
-       const char *attrs[4];
-       const char *attrs2[] = { "isDeleted", "isRecycled", NULL };
-       struct parsed_dn *pdn_list, *pdn, *next;
-       struct GUID guid = GUID_zero();
+       TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
+       const char *attrs[] = { "isDeleted", "isRecycled", NULL };
        NTSTATUS ntstatus;
-       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
+       int ret;
+       enum deletion_state target_deletion_state = OBJECT_REMOVED;
+       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) ? true : false;
 
-       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
-       enum deletion_state target_deletion_state = OBJECT_NOT_DELETED;
+       *ignore_link = false;
+       ntstatus = dsdb_get_extended_dn_guid(dsdb_dn->dn, guid, "GUID");
 
-/*
-linked_attributes[0]:
-     &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute
-        identifier               : *
-            identifier: struct drsuapi_DsReplicaObjectIdentifier
-                __ndr_size               : 0x0000003a (58)
-                __ndr_size_sid           : 0x00000000 (0)
-                guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
-                sid                      : S-0-0
-                __ndr_size_dn            : 0x00000000 (0)
-                dn                       : ''
-        attid                    : DRSUAPI_ATTID_member (0x1F)
-        value: struct drsuapi_DsAttributeValue
-            __ndr_size               : 0x0000007e (126)
-            blob                     : *
-                blob                     : DATA_BLOB length=126
-        flags                    : 0x00000001 (1)
+       if (!NT_STATUS_IS_OK(ntstatus) && !active) {
+
+               /*
+                * This strange behaviour (allowing a NULL/missing
+                * GUID) originally comes from:
+                *
+                * commit e3054ce0fe0f8f62d2f5b2a77893e7a1479128bd
+                * Author: Andrew Tridgell <tridge@samba.org>
+                * Date:   Mon Dec 21 21:21:55 2009 +1100
+                *
+                *  s4-drs: cope better with NULL GUIDS from DRS
+                *
+                *  It is valid to get a NULL GUID over DRS for a deleted forward link. We
+                *  need to match by DN if possible when seeing if we should update an
+                *  existing link.
+                *
+                *  Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
+                */
+               ret = dsdb_module_search_dn(module, tmp_ctx, &target_res,
+                                           dsdb_dn->dn, attrs,
+                                           DSDB_FLAG_NEXT_MODULE |
+                                           DSDB_SEARCH_SHOW_RECYCLED |
+                                           DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                           NULL);
+       } else if (!NT_STATUS_IS_OK(ntstatus)) {
+               ldb_asprintf_errstring(ldb, "Failed to find GUID in linked attribute 0x%x blob for %s from %s",
+                                      la->attid,
+                                      ldb_dn_get_linearized(dsdb_dn->dn),
+                                      ldb_dn_get_linearized(source_dn));
+               talloc_free(tmp_ctx);
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               ret = dsdb_module_search(module, tmp_ctx, &target_res,
+                                        NULL, LDB_SCOPE_SUBTREE,
+                                        attrs,
+                                        DSDB_FLAG_NEXT_MODULE |
+                                        DSDB_SEARCH_SHOW_RECYCLED |
+                                        DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                        DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                        NULL,
+                                        "objectGUID=%s",
+                                        GUID_string(tmp_ctx, guid));
+       }
+
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "Failed to re-resolve GUID %s: %s\n",
+                                      GUID_string(tmp_ctx, guid),
+                                      ldb_errstring(ldb));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (target_res->count == 0) {
+
+               /*
+                * target object is unknown. Check whether to ignore the link,
+                * fail the replication, or add a partial link
+                */
+               ret = replmd_allow_missing_target(module, tmp_ctx, dsdb_dn->dn,
+                                                 source_dn, is_obj_commit, guid,
+                                                 la_entry->dsdb_repl_flags,
+                                                 ignore_link, "Unknown");
+
+       } else if (target_res->count != 1) {
+               ldb_asprintf_errstring(ldb, "More than one object found matching objectGUID %s\n",
+                                      GUID_string(tmp_ctx, guid));
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               struct ldb_message *target_msg = target_res->msgs[0];
+
+               dsdb_dn->dn = talloc_steal(dsdb_dn, target_msg->dn);
+
+               /* Get the object's state (i.e. Not Deleted, Tombstone, etc) */
+               replmd_deletion_state(module, target_msg,
+                                     &target_deletion_state, NULL);
+
+               /*
+                * Check for deleted objects as per MS-DRSR 4.1.10.6.14
+                * ProcessLinkValue(). Link updates should not be sent for
+                * recycled and tombstone objects (deleting the links should
+                * happen when we delete the object). This probably means our
+                * copy of the target object isn't up to date.
+                */
+               if (target_deletion_state >= OBJECT_RECYCLED) {
+
+                       /*
+                        * target object is deleted. Check whether to ignore the
+                        * link, fail the replication, or add a partial link
+                        */
+                       ret = replmd_allow_missing_target(module, tmp_ctx,
+                                                         dsdb_dn->dn, source_dn,
+                                                         is_obj_commit, guid,
+                                                         la_entry->dsdb_repl_flags,
+                                                         ignore_link, "Deleted");
+               }
+       }
+
+       talloc_free(tmp_ctx);
+       return ret;
+}
+
+/**
+ * Extracts the key details about the source/target object for a
+ * linked-attribute entry.
+ * This returns the following details:
+ * @param ret_attr the schema details for the linked attribute
+ * @param source_msg the search result for the source object
+ * @param target_dsdb_dn the unpacked DN info for the target object
+ */
+static int replmd_extract_la_entry_details(struct ldb_module *module,
+                                          struct la_entry *la_entry,
+                                          TALLOC_CTX *mem_ctx,
+                                          const struct dsdb_attribute **ret_attr,
+                                          struct ldb_message **source_msg,
+                                          struct dsdb_dn **target_dsdb_dn)
+{
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
+       int ret;
+       const struct dsdb_attribute *attr;
+       WERROR status;
+       struct ldb_result *res;
+       const char *attrs[4];
+
+/*
+linked_attributes[0]:
+     &objs->linked_attributes[i]: struct drsuapi_DsReplicaLinkedAttribute
+        identifier               : *
+            identifier: struct drsuapi_DsReplicaObjectIdentifier
+                __ndr_size               : 0x0000003a (58)
+                __ndr_size_sid           : 0x00000000 (0)
+                guid                     : 8e95b6a9-13dd-4158-89db-3220a5be5cc7
+                sid                      : S-0-0
+                __ndr_size_dn            : 0x00000000 (0)
+                dn                       : ''
+        attid                    : DRSUAPI_ATTID_member (0x1F)
+        value: struct drsuapi_DsAttributeValue
+            __ndr_size               : 0x0000007e (126)
+            blob                     : *
+                blob                     : DATA_BLOB length=126
+        flags                    : 0x00000001 (1)
                1: DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
         originating_add_time     : Wed Sep  2 22:20:01 2009 EST
         meta_data: struct drsuapi_DsReplicaMetaData
@@ -6796,7 +7209,6 @@ linked_attributes[0]:
                                       la->attid,
                                       GUID_buf_string(&la->identifier->guid,
                                                       &guid_str));
-               talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -6805,37 +7217,382 @@ linked_attributes[0]:
        attrs[2] = "isRecycled";
        attrs[3] = NULL;
 
-       /* get the existing message from the db for the object with
-          this GUID, returning attribute being modified. We will then
-          use this msg as the basis for a modify call */
-       ret = dsdb_module_search(module, tmp_ctx, &res, NULL, LDB_SCOPE_SUBTREE, attrs,
+       /*
+        * get the existing message from the db for the object with
+        * this GUID, returning attribute being modified. We will then
+        * use this msg as the basis for a modify call
+        */
+       ret = dsdb_module_search(module, mem_ctx, &res, NULL, LDB_SCOPE_SUBTREE, attrs,
                                 DSDB_FLAG_NEXT_MODULE |
                                 DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
                                 DSDB_SEARCH_SHOW_RECYCLED |
                                 DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
                                 DSDB_SEARCH_REVEAL_INTERNALS,
-                                parent,
-                                "objectGUID=%s", GUID_string(tmp_ctx, &la->identifier->guid));
+                                NULL,
+                                "objectGUID=%s", GUID_string(mem_ctx, &la->identifier->guid));
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
        if (res->count != 1) {
                ldb_asprintf_errstring(ldb, "DRS linked attribute for GUID %s - DN not found",
-                                      GUID_string(tmp_ctx, &la->identifier->guid));
-               talloc_free(tmp_ctx);
+                                      GUID_string(mem_ctx, &la->identifier->guid));
                return LDB_ERR_NO_SUCH_OBJECT;
        }
-       msg = res->msgs[0];
+
+       *source_msg = res->msgs[0];
+
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx, la->value.blob, target_dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(res->msgs[0]->dn),
+                                      win_errstr(status));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       *ret_attr = attr;
+
+       return LDB_SUCCESS;
+}
+
+/**
+ * Verifies the source and target objects are known for a linked attribute
+ */
+static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
+                                         struct la_entry *la)
+{
+       int ret = LDB_SUCCESS;
+       TALLOC_CTX *tmp_ctx = talloc_new(la);
+       struct ldb_module *module = ar->module;
+       struct ldb_message *src_msg;
+       const struct dsdb_attribute *attr;
+       struct dsdb_dn *tgt_dsdb_dn;
+       struct GUID guid = GUID_zero();
+       bool dummy;
+
+       ret = replmd_extract_la_entry_details(module, la, tmp_ctx, &attr,
+                                             &src_msg, &tgt_dsdb_dn);
 
        /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.13
+        * When we fail to find the source object, the error code we pass
+        * back here is really important. It flags back to the callers to
+        * retry this request with DRSUAPI_DRS_GET_ANC. This case should
+        * never happen if we're replicating from a Samba DC, but it is
+        * needed to talk to a Windows DC
+        */
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+               ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_MISSING_PARENT);
+       }
+
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       /*
+        * We can skip the target object checks if we're only syncing critical
+        * objects, or we know the target is up-to-date. If either case, we
+        * still continue even if the target doesn't exist
+        */
+       if ((la->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
+                                   DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
+
+               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
+                                                src_msg->dn, false, &guid,
+                                                &dummy);
+       }
+
+       /*
+        * When we fail to find the target object, the error code we pass
+        * back here is really important. It flags back to the callers to
+        * retry this request with DRSUAPI_DRS_GET_TGT
+        */
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+               ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_RECYCLED_TARGET);
+       }
+
+       talloc_free(tmp_ctx);
+       return ret;
+}
+
+/**
+ * Finds the current active Parsed-DN value for a single-valued linked
+ * attribute, if one exists.
+ * @param ret_pdn assigned the active Parsed-DN, or NULL if none was found
+ * @returns LDB_SUCCESS (regardless of whether a match was found), unless
+ * an error occurred
+ */
+static int replmd_get_active_singleval_link(struct ldb_module *module,
+                                           TALLOC_CTX *mem_ctx,
+                                           struct parsed_dn pdn_list[],
+                                           unsigned int count,
+                                           const struct dsdb_attribute *attr,
+                                           struct parsed_dn **ret_pdn)
+{
+       unsigned int i;
+
+       *ret_pdn = NULL;
+
+       if (!(attr->ldb_schema_attribute->flags & LDB_ATTR_FLAG_SINGLE_VALUE)) {
+
+               /* nothing to do for multi-valued linked attributes */
+               return LDB_SUCCESS;
+       }
+
+       for (i = 0; i < count; i++) {
+               int ret = LDB_SUCCESS;
+               struct parsed_dn *pdn = &pdn_list[i];
+
+               /* skip any inactive links */
+               if (dsdb_dn_is_deleted_val(pdn->v)) {
+                       continue;
+               }
+
+               /* we've found an active value for this attribute */
+               *ret_pdn = pdn;
+
+               if (pdn->dsdb_dn == NULL) {
+                       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+                       ret = really_parse_trusted_dn(mem_ctx, ldb, pdn,
+                                                     attr->syntax->ldap_oid);
+               }
+
+               return ret;
+       }
+
+       /* no active link found */
+       return LDB_SUCCESS;
+}
+
+/**
+ * @returns true if the replication linked attribute info is newer than we
+ * already have in our DB
+ * @param pdn the existing linked attribute info in our DB
+ * @param la the new linked attribute info received during replication
+ */
+static bool replmd_link_update_is_newer(struct parsed_dn *pdn,
+                                       struct drsuapi_DsReplicaLinkedAttribute *la)
+{
+       /* see if this update is newer than what we have already */
+       struct GUID invocation_id = GUID_zero();
+       uint32_t version = 0;
+       NTTIME change_time = 0;
+
+       if (pdn == NULL) {
+
+               /* no existing info so update is newer */
+               return true;
+       }
+
+       dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
+       dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
+       dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
+
+       return replmd_update_is_newer(&invocation_id,
+                                     &la->meta_data.originating_invocation_id,
+                                     version,
+                                     la->meta_data.version,
+                                     change_time,
+                                     la->meta_data.originating_change_time);
+}
+
+/**
+ * Marks an existing linked attribute value as deleted in the DB
+ * @param pdn the parsed-DN of the target-value to delete
+ */
+static int replmd_delete_link_value(struct ldb_module *module,
+                                   struct replmd_private *replmd_private,
+                                   TALLOC_CTX *mem_ctx,
+                                   struct ldb_dn *src_obj_dn,
+                                   const struct dsdb_schema *schema,
+                                   const struct dsdb_attribute *attr,
+                                   uint64_t seq_num,
+                                   bool is_active,
+                                   struct GUID *target_guid,
+                                   struct dsdb_dn *target_dsdb_dn,
+                                   struct ldb_val *output_val)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       time_t t;
+       NTTIME now;
+       const struct GUID *invocation_id = NULL;
+       int ret;
+
+       t = time(NULL);
+       unix_to_nt_time(&now, t);
+
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (invocation_id == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* if the existing link is active, remove its backlink */
+       if (is_active) {
+
+               ret = replmd_add_backlink(module, replmd_private, schema,
+                                         src_obj_dn, target_guid, false,
+                                         attr, NULL);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       }
+
+       /* mark the existing value as deleted */
+       ret = replmd_update_la_val(mem_ctx, output_val, target_dsdb_dn,
+                                  target_dsdb_dn, invocation_id, seq_num,
+                                  seq_num, now, true);
+       return ret;
+}
+
+/**
+ * Checks for a conflict in single-valued link attributes, and tries to
+ * resolve the problem if possible.
+ *
+ * Single-valued links should only ever have one active value. If we already
+ * have an active link value, and during replication we receive an active link
+ * value for a different target DN, then we need to resolve this inconsistency
+ * and determine which value should be active. If the received info is better/
+ * newer than the existing link attribute, then we need to set our existing
+ * link as deleted. If the received info is worse/older, then we should continue
+ * to add it, but set it as an inactive link.
+ *
+ * Note that this is a corner-case that is unlikely to happen (but if it does
+ * happen, we don't want it to break replication completely).
+ *
+ * @param pdn_being_modified the parsed DN corresponding to the received link
+ * target (note this is NULL if the link does not already exist in our DB)
+ * @param pdn_list all the source object's Parsed-DNs for this attribute, i.e.
+ * any existing active or inactive values for the attribute in our DB.
+ * @param dsdb_dn the target DN for the received link attribute
+ * @param add_as_inactive gets set to true if the received link is worse than
+ * the existing link - it should still be added, but as an inactive link.
+ */
+static int replmd_check_singleval_la_conflict(struct ldb_module *module,
+                                             struct replmd_private *replmd_private,
+                                             TALLOC_CTX *mem_ctx,
+                                             struct ldb_dn *src_obj_dn,
+                                             struct drsuapi_DsReplicaLinkedAttribute *la,
+                                             struct dsdb_dn *dsdb_dn,
+                                             struct parsed_dn *pdn_being_modified,
+                                             struct parsed_dn *pdn_list,
+                                             struct ldb_message_element *old_el,
+                                             const struct dsdb_schema *schema,
+                                             const struct dsdb_attribute *attr,
+                                             uint64_t seq_num,
+                                             bool *add_as_inactive)
+{
+       struct parsed_dn *active_pdn = NULL;
+       bool update_is_newer = false;
+       int ret;
+
+       /*
+        * check if there's a conflict for single-valued links, i.e. an active
+        * linked attribute already exists, but it has a different target value
+        */
+       ret = replmd_get_active_singleval_link(module, mem_ctx, pdn_list,
+                                              old_el->num_values, attr,
+                                              &active_pdn);
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * If no active value exists (or the received info is for the currently
+        * active value), then no conflict exists
+        */
+       if (active_pdn == NULL || active_pdn == pdn_being_modified) {
+               return LDB_SUCCESS;
+       }
+
+       DBG_WARNING("Link conflict for %s attribute on %s\n",
+                   attr->lDAPDisplayName, ldb_dn_get_linearized(src_obj_dn));
+
+       /* Work out how to resolve the conflict based on which info is better */
+       update_is_newer = replmd_link_update_is_newer(active_pdn, la);
+
+       if (update_is_newer) {
+               DBG_WARNING("Using received value %s, over existing target %s\n",
+                           ldb_dn_get_linearized(dsdb_dn->dn),
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn));
+
+               /*
+                * Delete our existing active link. The received info will then
+                * be added (through normal link processing) as the active value
+                */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              src_obj_dn, schema, attr,
+                                              seq_num, true, &active_pdn->guid,
+                                              active_pdn->dsdb_dn,
+                                              active_pdn->v);
+
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       } else {
+               DBG_WARNING("Using existing target %s, over received value %s\n",
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn),
+                           ldb_dn_get_linearized(dsdb_dn->dn));
+
+               /*
+                * we want to keep our existing active link and add the
+                * received link as inactive
+                */
+               *add_as_inactive = true;
+       }
+
+       return LDB_SUCCESS;
+}
+
+/*
+  process one linked attribute structure
+ */
+static int replmd_process_linked_attribute(struct ldb_module *module,
+                                          struct replmd_private *replmd_private,
+                                          struct la_entry *la_entry,
+                                          struct ldb_request *parent)
+{
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct ldb_message *msg;
+       TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
+       int ret;
+       const struct dsdb_attribute *attr;
+       struct dsdb_dn *dsdb_dn;
+       uint64_t seq_num = 0;
+       struct ldb_message_element *old_el;
+       time_t t = time(NULL);
+       struct parsed_dn *pdn_list, *pdn, *next;
+       struct GUID guid = GUID_zero();
+       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
+       bool ignore_link;
+       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       struct dsdb_dn *old_dsdb_dn = NULL;
+       struct ldb_val *val_to_update = NULL;
+       bool add_as_inactive = false;
+
+       /*
+        * get the attribute being modified, the search result for the source object,
+        * and the target object's DN details
+        */
+       ret = replmd_extract_la_entry_details(module, la_entry, tmp_ctx, &attr,
+                                             &msg, &dsdb_dn);
+
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       /*
+        * Check for deleted objects per MS-DRSR 4.1.10.6.14
         * ProcessLinkValue, because link updates are not applied to
         * recycled and tombstone objects.  We don't have to delete
         * any existing link, that should have happened when the
         * object deletion was replicated or initiated.
         */
-
        replmd_deletion_state(module, msg, &deletion_state, NULL);
 
        if (deletion_state >= OBJECT_RECYCLED) {
@@ -6864,148 +7621,77 @@ linked_attributes[0]:
                return ret;
        }
 
-       status = dsdb_dn_la_from_blob(ldb, attr, schema, tmp_ctx, la->value.blob, &dsdb_dn);
-       if (!W_ERROR_IS_OK(status)) {
-               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
-                                      old_el->name, ldb_dn_get_linearized(msg->dn), win_errstr(status));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       ntstatus = dsdb_get_extended_dn_guid(dsdb_dn->dn, &guid, "GUID");
-       if (!NT_STATUS_IS_OK(ntstatus) && !active) {
-               /*
-                * This strange behaviour (allowing a NULL/missing
-                * GUID) originally comes from:
-                *
-                * commit e3054ce0fe0f8f62d2f5b2a77893e7a1479128bd
-                * Author: Andrew Tridgell <tridge@samba.org>
-                * Date:   Mon Dec 21 21:21:55 2009 +1100
-                *
-                *  s4-drs: cope better with NULL GUIDS from DRS
-                *
-                *  It is valid to get a NULL GUID over DRS for a deleted forward link. We
-                *  need to match by DN if possible when seeing if we should update an
-                *  existing link.
-                *
-                *  Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
-                */
-
-               ret = dsdb_module_search_dn(module, tmp_ctx, &target_res,
-                                           dsdb_dn->dn, attrs2,
-                                           DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_RECYCLED |
-                                           DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
-                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
-                                           parent);
-       } else if (!NT_STATUS_IS_OK(ntstatus)) {
-               ldb_asprintf_errstring(ldb, "Failed to find GUID in linked attribute blob for %s on %s from %s",
-                                      old_el->name,
-                                      ldb_dn_get_linearized(dsdb_dn->dn),
-                                      ldb_dn_get_linearized(msg->dn));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       } else {
-               ret = dsdb_module_search(module, tmp_ctx, &target_res,
-                                        NULL, LDB_SCOPE_SUBTREE,
-                                        attrs2,
-                                        DSDB_FLAG_NEXT_MODULE |
-                                        DSDB_SEARCH_SHOW_RECYCLED |
-                                        DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
-                                        DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
-                                        parent,
-                                        "objectGUID=%s",
-                                        GUID_string(tmp_ctx, &guid));
-       }
+       ret = replmd_check_target_exists(module, dsdb_dn, la_entry, msg->dn,
+                                        true, &guid, &ignore_link);
 
        if (ret != LDB_SUCCESS) {
-               ldb_asprintf_errstring(ldb_module_get_ctx(module), "Failed to re-resolve GUID %s: %s\n",
-                                      GUID_string(tmp_ctx, &guid),
-                                      ldb_errstring(ldb_module_get_ctx(module)));
                talloc_free(tmp_ctx);
                return ret;
        }
 
-       if (target_res->count == 0) {
-               DEBUG(2,(__location__ ": WARNING: Failed to re-resolve GUID %s - using %s\n",
-                        GUID_string(tmp_ctx, &guid),
-                        ldb_dn_get_linearized(dsdb_dn->dn)));
-       } else if (target_res->count != 1) {
-               ldb_asprintf_errstring(ldb_module_get_ctx(module), "More than one object found matching objectGUID %s\n",
-                                      GUID_string(tmp_ctx, &guid));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       } else {
-               target_msg = target_res->msgs[0];
-               dsdb_dn->dn = talloc_steal(dsdb_dn, target_msg->dn);
-       }
-
        /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.13
-        * ProcessLinkValue, because link updates are not applied to
-        * recycled and tombstone objects.  We don't have to delete
-        * any existing link, that should have happened when the
-        * object deletion was replicated or initiated.
+        * there are some cases where the target object doesn't exist, but it's
+        * OK to ignore the linked attribute
         */
-       replmd_deletion_state(module, target_msg,
-                             &target_deletion_state, NULL);
-
-       if (target_deletion_state >= OBJECT_RECYCLED) {
+       if (ignore_link) {
                talloc_free(tmp_ctx);
-               return LDB_SUCCESS;
+               return ret;
        }
 
        /* see if this link already exists */
        ret = parsed_dn_find(ldb, pdn_list, old_el->num_values,
                             &guid,
                             dsdb_dn->dn,
+                            dsdb_dn->extra_part, 0,
                             &pdn, &next,
-                            attr->syntax->ldap_oid);
+                            attr->syntax->ldap_oid,
+                            true);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
        }
 
+       if (!replmd_link_update_is_newer(pdn, la)) {
+               DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
+                        old_el->name, ldb_dn_get_linearized(msg->dn),
+                        GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
+               talloc_free(tmp_ctx);
+               return LDB_SUCCESS;
+       }
 
-       if (pdn != NULL) {
-               /* see if this update is newer than what we have already */
-               struct GUID invocation_id = GUID_zero();
-               uint32_t version = 0;
-               uint32_t originating_usn = 0;
-               NTTIME change_time = 0;
-               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
-
-               dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &originating_usn, "RMD_ORIGINATING_USN");
-               dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
-
-               if (!replmd_update_is_newer(&invocation_id,
-                                           &la->meta_data.originating_invocation_id,
-                                           version,
-                                           la->meta_data.version,
-                                           change_time,
-                                           la->meta_data.originating_change_time)) {
-                       DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
-                                old_el->name, ldb_dn_get_linearized(msg->dn),
-                                GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
-                       talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               }
+       /* get a seq_num for this change */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
 
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       /*
+        * check for single-valued link conflicts, i.e. an active linked
+        * attribute already exists, but it has a different target value
+        */
+       if (active) {
+               ret = replmd_check_singleval_la_conflict(module, replmd_private,
+                                                        tmp_ctx, msg->dn, la,
+                                                        dsdb_dn, pdn, pdn_list,
+                                                        old_el, schema, attr,
+                                                        seq_num,
+                                                        &add_as_inactive);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
+       }
+
+       if (pdn != NULL) {
+               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
 
                if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                        /* remove the existing backlink */
                        ret = replmd_add_backlink(module, replmd_private,
                                                  schema, 
                                                  msg->dn,
-                                                 &guid, false, attr,
+                                                 &pdn->guid, false, attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
@@ -7013,37 +7699,12 @@ linked_attributes[0]:
                        }
                }
 
-               ret = replmd_update_la_val(tmp_ctx, pdn->v, dsdb_dn, pdn->dsdb_dn,
-                                          &la->meta_data.originating_invocation_id,
-                                          la->meta_data.originating_usn, seq_num,
-                                          la->meta_data.originating_change_time,
-                                          la->meta_data.version,
-                                          !active);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+               val_to_update = pdn->v;
+               old_dsdb_dn = pdn->dsdb_dn;
 
-               if (active) {
-                       /* add the new backlink */
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-               }
        } else {
                unsigned offset;
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+
                /*
                 * We know where the new one needs to be, from the *next
                 * pointer into pdn_list.
@@ -7079,27 +7740,46 @@ linked_attributes[0]:
 
                old_el->num_values++;
 
-               ret = replmd_build_la_val(tmp_ctx, &old_el->values[offset], dsdb_dn,
-                                         &la->meta_data.originating_invocation_id,
-                                         la->meta_data.originating_usn, seq_num,
-                                         la->meta_data.originating_change_time,
-                                         la->meta_data.version,
-                                         !active);
+               val_to_update = &old_el->values[offset];
+               old_dsdb_dn = NULL;
+       }
+
+       /* set the link attribute's value to the info that was received */
+       ret = replmd_set_la_val(tmp_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
+                               &la->meta_data.originating_invocation_id,
+                               la->meta_data.originating_usn, seq_num,
+                               la->meta_data.originating_change_time,
+                               la->meta_data.version,
+                               !active);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (add_as_inactive) {
+
+               /* Set the new link as inactive/deleted to avoid conflicts */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              msg->dn, schema, attr, seq_num,
+                                              false, &guid, dsdb_dn,
+                                              val_to_update);
+
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
 
-               if (active) {
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
+       } else if (active) {
+
+               /* if the new link is active, then add the new backlink */
+               ret = replmd_add_backlink(module, replmd_private,
+                                         schema,
+                                         msg->dn,
+                                         &guid, true, attr,
+                                         parent);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
                }
        }
 
@@ -7137,7 +7817,10 @@ linked_attributes[0]:
        if (ret != LDB_SUCCESS) {
                ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s'\n%s\n",
                          ldb_errstring(ldb),
-                         ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
+                         ldb_ldif_message_redacted_string(ldb,
+                                                          tmp_ctx,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg));
                talloc_free(tmp_ctx);
                return ret;
        }