replmd: Split up replmd_verify_linked_attribute() into src/target checks
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index 9597fc53fefe64d4e47cc9ea5b4d8eccd61a0d89..5bf819382f225dbd6d94df567922f5773247a8eb 100644 (file)
@@ -39,7 +39,9 @@
 #include "ldb_module.h"
 #include "dsdb/samdb/samdb.h"
 #include "dsdb/common/proto.h"
+#include "dsdb/common/util.h"
 #include "../libds/common/flags.h"
+#include "librpc/gen_ndr/irpc.h"
 #include "librpc/gen_ndr/ndr_misc.h"
 #include "librpc/gen_ndr/ndr_drsuapi.h"
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 #include "libcli/security/security.h"
 #include "lib/util/dlinklist.h"
 #include "dsdb/samdb/ldb_modules/util.h"
-#include "lib/util/binsearch.h"
 #include "lib/util/tsort.h"
 
+#undef DBGC_CLASS
+#define DBGC_CLASS            DBGC_DRS_REPL
+
+/* the RMD_VERSION for linked attributes starts from 1 */
+#define RMD_VERSION_INITIAL   1
+
 /*
  * It's 29/12/9999 at 23:59:59 UTC as specified in MS-ADTS 7.1.1.4.2
  * Deleted Objects Container
@@ -58,7 +65,7 @@ static const NTTIME DELETED_OBJECT_CONTAINER_CHANGE_TIME = 2650466015990000000UL
 
 struct replmd_private {
        TALLOC_CTX *la_ctx;
-       struct la_entry *la_list;
+       struct la_group *la_list;
        struct nc_entry {
                struct nc_entry *prev, *next;
                struct ldb_dn *dn;
@@ -68,11 +75,28 @@ struct replmd_private {
        struct ldb_dn *schema_dn;
        bool originating_updates;
        bool sorted_links;
+       uint32_t total_links;
+       uint32_t num_processed;
+};
+
+/*
+ * groups link attributes together by source-object and attribute-ID,
+ * to improve processing efficiency (i.e. for 'member' attribute, which
+ * could have 100s or 1000s of links).
+ * Note this grouping is best effort - the same source object could still
+ * correspond to several la_groups (a lot depends on the order DRS sends
+ * the links in). The groups currently don't span replication chunks (which
+ * caps the size to ~1500 links by default).
+ */
+struct la_group {
+       struct la_group *next, *prev;
+       struct la_entry *la_entries;
 };
 
 struct la_entry {
        struct la_entry *next, *prev;
        struct drsuapi_DsReplicaLinkedAttribute *la;
+       uint32_t dsdb_repl_flags;
 };
 
 struct replmd_replicated_request {
@@ -104,12 +128,8 @@ struct replmd_replicated_request {
        bool is_urgent;
 
        bool isDeleted;
-};
 
-struct parsed_dn {
-       struct dsdb_dn *dsdb_dn;
-       struct GUID guid;
-       struct ldb_val *v;
+       bool fix_link_sid;
 };
 
 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
@@ -118,6 +138,27 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                                      struct parsed_dn *dns, uint32_t count,
                                      struct ldb_message_element *el,
                                      const char *ldap_oid);
+static int replmd_verify_link_target(struct replmd_replicated_request *ar,
+                                    TALLOC_CTX *mem_ctx,
+                                    struct la_entry *la_entry,
+                                    struct ldb_message *src_msg,
+                                    const struct dsdb_attribute *attr);
+static int replmd_get_la_entry_source(struct ldb_module *module,
+                                     struct la_entry *la_entry,
+                                     TALLOC_CTX *mem_ctx,
+                                     const struct dsdb_attribute **ret_attr,
+                                     struct ldb_message **source_msg);
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted);
+
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid);
 
 enum urgent_situation {
        REPL_URGENT_ON_CREATE = 1,
@@ -240,7 +281,6 @@ static bool replmd_check_urgent_attribute(const struct ldb_message_element *el)
        return false;
 }
 
-
 static int replmd_replicated_apply_isDeleted(struct replmd_replicated_request *ar);
 
 /*
@@ -377,8 +417,9 @@ static int replmd_process_backlink(struct ldb_module *module, struct la_backlink
        ret = dsdb_module_dn_by_guid(module, frame, &bl->target_guid, &target_dn, parent);
        if (ret != LDB_SUCCESS) {
                struct GUID_txt_buf guid_str;
-               DEBUG(2,(__location__ ": WARNING: Failed to find target DN for linked attribute with GUID %s\n",
-                        GUID_buf_string(&bl->target_guid, &guid_str)));
+               DBG_WARNING("Failed to find target DN for linked attribute with GUID %s\n",
+                           GUID_buf_string(&bl->target_guid, &guid_str));
+               DBG_WARNING("Please run 'samba-tool dbcheck' to resolve any missing backlinks.\n");
                talloc_free(frame);
                return LDB_SUCCESS;
        }
@@ -562,9 +603,41 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
        }
 
        if (ares->error != LDB_SUCCESS) {
-               DEBUG(5,("%s failure. Error is: %s\n", __FUNCTION__, ldb_strerror(ares->error)));
+               struct GUID_txt_buf guid_txt;
+               struct ldb_message *msg = NULL;
+               char *s = NULL;
+
+               if (ac->apply_mode == false) {
+                       DBG_NOTICE("Originating update failure. Error is: %s\n",
+                                  ldb_strerror(ares->error));
+                       return ldb_module_done(ac->req, controls,
+                                              ares->response, ares->error);
+               }
+
+               msg = ac->objs->objects[ac->index_current].msg;
+               /*
+                * Set at DBG_NOTICE as once these start to happe, they
+                * will happen a lot until resolved, due to repeated
+                * replication.  The caller will probably print the
+                * ldb error string anyway.
+                */
+               DBG_NOTICE("DRS replication apply failure for %s. Error is: %s\n",
+                          ldb_dn_get_linearized(msg->dn),
+                          ldb_strerror(ares->error));
+
+               s = ldb_ldif_message_redacted_string(ldb_module_get_ctx(ac->module),
+                                                    ac,
+                                                    LDB_CHANGETYPE_ADD,
+                                                    msg);
+
+               DBG_INFO("Failing DRS %s replication message was %s:\n%s\n",
+                        ac->search_msg == NULL ? "ADD" : "MODIFY",
+                        GUID_buf_string(&ac->objs->objects[ac->index_current].object_guid,
+                                        &guid_txt),
+                        s);
+               talloc_free(s);
                return ldb_module_done(ac->req, controls,
-                                       ares->response, ares->error);
+                                      ares->response, ares->error);
        }
 
        if (ares->type != LDB_REPLY_DONE) {
@@ -889,13 +962,19 @@ static void replmd_ldb_message_sort(struct ldb_message *msg,
 }
 
 static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted);
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime);
+
+static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2);
 
 static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                          struct ldb_message_element *el, struct parsed_dn **pdn,
                          const char *ldap_oid, struct ldb_request *parent);
 
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn);
+
 /*
   fix up linked attributes in replmd_add.
   This involves setting up the right meta-data in extended DN
@@ -917,9 +996,27 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
        /* We will take a reference to the schema in replmd_add_backlink */
        const struct dsdb_schema *schema = dsdb_get_schema(ldb, NULL);
        struct ldb_val *new_values = NULL;
+       int ret;
+
+       if (dsdb_check_single_valued_link(sa, el) == LDB_SUCCESS) {
+               el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
+       } else {
+               ldb_asprintf_errstring(ldb,
+                                      "Attribute %s is single valued but "
+                                      "more than one value has been supplied",
+                                      el->name);
+               talloc_free(tmp_ctx);
+               return LDB_ERR_CONSTRAINT_VIOLATION;
+       }
        
-       int ret = get_parsed_dns(module, tmp_ctx, el, &pdn,
-                                sa->syntax->ldap_oid, parent);
+       ret = get_parsed_dns(module, tmp_ctx, el, &pdn,
+                            sa->syntax->ldap_oid, parent);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       ret = check_parsed_dn_duplicates(module, el, pdn);
        if (ret != LDB_SUCCESS) {
                talloc_free(tmp_ctx);
                return ret;
@@ -936,7 +1033,7 @@ static int replmd_add_fix_la(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                struct parsed_dn *p = &pdn[i];
                ret = replmd_build_la_val(el->values, p->v, p->dsdb_dn,
                                          &ac->our_invocation_id,
-                                         ac->seq_num, ac->seq_num, now, 0, false);
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -1373,6 +1470,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                                      const struct GUID *our_invocation_id,
                                      NTTIME now,
                                      bool is_schema_nc,
+                                     bool is_forced_rodc,
                                      struct ldb_request *req)
 {
        uint32_t i;
@@ -1511,6 +1609,7 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        md1 = &omd->ctr.ctr1.array[i];
        md1->version++;
        md1->attid = attid;
+
        if (md1->attid == DRSUAPI_ATTID_isDeleted) {
                const struct ldb_val *rdn_val = ldb_dn_get_rdn_val(msg->dn);
                const char* rdn;
@@ -1537,6 +1636,11 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
        md1->originating_usn           = *seq_num;
        md1->local_usn                 = *seq_num;
 
+       if (is_forced_rodc) {
+               /* Force version to 0 to be overriden later via replication */
+               md1->version = 0;
+       }
+
        return LDB_SUCCESS;
 }
 
@@ -1557,7 +1661,8 @@ static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
                                       struct replPropertyMetaDataBlob *omd,
                                       struct replmd_replicated_request *ar,
                                       NTTIME now,
-                                      bool is_schema_nc)
+                                      bool is_schema_nc,
+                                      bool is_forced_rodc)
 {
        const char *rdn_name = ldb_dn_get_rdn_name(msg->dn);
        const struct dsdb_attribute *rdn_attr =
@@ -1588,7 +1693,8 @@ static int replmd_update_rpmd_rdn_attr(struct ldb_context *ldb,
        return replmd_update_rpmd_element(ldb, msg, &new_el, NULL,
                                          omd, ar->schema, &ar->seq_num,
                                          &ar->our_invocation_id,
-                                         now, is_schema_nc, ar->req);
+                                         now, is_schema_nc, is_forced_rodc,
+                                         ar->req);
 
 }
 
@@ -1635,6 +1741,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
        bool rmd_is_provided;
        bool rmd_is_just_resorted = false;
        const char *not_rename_attrs[4 + msg->num_elements];
+       bool is_forced_rodc = false;
 
        if (rename_attrs) {
                attrs = rename_attrs;
@@ -1651,6 +1758,17 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
        ldb = ldb_module_get_ctx(module);
 
+       ret = samdb_rodc(ldb, rodc);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
+               *rodc = false;
+       }
+
+       if (*rodc &&
+           ldb_request_get_control(req, DSDB_CONTROL_FORCE_RODC_LOCAL_CHANGE)) {
+               is_forced_rodc = true;
+       }
+
        our_invocation_id = samdb_ntds_invocation_id(ldb);
        if (!our_invocation_id) {
                /* this happens during an initial vampire while
@@ -1787,6 +1905,7 @@ static int replmd_update_rpmd(struct ldb_module *module,
                                                         &omd, schema, seq_num,
                                                         our_invocation_id,
                                                         now, is_schema_nc,
+                                                        is_forced_rodc,
                                                         req);
                        if (ret != LDB_SUCCESS) {
                                return ret;
@@ -1842,13 +1961,11 @@ static int replmd_update_rpmd(struct ldb_module *module,
 
                /*if we are RODC and this is a DRSR update then its ok*/
                if (!ldb_request_get_control(req, DSDB_CONTROL_REPLICATED_UPDATE_OID)
-                   && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)) {
+                   && !ldb_request_get_control(req, DSDB_CONTROL_DBCHECK_MODIFY_RO_REPLICA)
+                   && !is_forced_rodc) {
                        unsigned instanceType;
 
-                       ret = samdb_rodc(ldb, rodc);
-                       if (ret != LDB_SUCCESS) {
-                               DEBUG(4, (__location__ ": unable to tell if we are an RODC\n"));
-                       } else if (*rodc) {
+                       if (*rodc) {
                                ldb_set_errstring(ldb, "RODC modify is forbidden!");
                                return LDB_ERR_REFERRAL;
                        }
@@ -1894,61 +2011,6 @@ static int replmd_update_rpmd(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
-struct compare_ctx {
-       struct GUID *guid;
-       struct ldb_context *ldb;
-       TALLOC_CTX *mem_ctx;
-       const char *ldap_oid;
-       int err;
-       const struct GUID *invocation_id;
-       DATA_BLOB extra_part;
-       bool compare_extra_part;
-};
-
-/* When a parsed_dn comes from the database, sometimes it is not really parsed. */
-
-static int really_parse_trusted_dn(TALLOC_CTX *mem_ctx, struct ldb_context *ldb,
-                                  struct parsed_dn *pdn, const char *ldap_oid)
-{
-       NTSTATUS status;
-       struct dsdb_dn *dsdb_dn = dsdb_dn_parse_trusted(mem_ctx, ldb, pdn->v,
-                                                       ldap_oid);
-       if (dsdb_dn == NULL) {
-               return LDB_ERR_INVALID_DN_SYNTAX;
-       }
-
-       status = dsdb_get_extended_dn_guid(dsdb_dn->dn, &pdn->guid, "GUID");
-       if (!NT_STATUS_IS_OK(status)) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       pdn->dsdb_dn = dsdb_dn;
-       return LDB_SUCCESS;
-}
-
-/*
- * We choose, as the sort order, the same order as is used in DRS replication,
- * which is the memcmp() order of the NDR GUID, not that obtained from
- * GUID_compare().
- *
- * This means that sorted links will be in the same order as a new DC would
- * see them.
- */
-static int ndr_guid_compare(struct GUID *guid1, struct GUID *guid2)
-{
-       uint8_t v1_data[16];
-       struct ldb_val v1 = data_blob_const(v1_data, sizeof(v1_data));
-       uint8_t v2_data[16];
-       struct ldb_val v2 = data_blob_const(v2_data, sizeof(v2_data));
-
-       /* This can't fail */
-       ndr_push_struct_into_fixed_blob(&v1, guid1,
-                                       (ndr_push_flags_fn_t)ndr_push_GUID);
-       /* This can't fail */
-       ndr_push_struct_into_fixed_blob(&v2, guid2,
-                                       (ndr_push_flags_fn_t)ndr_push_GUID);
-       return data_blob_cmp(&v1, &v2);
-}
-
 static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
 {
        int ret = ndr_guid_compare(&pdn1->guid, &pdn2->guid);
@@ -1959,136 +2021,6 @@ static int parsed_dn_compare(struct parsed_dn *pdn1, struct parsed_dn *pdn2)
        return ret;
 }
 
-static int la_guid_compare_with_trusted_dn(struct compare_ctx *ctx,
-                                          struct parsed_dn *p)
-{
-       int cmp = 0;
-       /*
-        * This works like a standard compare function in its return values,
-        * but has an extra trick to deal with errors: zero is returned and
-        * ctx->err is set to the ldb error code.
-        *
-        * That is, if (as is expected in most cases) you get a non-zero
-        * result, you don't need to check for errors.
-        *
-        * We assume the second argument refers to a DN is from the database
-        * and has a GUID -- but this GUID might not have been parsed out yet.
-        */
-       if (p->dsdb_dn == NULL) {
-               int ret = really_parse_trusted_dn(ctx->mem_ctx, ctx->ldb, p,
-                                                 ctx->ldap_oid);
-               if (ret != LDB_SUCCESS) {
-                       ctx->err = ret;
-                       return 0;
-               }
-       }
-       cmp = ndr_guid_compare(ctx->guid, &p->guid);
-       if (cmp == 0 && ctx->compare_extra_part) {
-               return data_blob_cmp(&ctx->extra_part, &p->dsdb_dn->extra_part);
-       }
-
-       return cmp;
-}
-
-
-
-static int parsed_dn_find(struct ldb_context *ldb, struct parsed_dn *pdn,
-                         unsigned int count,
-                         struct GUID *guid,
-                         struct ldb_dn *target_dn,
-                         DATA_BLOB extra_part,
-                         struct parsed_dn **exact,
-                         struct parsed_dn **next,
-                         const char *ldap_oid,
-                         bool compare_extra_part)
-{
-       unsigned int i;
-       struct compare_ctx ctx;
-       if (pdn == NULL) {
-               *exact = NULL;
-               *next = NULL;
-               return LDB_SUCCESS;
-       }
-
-       if (unlikely(GUID_all_zero(guid))) {
-               /*
-                * When updating a link using DRS, we sometimes get a NULL
-                * GUID when a forward link has been deleted and its GUID has
-                * for some reason been forgotten. The best we can do is try
-                * and match by DN via a linear search. Note that this
-                * probably only happens in the ADD case, in which we only
-                * allow modification of link if it is already deleted, so
-                * this seems very close to an elaborate NO-OP, but we are not
-                * quite prepared to declare it so.
-                *
-                * If the DN is not in our list, we have to add it to the
-                * beginning of the list, where it would naturally sort.
-                */
-               struct parsed_dn *p;
-               if (target_dn == NULL) {
-                       /* We don't know the target DN, so we can't search for DN */
-                       DEBUG(1, ("parsed_dn_find has a NULL GUID for a linked "
-                                 "attribute but we don't have a DN to compare "
-                                 "it with\n"));
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               *exact = NULL;
-               *next = NULL;
-
-               DEBUG(3, ("parsed_dn_find has a NULL GUID for a link to DN "
-                         "%s; searching through links for it",
-                         ldb_dn_get_linearized(target_dn)));
-
-               for (i = 0; i < count; i++) {
-                       int cmp;
-                       p = &pdn[i];
-                       if (p->dsdb_dn == NULL) {
-                               int ret = really_parse_trusted_dn(pdn, ldb, p, ldap_oid);
-                               if (ret != LDB_SUCCESS) {
-                                       return LDB_ERR_OPERATIONS_ERROR;
-                               }
-                       }
-
-                       cmp = ldb_dn_compare(p->dsdb_dn->dn, target_dn);
-                       if (cmp == 0) {
-                               *exact = p;
-                               return LDB_SUCCESS;
-                       }
-               }
-               /*
-                * Here we have a null guid which doesn't match any existing
-                * link. This is a bit unexpected because null guids occur
-                * when a forward link has been deleted and we are replicating
-                * that deletion.
-                *
-                * The best thing to do is weep into the logs and add the
-                * offending link to the beginning of the list which is
-                * at least the correct sort position.
-                */
-               DEBUG(1, ("parsed_dn_find has been given a NULL GUID for a "
-                         "link to unknown DN %s\n",
-                         ldb_dn_get_linearized(target_dn)));
-               *next = pdn;
-               return LDB_SUCCESS;
-       }
-
-       ctx.guid = guid;
-       ctx.ldb = ldb;
-       ctx.mem_ctx = pdn;
-       ctx.ldap_oid = ldap_oid;
-       ctx.extra_part = extra_part;
-       ctx.compare_extra_part = compare_extra_part;
-       ctx.err = 0;
-
-       BINARY_ARRAY_SEARCH_GTE(pdn, count, &ctx, la_guid_compare_with_trusted_dn,
-                               *exact, *next);
-
-       if (ctx.err != 0) {
-               return ctx.err;
-       }
-       return LDB_SUCCESS;
-}
-
 /*
   get a series of message element values as an array of DNs and GUIDs
   the result is sorted by GUID
@@ -2133,8 +2065,12 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                        /* we got a DN without a GUID - go find the GUID */
                        int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
-                               ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
-                                                      ldb_dn_get_linearized(dn));
+                               char *dn_str = NULL;
+                               dn_str = ldb_dn_get_extended_linearized(mem_ctx,
+                                                                       (dn), 1);
+                               ldb_asprintf_errstring(ldb,
+                                               "Unable to find GUID for DN %s\n",
+                                               dn_str);
                                if (ret == LDB_ERR_NO_SUCH_OBJECT &&
                                    LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
                                    ldb_attr_cmp(el->name, "member") == 0) {
@@ -2232,6 +2168,37 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
        return LDB_SUCCESS;
 }
 
+/*
+   Return LDB_SUCCESS if a parsed_dn list contains no duplicate values,
+   otherwise an error code. For compatibility the error code differs depending
+   on whether or not the attribute is "member".
+
+   As always, the parsed_dn list is assumed to be sorted.
+ */
+static int check_parsed_dn_duplicates(struct ldb_module *module,
+                                     struct ldb_message_element *el,
+                                     struct parsed_dn *pdn)
+{
+       unsigned int i;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       for (i = 1; i < el->num_values; i++) {
+               struct parsed_dn *p = &pdn[i];
+               if (parsed_dn_compare(p, &pdn[i - 1]) == 0) {
+                       ldb_asprintf_errstring(ldb,
+                                              "Linked attribute %s has "
+                                              "multiple identical values",
+                                              el->name);
+                       if (ldb_attr_cmp(el->name, "member") == 0) {
+                               return LDB_ERR_ENTRY_ALREADY_EXISTS;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+               }
+       }
+       return LDB_SUCCESS;
+}
+
 /*
   build a new extended DN, including all meta data fields
 
@@ -2243,85 +2210,20 @@ static int get_parsed_dns_trusted(struct ldb_module *module,
   RMD_LOCAL_USN       = local_usn
   RMD_VERSION         = version
  */
-static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                              const struct GUID *invocation_id, uint64_t seq_num,
-                              uint64_t local_usn, NTTIME nttime, uint32_t version, bool deleted)
+static int replmd_build_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v,
+                              struct dsdb_dn *dsdb_dn,
+                              const struct GUID *invocation_id,
+                              uint64_t local_usn, NTTIME nttime)
 {
-       struct ldb_dn *dn = dsdb_dn->dn;
-       const char *tstring, *usn_string, *flags_string;
-       struct ldb_val tval;
-       struct ldb_val iid;
-       struct ldb_val usnv, local_usnv;
-       struct ldb_val vers, flagsv;
-       NTSTATUS status;
-       int ret;
-       const char *dnstring;
-       char *vstring;
-       uint32_t rmd_flags = deleted?DSDB_RMD_FLAG_DELETED:0;
-
-       tstring = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)nttime);
-       if (!tstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       tval = data_blob_string_const(tstring);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)seq_num);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       usnv = data_blob_string_const(usn_string);
-
-       usn_string = talloc_asprintf(mem_ctx, "%llu", (unsigned long long)local_usn);
-       if (!usn_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       local_usnv = data_blob_string_const(usn_string);
-
-       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
-       if (!vstring) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       vers = data_blob_string_const(vstring);
-
-       status = GUID_to_ndr_blob(invocation_id, dn, &iid);
-       if (!NT_STATUS_IS_OK(status)) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       flags_string = talloc_asprintf(mem_ctx, "%u", rmd_flags);
-       if (!flags_string) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       flagsv = data_blob_string_const(flags_string);
-
-       ret = ldb_dn_set_extended_component(dn, "RMD_FLAGS", &flagsv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ADDTIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_INVOCID", &iid);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_CHANGETIME", &tval);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_ORIGINATING_USN", &usnv);
-       if (ret != LDB_SUCCESS) return ret;
-       ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
-       if (ret != LDB_SUCCESS) return ret;
-
-       dnstring = dsdb_dn_get_extended_linearized(mem_ctx, dsdb_dn, 1);
-       if (dnstring == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-       *v = data_blob_string_const(dnstring);
-
-       return LDB_SUCCESS;
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, NULL, invocation_id,
+                                local_usn, local_usn, nttime,
+                                RMD_VERSION_INITIAL, false);
 }
 
 static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                                struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
                                uint64_t seq_num, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted);
+                               bool deleted);
 
 /*
   check if any links need upgrading from w2k format
@@ -2375,7 +2277,7 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                /* it's an old one that needs upgrading */
                ret = replmd_update_la_val(el->values, dns[i].v,
                                           dns[i].dsdb_dn, dns[i].dsdb_dn,
-                                          invocation_id, 1, 1, 0, 0, false);
+                                          invocation_id, 1, 1, 0, false);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -2397,14 +2299,14 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
 }
 
 /*
-  update an extended DN, including all meta data fields
+  Sets the value for a linked attribute, including all meta data fields
 
   see replmd_build_la_val for value names
  */
-static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
-                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
-                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
-                               uint32_t version, bool deleted)
+static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                            struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                            uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                            uint32_t version, bool deleted)
 {
        struct ldb_dn *dn = dsdb_dn->dn;
        const char *tstring, *usn_string, *flags_string;
@@ -2412,8 +2314,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        struct ldb_val iid;
        struct ldb_val usnv, local_usnv;
        struct ldb_val vers, flagsv;
-       const struct ldb_val *old_addtime;
-       uint32_t old_version;
+       const struct ldb_val *old_addtime = NULL;
        NTSTATUS status;
        int ret;
        const char *dnstring;
@@ -2453,7 +2354,10 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        if (ret != LDB_SUCCESS) return ret;
 
        /* get the ADDTIME from the original */
-       old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn, "RMD_ADDTIME");
+       if (old_dsdb_dn != NULL) {
+               old_addtime = ldb_dn_get_extended_component(old_dsdb_dn->dn,
+                                                           "RMD_ADDTIME");
+       }
        if (old_addtime == NULL) {
                old_addtime = &tval;
        }
@@ -2478,12 +2382,7 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        ret = ldb_dn_set_extended_component(dn, "RMD_LOCAL_USN", &local_usnv);
        if (ret != LDB_SUCCESS) return ret;
 
-       /* increase the version by 1 */
-       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version, "RMD_VERSION");
-       if (NT_STATUS_IS_OK(status) && old_version >= version) {
-               version = old_version+1;
-       }
-       vstring = talloc_asprintf(dn, "%lu", (unsigned long)version);
+       vstring = talloc_asprintf(mem_ctx, "%lu", (unsigned long)version);
        vers = data_blob_string_const(vstring);
        ret = ldb_dn_set_extended_component(dn, "RMD_VERSION", &vers);
        if (ret != LDB_SUCCESS) return ret;
@@ -2497,17 +2396,43 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
        return LDB_SUCCESS;
 }
 
+/**
+ * Updates the value for a linked attribute, including all meta data fields
+ */
+static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
+                               struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
+                               uint64_t usn, uint64_t local_usn, NTTIME nttime,
+                               bool deleted)
+{
+       uint32_t old_version;
+       uint32_t version = RMD_VERSION_INITIAL;
+       NTSTATUS status;
+
+       /*
+        * We're updating the linked attribute locally, so increase the version
+        * by 1 so that other DCs will see the change when it gets replicated out
+        */
+       status = dsdb_get_extended_dn_uint32(old_dsdb_dn->dn, &old_version,
+                                            "RMD_VERSION");
+
+       if (NT_STATUS_IS_OK(status)) {
+               version = old_version + 1;
+       }
+
+       return replmd_set_la_val(mem_ctx, v, dsdb_dn, old_dsdb_dn, invocation_id,
+                                usn, local_usn, nttime, version, deleted);
+}
+
 /*
   handle adding a linked attribute
  */
 static int replmd_modify_la_add(struct ldb_module *module,
                                struct replmd_private *replmd_private,
-                               const struct dsdb_schema *schema,
+                               struct replmd_replicated_request *ac,
                                struct ldb_message *msg,
                                struct ldb_message_element *el,
                                struct ldb_message_element *old_el,
                                const struct dsdb_attribute *schema_attr,
-                               uint64_t seq_num,
                                time_t t,
                                struct ldb_dn *msg_dn,
                                struct ldb_request *parent)
@@ -2520,17 +2445,10 @@ static int replmd_modify_la_add(struct ldb_module *module,
        unsigned old_num_values = old_el ? old_el->num_values : 0;
        unsigned num_values = 0;
        unsigned max_num_values;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        NTTIME now;
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /* get the DNs to be added, fully parsed.
         *
         * We need full parsing because they came off the wire and we don't
@@ -2557,7 +2475,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
        max_num_values = old_num_values + el->num_values;
        if (max_num_values < old_num_values) {
                DEBUG(0, ("we seem to have overflow in replmd_modify_la_add. "
-                         "old values: %u, new values: %u, sum: %u",
+                         "old values: %u, new values: %u, sum: %u\n",
                          old_num_values, el->num_values, max_num_values));
                talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
@@ -2584,7 +2502,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
                int err = parsed_dn_find(ldb, old_dns, old_num_values,
                                         &dns[i].guid,
                                         dns[i].dsdb_dn->dn,
-                                        dns[i].dsdb_dn->extra_part,
+                                        dns[i].dsdb_dn->extra_part, 0,
                                         &exact, &next,
                                         schema_attr->syntax->ldap_oid,
                                         true);
@@ -2593,22 +2511,125 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        return err;
                }
 
-               if (exact != NULL) {
+               if (ac->fix_link_sid) {
+                       char *fixed_dnstring = NULL;
+                       struct dom_sid tmp_sid = { 0, };
+                       DATA_BLOB sid_blob = data_blob_null;
+                       enum ndr_err_code ndr_err;
+                       NTSTATUS status;
+                       int num;
+
+                       if (exact == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       if (dns[i].dsdb_dn->dn_format != DSDB_NORMAL_DN) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
                        /*
-                        * We are trying to add one that exists, which is only
-                        * allowed if it was previously deleted.
+                        * Only "<GUID=...><SID=...>" is allowed.
                         *
-                        * When we do undelete a link we change it in place.
-                        * It will be copied across into the right spot in due
-                        * course.
+                        * We get the GUID to just to find the old
+                        * value and the SID in order to add it
+                        * to the found value.
                         */
-                       uint32_t rmd_flags;
-                       rmd_flags = dsdb_dn_rmd_flags(exact->dsdb_dn->dn);
 
-                       if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
+                       num = ldb_dn_get_comp_num(dns[i].dsdb_dn->dn);
+                       if (num != 0) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       num = ldb_dn_get_extended_comp_num(dns[i].dsdb_dn->dn);
+                       if (num != 2) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       status = dsdb_get_extended_dn_sid(exact->dsdb_dn->dn,
+                                                         &tmp_sid, "SID");
+                       if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
+                               /* this is what we expect */
+                       } else if (NT_STATUS_IS_OK(status)) {
+                               struct GUID_txt_buf guid_str;
+                               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                                                      "i[%u] SID NOT MISSING... Attribute %s already "
+                                                      "exists for target GUID %s, SID %s, DN: %s",
+                                                      i, el->name,
+                                                      GUID_buf_string(&exact->guid,
+                                                                      &guid_str),
+                                                      dom_sid_string(tmp_ctx, &tmp_sid),
+                                                      dsdb_dn_get_extended_linearized(tmp_ctx,
+                                                              exact->dsdb_dn, 1));
+                               talloc_free(tmp_ctx);
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       } else {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       status = dsdb_get_extended_dn_sid(dns[i].dsdb_dn->dn,
+                                                         &tmp_sid, "SID");
+                       if (!NT_STATUS_IS_OK(status)) {
                                struct GUID_txt_buf guid_str;
                                ldb_asprintf_errstring(ldb,
-                                                      "Attribute %s already "
+                                                      "NO SID PROVIDED... Attribute %s already "
+                                                      "exists for target GUID %s",
+                                                      el->name,
+                                                      GUID_buf_string(&exact->guid,
+                                                                      &guid_str));
+                               talloc_free(tmp_ctx);
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+
+                       ndr_err = ndr_push_struct_blob(&sid_blob, tmp_ctx, &tmp_sid,
+                                                      (ndr_push_flags_fn_t)ndr_push_dom_sid);
+                       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       ret = ldb_dn_set_extended_component(exact->dsdb_dn->dn, "SID", &sid_blob);
+                       data_blob_free(&sid_blob);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+
+                       fixed_dnstring = dsdb_dn_get_extended_linearized(
+                                       new_values, exact->dsdb_dn, 1);
+                       if (fixed_dnstring == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       /*
+                        * We just replace the existing value...
+                        */
+                       *exact->v = data_blob_string_const(fixed_dnstring);
+
+                       continue;
+               }
+
+               if (exact != NULL) {
+                       /*
+                        * We are trying to add one that exists, which is only
+                        * allowed if it was previously deleted.
+                        *
+                        * When we do undelete a link we change it in place.
+                        * It will be copied across into the right spot in due
+                        * course.
+                        */
+                       uint32_t rmd_flags;
+                       rmd_flags = dsdb_dn_rmd_flags(exact->dsdb_dn->dn);
+
+                       if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
+                               struct GUID_txt_buf guid_str;
+                               ldb_asprintf_errstring(ldb,
+                                                      "Attribute %s already "
                                                       "exists for target GUID %s",
                                                       el->name,
                                                       GUID_buf_string(&exact->guid,
@@ -2626,15 +2647,16 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        ret = replmd_update_la_val(new_values, exact->v,
                                                   dns[i].dsdb_dn,
                                                   exact->dsdb_dn,
-                                                  invocation_id, seq_num,
-                                                  seq_num, now, 0, false);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
 
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema,
+                                                 ac->schema,
                                                  msg_dn,
                                                  &dns[i].guid, 
                                                  true,
@@ -2676,15 +2698,14 @@ static int replmd_modify_la_add(struct ldb_module *module,
                }
 
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_dn,
+                                         ac->schema, msg_dn,
                                          &dns[i].guid,
                                          true, schema_attr,
                                          parent);
                /* Make the new linked attribute ldb_val. */
                ret = replmd_build_la_val(new_values, &new_values[num_values],
-                                         dns[i].dsdb_dn, invocation_id,
-                                         seq_num, seq_num,
-                                         now, 0, false);
+                                         dns[i].dsdb_dn, &ac->our_invocation_id,
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2723,12 +2744,11 @@ static int replmd_modify_la_add(struct ldb_module *module,
  */
 static int replmd_modify_la_delete(struct ldb_module *module,
                                   struct replmd_private *replmd_private,
-                                  const struct dsdb_schema *schema,
+                                  struct replmd_replicated_request *ac,
                                   struct ldb_message *msg,
                                   struct ldb_message_element *el,
                                   struct ldb_message_element *old_el,
                                   const struct dsdb_attribute *schema_attr,
-                                  uint64_t seq_num,
                                   time_t t,
                                   struct ldb_dn *msg_dn,
                                   struct ldb_request *parent)
@@ -2742,16 +2762,10 @@ static int replmd_modify_la_delete(struct ldb_module *module,
        bool vanish_links = false;
        unsigned int num_to_delete = el->num_values;
        uint32_t rmd_flags;
-       const struct GUID *invocation_id;
        NTTIME now;
 
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        if (old_el == NULL || old_el->num_values == 0) {
                /* there is nothing to delete... */
                if (num_to_delete == 0) {
@@ -2815,7 +2829,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                                }
                        }
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, msg_dn, &p->guid,
+                                                 ac->schema, msg_dn, &p->guid,
                                                  false, schema_attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
@@ -2833,8 +2847,9 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
                        ret = replmd_update_la_val(old_el->values, p->v,
                                                   p->dsdb_dn, p->dsdb_dn,
-                                                  invocation_id, seq_num,
-                                                  seq_num, now, 0, true);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2856,7 +2871,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                ret = parsed_dn_find(ldb, old_dns, old_el->num_values,
                                     &p->guid,
                                     NULL,
-                                    p->dsdb_dn->extra_part,
+                                    p->dsdb_dn->extra_part, 0,
                                     &exact, &next,
                                     schema_attr->syntax->ldap_oid,
                                     true);
@@ -2896,7 +2911,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        /* remove the backlink */
                        ret = replmd_add_backlink(module,
                                                  replmd_private,
-                                                 schema, 
+                                                 ac->schema,
                                                  msg_dn,
                                                  &p->guid,
                                                  false, schema_attr,
@@ -2930,14 +2945,15 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
                ret = replmd_update_la_val(old_el->values, exact->v,
                                           exact->dsdb_dn, exact->dsdb_dn,
-                                          invocation_id, seq_num, seq_num,
-                                          now, 0, true);
+                                          &ac->our_invocation_id,
+                                          ac->seq_num, ac->seq_num,
+                                          now, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_dn,
+                                         ac->schema, msg_dn,
                                          &p->guid,
                                          false, schema_attr,
                                          parent);
@@ -2949,11 +2965,23 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
        if (vanish_links) {
                unsigned j = 0;
+               struct ldb_val *tmp_vals = NULL;
+
+               tmp_vals = talloc_array(tmp_ctx, struct ldb_val,
+                                       old_el->num_values);
+               if (tmp_vals == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_oom(module);
+               }
                for (i = 0; i < old_el->num_values; i++) {
-                       if (old_dns[i].v != NULL) {
-                               old_el->values[j] = *old_dns[i].v;
-                               j++;
+                       if (old_dns[i].v == NULL) {
+                               continue;
                        }
+                       tmp_vals[j] = *old_dns[i].v;
+                       j++;
+               }
+               for (i = 0; i < j; i++) {
+                       old_el->values[i] = tmp_vals[i];
                }
                old_el->num_values = j;
        }
@@ -2975,12 +3003,11 @@ static int replmd_modify_la_delete(struct ldb_module *module,
  */
 static int replmd_modify_la_replace(struct ldb_module *module,
                                    struct replmd_private *replmd_private,
-                                   const struct dsdb_schema *schema,
+                                   struct replmd_replicated_request *ac,
                                    struct ldb_message *msg,
                                    struct ldb_message_element *el,
                                    struct ldb_message_element *old_el,
                                    const struct dsdb_attribute *schema_attr,
-                                   uint64_t seq_num,
                                    time_t t,
                                    struct ldb_dn *msg_dn,
                                    struct ldb_request *parent)
@@ -2989,7 +3016,6 @@ static int replmd_modify_la_replace(struct ldb_module *module,
        struct parsed_dn *dns, *old_dns;
        TALLOC_CTX *tmp_ctx = talloc_new(msg);
        int ret;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_val *new_values = NULL;
        const char *ldap_oid = schema_attr->syntax->ldap_oid;
@@ -3000,11 +3026,6 @@ static int replmd_modify_la_replace(struct ldb_module *module,
 
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /*
         * The replace operation is unlike the replace and delete cases in that
         * we need to look at every existing link to see whether it is being
@@ -3043,6 +3064,12 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                return ret;
        }
 
+       ret = check_parsed_dn_duplicates(module, el, dns);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
        ret = get_parsed_dns(module, tmp_ctx, old_el, &old_dns,
                             ldap_oid, parent);
        if (ret != LDB_SUCCESS) {
@@ -3098,16 +3125,16 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                ret = replmd_update_la_val(new_values, old_p->v,
                                                           old_p->dsdb_dn,
                                                           old_p->dsdb_dn,
-                                                          invocation_id,
-                                                          seq_num, seq_num,
-                                                          now, 0, true);
+                                                          &ac->our_invocation_id,
+                                                          ac->seq_num, ac->seq_num,
+                                                          now, true);
                                if (ret != LDB_SUCCESS) {
                                        talloc_free(tmp_ctx);
                                        return ret;
                                }
 
                                ret = replmd_add_backlink(module, replmd_private,
-                                                         schema, 
+                                                         ac->schema,
                                                          msg_dn,
                                                          &old_p->guid, false,
                                                          schema_attr,
@@ -3132,9 +3159,9 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        ret = replmd_update_la_val(new_values, old_p->v,
                                                   new_p->dsdb_dn,
                                                   old_p->dsdb_dn,
-                                                  invocation_id,
-                                                  seq_num, seq_num,
-                                                  now, 0, false);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -3143,7 +3170,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        rmd_flags = dsdb_dn_rmd_flags(old_p->dsdb_dn->dn);
                        if ((rmd_flags & DSDB_RMD_FLAG_DELETED) != 0) {
                                ret = replmd_add_backlink(module, replmd_private,
-                                                         schema, 
+                                                         ac->schema,
                                                          msg_dn,
                                                          &new_p->guid, true,
                                                          schema_attr,
@@ -3165,15 +3192,14 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        ret = replmd_build_la_val(new_values,
                                                  new_p->v,
                                                  new_p->dsdb_dn,
-                                                 invocation_id,
-                                                 seq_num, seq_num,
-                                                 now, 0, false);
+                                                 &ac->our_invocation_id,
+                                                 ac->seq_num, now);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema,
+                                                 ac->schema,
                                                  msg_dn,
                                                  &new_p->guid, true,
                                                  schema_attr,
@@ -3204,8 +3230,9 @@ static int replmd_modify_la_replace(struct ldb_module *module,
  */
 static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                                               struct replmd_private *replmd_private,
+                                              struct replmd_replicated_request *ac,
                                               struct ldb_message *msg,
-                                              uint64_t seq_num, time_t t,
+                                              time_t t,
                                               struct ldb_request *parent)
 {
        struct ldb_result *res;
@@ -3214,8 +3241,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_message *old_msg;
 
-       const struct dsdb_schema *schema;
-
        if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
                /*
                 * Nothing special is required for modifying or vanishing links
@@ -3249,18 +3274,15 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        if (ret != LDB_SUCCESS) {
                return ret;
        }
-       schema = dsdb_get_schema(ldb, res);
-       if (!schema) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
 
        old_msg = res->msgs[0];
 
        for (i=0; i<msg->num_elements; i++) {
                struct ldb_message_element *el = &msg->elements[i];
                struct ldb_message_element *old_el, *new_el;
+               unsigned int mod_type = LDB_FLAG_MOD_TYPE(el->flags);
                const struct dsdb_attribute *schema_attr
-                       = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
+                       = dsdb_attribute_by_lDAPDisplayName(ac->schema, el->name);
                if (!schema_attr) {
                        ldb_asprintf_errstring(ldb,
                                               "%s: attribute %s is not a valid attribute in schema",
@@ -3271,34 +3293,47 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
-                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
-                               continue;
+                       if (parent) {
+                               struct ldb_control *ctrl;
+
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_REPLMD_VANISH_LINKS);
+                               if (ctrl != NULL) {
+                                       ctrl->critical = false;
+                                       continue;
+                               }
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_DBCHECK);
+                               if (ctrl != NULL) {
+                                       continue;
+                               }
                        }
+
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
                        return LDB_ERR_UNWILLING_TO_PERFORM;
                }
                old_el = ldb_msg_find_element(old_msg, el->name);
-               switch (el->flags & LDB_FLAG_MOD_MASK) {
+               switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
-                                                      schema, msg, el, old_el,
-                                                      schema_attr, seq_num, t,
+                                                      ac, msg, el, old_el,
+                                                      schema_attr, t,
                                                       old_msg->dn,
                                                       parent);
                        break;
                case LDB_FLAG_MOD_DELETE:
                        ret = replmd_modify_la_delete(module, replmd_private,
-                                                     schema, msg, el, old_el,
-                                                     schema_attr, seq_num, t,
+                                                     ac, msg, el, old_el,
+                                                     schema_attr, t,
                                                      old_msg->dn,
                                                      parent);
                        break;
                case LDB_FLAG_MOD_ADD:
                        ret = replmd_modify_la_add(module, replmd_private,
-                                                  schema, msg, el, old_el,
-                                                  schema_attr, seq_num, t,
+                                                  ac, msg, el, old_el,
+                                                  schema_attr, t,
                                                   old_msg->dn,
                                                   parent);
                        break;
@@ -3312,13 +3347,16 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        ldb_asprintf_errstring(ldb,
                                               "Attribute %s is single valued but more than one value has been supplied",
                                               el->name);
-                       return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       /* Return codes as found on Windows 2012r2 */
+                       if (mod_type == LDB_FLAG_MOD_REPLACE) {
+                               return LDB_ERR_CONSTRAINT_VIOLATION;
+                       } else {
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
                } else {
                        el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
                }
 
-
-
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
@@ -3344,6 +3382,47 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
 }
 
 
+static int send_rodc_referral(struct ldb_request *req,
+                             struct ldb_context *ldb,
+                             struct ldb_dn *dn)
+{
+       char *referral = NULL;
+       struct loadparm_context *lp_ctx = NULL;
+       struct ldb_dn *fsmo_role_dn = NULL;
+       struct ldb_dn *role_owner_dn = NULL;
+       const char *domain = NULL;
+       WERROR werr;
+
+       lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
+                                struct loadparm_context);
+
+       werr = dsdb_get_fsmo_role_info(req, ldb, DREPL_PDC_MASTER,
+                                      &fsmo_role_dn, &role_owner_dn);
+
+       if (W_ERROR_IS_OK(werr)) {
+               struct ldb_dn *server_dn = ldb_dn_copy(req, role_owner_dn);
+               if (server_dn != NULL) {
+                       ldb_dn_remove_child_components(server_dn, 1);
+                       domain = samdb_dn_to_dnshostname(ldb, req,
+                                                        server_dn);
+               }
+       }
+
+       if (domain == NULL) {
+               domain = lpcfg_dnsdomain(lp_ctx);
+       }
+
+       referral = talloc_asprintf(req, "ldap://%s/%s",
+                                  domain,
+                                  ldb_dn_get_linearized(dn));
+       if (referral == NULL) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       return ldb_module_send_referral(req, referral);
+}
+
 
 static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 {
@@ -3358,6 +3437,9 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        unsigned int functional_level;
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
+       struct ldb_control *fix_links_control = NULL;
+       struct ldb_control *fix_dn_name_control = NULL;
+       struct ldb_control *fix_dn_sid_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -3383,6 +3465,102 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb = ldb_module_get_ctx(module);
 
+       fix_links_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_DUPLICATE_LINKS);
+       if (fix_links_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_REPLACE) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID == 0) {
+                       return ldb_module_operr(module);
+               }
+
+               fix_links_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
+       fix_dn_name_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_LINK_DN_NAME);
+       if (fix_dn_name_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 2) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_DELETE) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].flags != LDB_FLAG_MOD_ADD) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (ldb_attr_cmp(req->op.mod.message->elements[0].name,
+                                req->op.mod.message->elements[1].name) != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->dn_format == DSDB_INVALID_DN) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               /*
+                * If we are run from dbcheck and we are not updating
+                * a link (as these would need to be sorted and so
+                * can't go via such a simple update, then do not
+                * trigger replicated updates and a new USN from this
+                * change, it wasn't a real change, just a new
+                * (correct) string DN
+                */
+
+               fix_dn_name_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
        guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
@@ -3407,6 +3585,44 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       fix_dn_sid_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_LINK_DN_SID);
+       if (fix_dn_sid_control != NULL) {
+               const struct dsdb_attribute *sa = NULL;
+
+               if (msg->num_elements != 1) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (msg->elements[0].flags != LDB_FLAG_MOD_ADD) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (msg->elements[0].num_values != 1) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(ac->schema,
+                               msg->elements[0].name);
+               if (sa == NULL) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->dn_format != DSDB_NORMAL_DN) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               fix_dn_sid_control->critical = false;
+               ac->fix_link_sid = true;
+
+               goto handle_linked_attribs;
+       }
+
        ldb_msg_remove_attr(msg, "whenChanged");
        ldb_msg_remove_attr(msg, "uSNChanged");
 
@@ -3416,19 +3632,10 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                                 msg, &ac->seq_num, t, is_schema_nc,
                                 &is_urgent, &rodc);
        if (rodc && (ret == LDB_ERR_REFERRAL)) {
-               struct loadparm_context *lp_ctx;
-               char *referral;
-
-               lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
-                                        struct loadparm_context);
-
-               referral = talloc_asprintf(req,
-                                          "ldap://%s/%s",
-                                          lpcfg_dnsdomain(lp_ctx),
-                                          ldb_dn_get_linearized(msg->dn));
-               ret = ldb_module_send_referral(req, referral);
+               ret = send_rodc_referral(req, ldb, msg->dn);
                talloc_free(ac);
                return ret;
+
        }
 
        if (ret != LDB_SUCCESS) {
@@ -3436,8 +3643,9 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                return ret;
        }
 
+ handle_linked_attribs:
        ret = replmd_modify_handle_linked_attribs(module, replmd_private,
-                                                 msg, ac->seq_num, t, req);
+                                                 ac, msg, t, req);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
@@ -3688,18 +3896,7 @@ static int replmd_rename_callback(struct ldb_request *req, struct ldb_reply *are
                                 msg, &ac->seq_num, t,
                                 is_schema_nc, &is_urgent, &rodc);
        if (rodc && (ret == LDB_ERR_REFERRAL)) {
-               struct ldb_dn *olddn = ac->req->op.rename.olddn;
-               struct loadparm_context *lp_ctx;
-               char *referral;
-
-               lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
-                                        struct loadparm_context);
-
-               referral = talloc_asprintf(req,
-                                          "ldap://%s/%s",
-                                          lpcfg_dnsdomain(lp_ctx),
-                                          ldb_dn_get_linearized(olddn));
-               ret = ldb_module_send_referral(req, referral);
+               ret = send_rodc_referral(req, ldb, ac->req->op.rename.olddn);
                talloc_free(ares);
                return ldb_module_done(req, NULL, NULL, ret);
        }
@@ -3833,7 +4030,8 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
                                            msg->dn, attrs,
                                            DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_EXTENDED_DN,
+                                           DSDB_SEARCH_SHOW_EXTENDED_DN |
+                                           DSDB_SEARCH_SHOW_RECYCLED,
                                            parent);
 
                if (ret != LDB_SUCCESS) {
@@ -3864,7 +4062,9 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                }
 
                ret = parsed_dn_find(ldb, link_dns, link_el->num_values,
-                                    guid, dn, data_blob_null, &p, &unused,
+                                    guid, dn,
+                                    data_blob_null, 0,
+                                    &p, &unused,
                                     target_attr->syntax->ldap_oid, false);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
@@ -3924,7 +4124,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
 {
        int ret = LDB_ERR_OTHER;
        bool retb, disallow_move_on_delete;
-       struct ldb_dn *old_dn, *new_dn;
+       struct ldb_dn *old_dn = NULL, *new_dn = NULL;
        const char *rdn_name;
        const struct ldb_val *rdn_value, *new_rdn_value;
        struct GUID guid;
@@ -3985,7 +4185,12 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                "*",
                NULL
        };
-       unsigned int i, el_count = 0;
+       static const struct ldb_val true_val = {
+               .data = discard_const_p(uint8_t, "TRUE"),
+               .length = 4
+       };
+       
+       unsigned int i;
        uint32_t dsdb_flags = 0;
        struct replmd_private *replmd_private;
        enum deletion_state deletion_state, next_deletion_state;
@@ -4136,27 +4341,28 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        guid = samdb_result_guid(old_msg, "objectGUID");
 
        if (deletion_state == OBJECT_NOT_DELETED) {
-               /* Add a formatted child */
-               retb = ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ADEL:%s",
-                                           rdn_name,
-                                           ldb_dn_escape_value(tmp_ctx, *rdn_value),
-                                           GUID_string(tmp_ctx, &guid));
-               if (!retb) {
-                       ldb_asprintf_errstring(ldb, __location__
-                                              ": Unable to add a formatted child to dn: %s",
-                                              ldb_dn_get_linearized(new_dn));
+               struct ldb_message_element *is_deleted_el;
+
+               ret = replmd_make_deleted_child_dn(tmp_ctx,
+                                                  ldb,
+                                                  new_dn,
+                                                  rdn_name, rdn_value,
+                                                  guid);
+
+               if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
 
-               ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
+               ret = ldb_msg_add_value(msg, "isDeleted", &true_val,
+                                       &is_deleted_el);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, __location__
                                               ": Failed to add isDeleted string to the msg");
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               is_deleted_el->flags = LDB_FLAG_MOD_REPLACE;
        } else {
                /*
                 * No matter what has happened with other renames etc, try again to
@@ -4196,6 +4402,10 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
             - preserved if in above list, or is rDN
          - remove all linked attribs from this object
          - remove all links from other objects to this object
+           (note we use the backlinks to do this, so we won't find one-way
+            links that still point to this object, or deactivated two-way
+            links, i.e. 'member' after the user has been removed from the
+            group)
          - add lastKnownParent
          - update replPropertyMetaData?
 
@@ -4205,6 +4415,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        if (deletion_state == OBJECT_NOT_DELETED) {
                struct ldb_dn *parent_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
                char *parent_dn_str = NULL;
+               struct ldb_message_element *p_el;
 
                /* we need the storage form of the parent GUID */
                ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
@@ -4248,7 +4459,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               p_el = ldb_msg_find_element(msg,
+                                           "lastKnownParent");
+               if (p_el == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_operr(module);
+               }
+               p_el->flags = LDB_FLAG_MOD_REPLACE;
 
                if (next_deletion_state == OBJECT_DELETED) {
                        ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
@@ -4260,7 +4477,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
+                       p_el = ldb_msg_find_element(msg,
+                                                   "msDS-LastKnownRDN");
+                       if (p_el == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_module_operr(module);
+                       }
+                       p_el->flags = LDB_FLAG_MOD_ADD;
                }
        }
 
@@ -4289,14 +4512,17 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                 * not activated and what ever the forest level is.
                 */
                if (dsdb_attribute_by_lDAPDisplayName(schema, "isRecycled") != NULL) {
-                       ret = ldb_msg_add_string(msg, "isRecycled", "TRUE");
+                       struct ldb_message_element *is_recycled_el;
+
+                       ret = ldb_msg_add_value(msg, "isRecycled", &true_val,
+                                               &is_recycled_el);
                        if (ret != LDB_SUCCESS) {
                                DEBUG(0,(__location__ ": Failed to add isRecycled string to the msg\n"));
                                ldb_module_oom(module);
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+                       is_recycled_el->flags = LDB_FLAG_MOD_REPLACE;
                }
 
                replmd_private = talloc_get_type(ldb_module_get_private(module),
@@ -4314,20 +4540,30 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                /* don't remove the rDN */
                                continue;
                        }
+
                        if (sa->linkID & 1) {
                                /*
-                                 we have a backlink in this object
-                                 that needs to be removed. We're not
-                                 allowed to remove it directly
-                                 however, so we instead setup a
-                                 modify to delete the corresponding
-                                 forward link
+                                * we have a backlink in this object
+                                * that needs to be removed. We're not
+                                * allowed to remove it directly
+                                * however, so we instead setup a
+                                * modify to delete the corresponding
+                                * forward link
                                 */
                                ret = replmd_delete_remove_link(module, schema,
                                                                replmd_private,
                                                                old_dn, &guid,
                                                                el, sa, req);
-                               if (ret != LDB_SUCCESS) {
+                               if (ret == LDB_SUCCESS) {
+                                       /*
+                                        * now we continue, which means we
+                                        * won't remove this backlink
+                                        * directly
+                                        */
+                                       continue;
+                               }
+
+                               if (ret != LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
@@ -4340,11 +4576,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
-                               /* now we continue, which means we
-                                  won't remove this backlink
-                                  directly
-                               */
-                               continue;
+
+                               /*
+                                * Otherwise vanish the link, we are
+                                * out of sync and the controlling
+                                * object does not have the source
+                                * link any more
+                                */
+
+                               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
                        } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
@@ -4580,14 +4821,108 @@ static bool replmd_replPropertyMetaData1_new_should_be_taken(uint32_t dsdb_repl_
 }
 
 
+/*
+  form a DN for a deleted (DEL:) or conflict (CNF:) DN
+ */
+static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
+                                      struct ldb_context *ldb,
+                                      struct ldb_dn *dn,
+                                      const char *four_char_prefix,
+                                      const char *rdn_name,
+                                      const struct ldb_val *rdn_value,
+                                      struct GUID guid)
+{
+       struct ldb_val deleted_child_rdn_val;
+       struct GUID_txt_buf guid_str;
+       int ret;
+       bool retb;
+
+       GUID_buf_string(&guid, &guid_str);
+
+       retb = ldb_dn_add_child_fmt(dn, "X=Y");
+       if (!retb) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * TODO: Per MS-ADTS 3.1.1.5.5 Delete Operation
+        * we should truncate this value to ensure the RDN is not more than 255 chars.
+        *
+        * However we MS-ADTS 3.1.1.5.1.2 Naming Constraints indicates that:
+        *
+        * "Naming constraints are not enforced for replicated
+        * updates." so this is safe and we don't have to work out not
+        * splitting a UTF8 char right now.
+        */
+       deleted_child_rdn_val = ldb_val_dup(tmp_ctx, rdn_value);
+
+       /*
+        * sizeof(guid_str.buf) will always be longer than
+        * strlen(guid_str.buf) but we allocate using this and
+        * waste the trailing bytes to avoid scaring folks
+        * with memcpy() using strlen() below
+        */
+
+       deleted_child_rdn_val.data
+               = talloc_realloc(tmp_ctx, deleted_child_rdn_val.data,
+                                uint8_t,
+                                rdn_value->length + 5
+                                + sizeof(guid_str.buf));
+       if (!deleted_child_rdn_val.data) {
+               ldb_asprintf_errstring(ldb, __location__
+                                      ": Unable to add a formatted child to dn: %s",
+                                      ldb_dn_get_linearized(dn));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.length =
+               rdn_value->length + 5
+               + strlen(guid_str.buf);
+
+       SMB_ASSERT(deleted_child_rdn_val.length <
+                  talloc_get_size(deleted_child_rdn_val.data));
+
+       /*
+        * talloc won't allocate more than 256MB so we can't
+        * overflow but just to be sure
+        */
+       if (deleted_child_rdn_val.length < rdn_value->length) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       deleted_child_rdn_val.data[rdn_value->length] = 0x0a;
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 1],
+              four_char_prefix, 4);
+       memcpy(&deleted_child_rdn_val.data[rdn_value->length + 5],
+              guid_str.buf,
+              sizeof(guid_str.buf));
+
+       /* Now set the value into the RDN, without parsing it */
+       ret = ldb_dn_set_component(
+               dn,
+               0,
+               rdn_name,
+               deleted_child_rdn_val);
+
+       return ret;
+}
+
+
 /*
   form a conflict DN
  */
-static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn, struct GUID *guid)
+static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx,
+                                        struct ldb_context *ldb,
+                                        struct ldb_dn *dn,
+                                        struct GUID *guid)
 {
        const struct ldb_val *rdn_val;
        const char *rdn_name;
        struct ldb_dn *new_dn;
+       int ret;
 
        rdn_val = ldb_dn_get_rdn_val(dn);
        rdn_name = ldb_dn_get_rdn_name(dn);
@@ -4595,32 +4930,48 @@ static struct ldb_dn *replmd_conflict_dn(TALLOC_CTX *mem_ctx, struct ldb_dn *dn,
                return NULL;
        }
 
-       new_dn = ldb_dn_copy(mem_ctx, dn);
+       new_dn = ldb_dn_get_parent(mem_ctx, dn);
        if (!new_dn) {
                return NULL;
        }
 
-       if (!ldb_dn_remove_child_components(new_dn, 1)) {
-               return NULL;
-       }
-
-       if (!ldb_dn_add_child_fmt(new_dn, "%s=%s\\0ACNF:%s",
-                                 rdn_name,
-                                 ldb_dn_escape_value(new_dn, *rdn_val),
-                                 GUID_string(new_dn, guid))) {
+       ret = replmd_make_prefix_child_dn(mem_ctx,
+                                         ldb, new_dn,
+                                         "CNF:",
+                                         rdn_name,
+                                         rdn_val,
+                                         *guid);
+       if (ret != LDB_SUCCESS) {
                return NULL;
        }
-
        return new_dn;
 }
 
+/*
+  form a deleted DN
+ */
+static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
+                                       struct ldb_context *ldb,
+                                       struct ldb_dn *dn,
+                                       const char *rdn_name,
+                                       const struct ldb_val *rdn_value,
+                                       struct GUID guid)
+{
+       return replmd_make_prefix_child_dn(tmp_ctx,
+                                          ldb, dn,
+                                          "DEL:",
+                                          rdn_name,
+                                          rdn_value,
+                                          guid);
+}
+
 
 /*
   perform a modify operation which sets the rDN and name attributes to
   their current values. This has the effect of changing these
   attributes to have been last updated by the current DC. This is
   needed to ensure that renames performed as part of conflict
-  resolution are propogated to other DCs
+  resolution are propagated to other DCs
  */
 static int replmd_name_modify(struct replmd_replicated_request *ar,
                              struct ldb_request *req, struct ldb_dn *dn)
@@ -4667,10 +5018,17 @@ static int replmd_name_modify(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       ret = dsdb_module_modify(ar->module, msg, DSDB_FLAG_OWN_MODULE, req);
-       if (ret != LDB_SUCCESS) {
-               DEBUG(0,(__location__ ": Failed to modify rDN/name of conflict DN '%s' - %s",
-                        ldb_dn_get_linearized(dn),
+       /*
+        * We have to mark this as a replicated update otherwise
+        * schema_data may reject a rename in the schema partition
+        */
+
+       ret = dsdb_module_modify(ar->module, msg,
+                                DSDB_FLAG_OWN_MODULE|DSDB_FLAG_REPLICATED_UPDATE,
+                                req);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(0,(__location__ ": Failed to modify rDN/name of DN being DRS renamed '%s' - %s",
+                        ldb_dn_get_linearized(dn),
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                return ret;
        }
@@ -4681,7 +5039,7 @@ static int replmd_name_modify(struct replmd_replicated_request *ar,
 
 failed:
        talloc_free(msg);
-       DEBUG(0,(__location__ ": Failed to setup modify rDN/name of conflict DN '%s'",
+       DEBUG(0,(__location__ ": Failed to setup modify rDN/name of DN being DRS renamed '%s'",
                 ldb_dn_get_linearized(dn)));
        return LDB_ERR_OPERATIONS_ERROR;
 }
@@ -4818,6 +5176,7 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                       "Conflict adding object '%s' from incoming replication as we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -4889,7 +5248,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                 ldb_dn_get_linearized(conflict_dn)));
                        goto failed;
                }
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4914,7 +5275,9 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                        goto failed;
                }
 
-               new_dn = replmd_conflict_dn(req, conflict_dn, &guid);
+               new_dn = replmd_conflict_dn(req,
+                                           ldb_module_get_ctx(ar->module),
+                                           conflict_dn, &guid);
                if (new_dn == NULL) {
                        DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                                 ldb_dn_get_linearized(conflict_dn)));
@@ -4990,6 +5353,9 @@ failed:
         * replication will stop with an error, but there is not much
         * else we can do.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
        return ldb_module_done(ar->req, NULL, NULL,
                               ret);
 }
@@ -5089,16 +5455,22 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(4)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_ADD, msg);
-               DEBUG(4, ("DRS replication add message of %s:\n%s\n",
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_ADD,
+                                                          msg);
+               DEBUG(8, ("DRS replication add message of %s:\n%s\n",
                          GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
                          s));
                talloc_free(s);
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+               DEBUG(4, ("DRS replication add DN of %s is %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
        }
-
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
                                                     "isDeleted", false);
 
@@ -5110,7 +5482,8 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
 
        rdn_val = ldb_dn_get_rdn_val(msg->dn);
        ret = replmd_update_rpmd_rdn_attr(ldb, msg, rdn_val, NULL,
-                                    md, ar, now, is_schema_nc);
+                                         md, ar, now, is_schema_nc,
+                                         false);
        if (ret != LDB_SUCCESS) {
                ldb_asprintf_errstring(ldb, "%s: error during DRS repl ADD: %s", __func__, ldb_errstring(ldb));
                return replmd_replicated_request_error(ar, ret);
@@ -5206,7 +5579,7 @@ static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request
        {
                struct ldb_message *parent_msg = ares->message;
                struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
-               struct ldb_dn *parent_dn;
+               struct ldb_dn *parent_dn = NULL;
                int comp_num;
 
                if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
@@ -5445,6 +5818,7 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                                       "Conflict adding object '%s' from incoming replication but we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -5509,7 +5883,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
 
        if (rename_incoming_record) {
 
-               new_dn = replmd_conflict_dn(msg, msg->dn,
+               new_dn = replmd_conflict_dn(msg,
+                                           ldb_module_get_ctx(ar->module),
+                                           msg->dn,
                                            &ar->objs->objects[ar->index_current].object_guid);
                if (new_dn == NULL) {
                        ldb_asprintf_errstring(ldb_module_get_ctx(ar->module),
@@ -5545,7 +5921,9 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                goto failed;
        }
 
-       new_dn = replmd_conflict_dn(tmp_ctx, conflict_dn, &guid);
+       new_dn = replmd_conflict_dn(tmp_ctx,
+                                   ldb_module_get_ctx(ar->module),
+                                   conflict_dn, &guid);
        if (new_dn == NULL) {
                DEBUG(0,(__location__ ": Failed to form conflict DN for %s\n",
                         ldb_dn_get_linearized(conflict_dn)));
@@ -5588,8 +5966,10 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                        goto failed;
        }
-failed:
 
+       talloc_free(tmp_ctx);
+       return ret;
+failed:
        /*
         * On failure make the caller get the error
         * This means replication will stop with an error,
@@ -5597,6 +5977,9 @@ failed:
         * LDB_ERR_ENTRY_ALREADY_EXISTS case this is exactly what is
         * needed.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
 
        talloc_free(tmp_ctx);
        return ret;
@@ -5660,11 +6043,12 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(5)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(5, ("Initial DRS replication modify message of %s is:\n%s\n"
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY, msg);
+               DEBUG(8, ("Initial DRS replication modify message of %s is:\n%s\n"
                          "%s\n"
                          "%s\n",
                          GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
@@ -5678,8 +6062,15 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                                                  "incoming replPropertyMetaData",
                                                  rmd)));
                talloc_free(s);
-       }
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
 
+               DEBUG(4, ("Initial DRS replication modify DN of %s is: %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
+       }
+               
        local_isDeleted = ldb_msg_find_attr_as_bool(ar->search_msg,
                                                    "isDeleted", false);
        remote_isDeleted = ldb_msg_find_attr_as_bool(msg,
@@ -5840,7 +6231,8 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
 
        if (renamed) {
                ret = replmd_update_rpmd_rdn_attr(ldb, msg, new_rdn, old_rdn,
-                                                 &nmd, ar, now, is_schema_nc);
+                                                 &nmd, ar, now, is_schema_nc,
+                                                 false);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, "%s: error during DRS repl merge: %s", __func__, ldb_errstring(ldb));
                        return replmd_replicated_request_error(ar, ret);
@@ -5935,14 +6327,24 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       if (DEBUGLVL(4)) {
+       if (DEBUGLVL(8)) {
                struct GUID_txt_buf guid_txt;
 
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
-               DEBUG(4, ("Final DRS replication modify message of %s:\n%s\n",
-                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid, &guid_txt),
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg);
+               DEBUG(8, ("Final DRS replication modify message of %s:\n%s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
                          s));
                talloc_free(s);
+       } else if (DEBUGLVL(4)) {
+               struct GUID_txt_buf guid_txt;
+
+               DEBUG(4, ("Final DRS replication modify DN of %s is %s\n",
+                         GUID_buf_string(&ar->objs->objects[ar->index_current].object_guid,
+                                         &guid_txt),
+                         ldb_dn_get_linearized(msg->dn)));
        }
 
        ret = ldb_build_mod_req(&change_req,
@@ -6150,6 +6552,115 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
        return LDB_SUCCESS;
 }
 
+/**
+ * Returns true if we can group together processing this link attribute,
+ * i.e. it has the same source-object and attribute ID as other links
+ * already in the group
+ */
+static bool la_entry_matches_group(struct la_entry *la_entry,
+                                  struct la_group *la_group)
+{
+       struct la_entry *prev = la_group->la_entries;
+
+       return (la_entry->la->attid == prev->la->attid &&
+               GUID_equal(&la_entry->la->identifier->guid,
+                          &prev->la->identifier->guid));
+}
+
+/**
+ * Stores the linked attributes received in the replication chunk - these get
+ * applied at the end of the transaction. We also check that each linked
+ * attribute is valid, i.e. source and target objects are known.
+ */
+static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
+{
+       int ret = LDB_SUCCESS;
+       uint32_t i;
+       struct ldb_module *module = ar->module;
+       struct replmd_private *replmd_private =
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct la_group *la_group = NULL;
+       struct ldb_context *ldb;
+       TALLOC_CTX *tmp_ctx = NULL;
+       struct ldb_message *src_msg = NULL;
+       const struct dsdb_attribute *attr = NULL;
+
+       ldb = ldb_module_get_ctx(module);
+
+       DEBUG(4,("linked_attributes_count=%u\n", ar->objs->linked_attributes_count));
+
+       /* save away the linked attributes for the end of the transaction */
+       for (i = 0; i < ar->objs->linked_attributes_count; i++) {
+               struct la_entry *la_entry;
+
+               tmp_ctx = talloc_new(ar);
+
+               if (replmd_private->la_ctx == NULL) {
+                       replmd_private->la_ctx = talloc_new(replmd_private);
+               }
+               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
+               if (la_entry == NULL) {
+                       ldb_oom(ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
+               if (la_entry->la == NULL) {
+                       talloc_free(la_entry);
+                       ldb_oom(ldb);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               *la_entry->la = ar->objs->linked_attributes[i];
+               la_entry->dsdb_repl_flags = ar->objs->dsdb_repl_flags;
+
+               /* we need to steal the non-scalars so they stay
+                  around until the end of the transaction */
+               talloc_steal(la_entry->la, la_entry->la->identifier);
+               talloc_steal(la_entry->la, la_entry->la->value.blob);
+
+               /* verify the source object exists for the link */
+               ret = replmd_get_la_entry_source(module, la_entry, tmp_ctx,
+                                                &attr, &src_msg);
+
+               /*
+                * When we fail to find the source object, the error
+                * code we pass back here is really important. It flags
+                * back to the callers to retry this request with
+                * DRSUAPI_DRS_GET_ANC. This case should never happen
+                * if we're replicating from a Samba DC, but it is
+                * needed to talk to a Windows DC
+                */
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       WERROR err = WERR_DS_DRA_MISSING_PARENT;
+                       ret = replmd_replicated_request_werror(ar, err);
+                       break;
+               }
+
+               ret = replmd_verify_link_target(ar, tmp_ctx, la_entry,
+                                               src_msg, attr);
+               if (ret != LDB_SUCCESS) {
+                       break;
+               }
+
+               /* group the links together by source-object for efficiency */
+               if (la_group == NULL ||
+                   !la_entry_matches_group(la_entry, la_group)) {
+
+                       la_group = talloc_zero(replmd_private->la_ctx,
+                                              struct la_group);
+                       if (la_group == NULL) {
+                               ldb_oom(ldb);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+                       DLIST_ADD(replmd_private->la_list, la_group);
+               }
+               DLIST_ADD(la_group->la_entries, la_entry);
+               replmd_private->total_links++;
+               TALLOC_FREE(tmp_ctx);
+       }
+
+       return ret;
+}
+
 static int replmd_replicated_uptodate_vector(struct replmd_replicated_request *ar);
 
 static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
@@ -6166,7 +6677,18 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
        struct GUID_txt_buf guid_str_buf;
 
        if (ar->index_current >= ar->objs->num_objects) {
-               /* done with it, go to next stage */
+
+               /*
+                * Now that we've applied all the objects, check the new linked
+                * attributes and store them (we apply them in .prepare_commit)
+                */
+               ret = replmd_store_linked_attributes(ar);
+
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+
+               /* done applying objects, move on to the next stage */
                return replmd_replicated_uptodate_vector(ar);
        }
 
@@ -6538,7 +7060,9 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
        nrf_el->flags = LDB_FLAG_MOD_REPLACE;
 
        if (CHECK_DEBUGLVL(4)) {
-               char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
+               char *s = ldb_ldif_message_redacted_string(ldb, ar,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg);
                DEBUG(4, ("DRS replication uptodate modify message:\n%s\n", s));
                talloc_free(s);
        }
@@ -6644,9 +7168,6 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        struct replmd_replicated_request *ar;
        struct ldb_control **ctrls;
        int ret;
-       uint32_t i;
-       struct replmd_private *replmd_private =
-               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
        ldb = ldb_module_get_ctx(module);
 
@@ -6703,72 +7224,251 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        ar->controls = req->controls;
        req->controls = ctrls;
 
-       DEBUG(4,("linked_attributes_count=%u\n", objs->linked_attributes_count));
+       return replmd_replicated_apply_next(ar);
+}
+
+/**
+ * Checks how to handle an missing target - either we need to fail the
+ * replication and retry with GET_TGT, ignore the link and continue, or try to
+ * add a partial link to an unknown target.
+ */
+static int replmd_allow_missing_target(struct ldb_module *module,
+                                      TALLOC_CTX *mem_ctx,
+                                      struct ldb_dn *target_dn,
+                                      struct ldb_dn *source_dn,
+                                      bool is_obj_commit,
+                                      struct GUID *guid,
+                                      uint32_t dsdb_repl_flags,
+                                      bool *ignore_link,
+                                      const char * missing_str)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       bool is_in_same_nc;
 
-       /* save away the linked attributes for the end of the
-          transaction */
-       for (i=0; i<ar->objs->linked_attributes_count; i++) {
-               struct la_entry *la_entry;
+       /*
+        * we may not be able to resolve link targets properly when
+        * dealing with subsets of objects, e.g. the source is a
+        * critical object and the target isn't
+        *
+        * TODO:
+        * When we implement Trusted Domains we need to consider
+        * whether they get treated as an incomplete replica here or not
+        */
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_OBJECT_SUBSET) {
 
-               if (replmd_private->la_ctx == NULL) {
-                       replmd_private->la_ctx = talloc_new(replmd_private);
-               }
-               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
-               if (la_entry == NULL) {
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
-               if (la_entry->la == NULL) {
-                       talloc_free(la_entry);
-                       ldb_oom(ldb);
-                       return LDB_ERR_OPERATIONS_ERROR;
-               }
-               *la_entry->la = ar->objs->linked_attributes[i];
+               /*
+                * Ignore the link. We don't increase the highwater-mark in
+                * the object subset cases, so subsequent replications should
+                * resolve any missing links
+                */
+               DEBUG(2, ("%s target %s linked from %s\n", missing_str,
+                         ldb_dn_get_linearized(target_dn),
+                         ldb_dn_get_linearized(source_dn)));
+               *ignore_link = true;
+               return LDB_SUCCESS;
+       }
 
-               /* we need to steal the non-scalars so they stay
-                  around until the end of the transaction */
-               talloc_steal(la_entry->la, la_entry->la->identifier);
-               talloc_steal(la_entry->la, la_entry->la->value.blob);
+       if (dsdb_repl_flags & DSDB_REPL_FLAG_TARGETS_UPTODATE) {
 
-               DLIST_ADD(replmd_private->la_list, la_entry);
+               /*
+                * target should already be up-to-date so there's no point in
+                * retrying. This could be due to bad timing, or if a target
+                * on a one-way link was deleted. We ignore the link rather
+                * than failing the replication cycle completely
+                */
+               *ignore_link = true;
+               DBG_WARNING("%s is %s but up to date. Ignoring link from %s\n",
+                           ldb_dn_get_linearized(target_dn), missing_str,
+                           ldb_dn_get_linearized(source_dn));
+               return LDB_SUCCESS;
+       }
+       
+       is_in_same_nc = dsdb_objects_have_same_nc(ldb,
+                                                 mem_ctx,
+                                                 source_dn,
+                                                 target_dn);
+       if (is_in_same_nc) {
+               /* fail the replication and retry with GET_TGT */
+               ldb_asprintf_errstring(ldb, "%s target %s GUID %s linked from %s\n",
+                                      missing_str,
+                                      ldb_dn_get_linearized(target_dn),
+                                      GUID_string(mem_ctx, guid),
+                                      ldb_dn_get_linearized(source_dn));
+               return LDB_ERR_NO_SUCH_OBJECT;
        }
 
-       return replmd_replicated_apply_next(ar);
+       /*
+        * The target of the cross-partition link is missing. Continue
+        * and try to at least add the forward-link. This isn't great,
+        * but a partial link can be fixed by dbcheck, so it's better
+        * than dropping the link completely.
+        */
+       *ignore_link = false;
+
+       if (is_obj_commit) {
+
+               /*
+                * Only log this when we're actually committing the objects.
+                * This avoids spurious logs, i.e. if we're just verifying the
+                * received link during a join.
+                */
+               DBG_WARNING("%s cross-partition target %s linked from %s\n",
+                           missing_str, ldb_dn_get_linearized(target_dn),
+                           ldb_dn_get_linearized(source_dn));
+       }
+       
+       return LDB_SUCCESS;
 }
 
-/*
-  process one linked attribute structure
+/**
+ * Checks that the target object for a linked attribute exists.
+ * @param guid returns the target object's GUID (is returned)if it exists)
+ * @param ignore_link set to true if the linked attribute should be ignored
+ * (i.e. the target doesn't exist, but that it's OK to skip the link)
  */
-static int replmd_process_linked_attribute(struct ldb_module *module,
-                                          struct replmd_private *replmd_private,
-                                          struct la_entry *la_entry,
-                                          struct ldb_request *parent)
+static int replmd_check_target_exists(struct ldb_module *module,
+                                     struct dsdb_dn *dsdb_dn,
+                                     struct la_entry *la_entry,
+                                     struct ldb_dn *source_dn,
+                                     bool is_obj_commit,
+                                     struct GUID *guid,
+                                     bool *ignore_link)
 {
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
-       struct ldb_message *msg;
-       struct ldb_message *target_msg = NULL;
+       struct ldb_result *target_res;
        TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
-       const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
+       const char *attrs[] = { "isDeleted", "isRecycled", NULL };
+       NTSTATUS ntstatus;
+       int ret;
+       enum deletion_state target_deletion_state = OBJECT_REMOVED;
+       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE) ? true : false;
+
+       *ignore_link = false;
+       ntstatus = dsdb_get_extended_dn_guid(dsdb_dn->dn, guid, "GUID");
+
+       if (!NT_STATUS_IS_OK(ntstatus) && !active) {
+
+               /*
+                * This strange behaviour (allowing a NULL/missing
+                * GUID) originally comes from:
+                *
+                * commit e3054ce0fe0f8f62d2f5b2a77893e7a1479128bd
+                * Author: Andrew Tridgell <tridge@samba.org>
+                * Date:   Mon Dec 21 21:21:55 2009 +1100
+                *
+                *  s4-drs: cope better with NULL GUIDS from DRS
+                *
+                *  It is valid to get a NULL GUID over DRS for a deleted forward link. We
+                *  need to match by DN if possible when seeing if we should update an
+                *  existing link.
+                *
+                *  Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
+                */
+               ret = dsdb_module_search_dn(module, tmp_ctx, &target_res,
+                                           dsdb_dn->dn, attrs,
+                                           DSDB_FLAG_NEXT_MODULE |
+                                           DSDB_SEARCH_SHOW_RECYCLED |
+                                           DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                           NULL);
+       } else if (!NT_STATUS_IS_OK(ntstatus)) {
+               ldb_asprintf_errstring(ldb, "Failed to find GUID in linked attribute 0x%x blob for %s from %s",
+                                      la->attid,
+                                      ldb_dn_get_linearized(dsdb_dn->dn),
+                                      ldb_dn_get_linearized(source_dn));
+               talloc_free(tmp_ctx);
+               return LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               ret = dsdb_module_search(module, tmp_ctx, &target_res,
+                                        NULL, LDB_SCOPE_SUBTREE,
+                                        attrs,
+                                        DSDB_FLAG_NEXT_MODULE |
+                                        DSDB_SEARCH_SHOW_RECYCLED |
+                                        DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
+                                        DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
+                                        NULL,
+                                        "objectGUID=%s",
+                                        GUID_string(tmp_ctx, guid));
+       }
+
+       if (ret != LDB_SUCCESS) {
+               ldb_asprintf_errstring(ldb, "Failed to re-resolve GUID %s: %s\n",
+                                      GUID_string(tmp_ctx, guid),
+                                      ldb_errstring(ldb));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (target_res->count == 0) {
+
+               /*
+                * target object is unknown. Check whether to ignore the link,
+                * fail the replication, or add a partial link
+                */
+               ret = replmd_allow_missing_target(module, tmp_ctx, dsdb_dn->dn,
+                                                 source_dn, is_obj_commit, guid,
+                                                 la_entry->dsdb_repl_flags,
+                                                 ignore_link, "Unknown");
+
+       } else if (target_res->count != 1) {
+               ldb_asprintf_errstring(ldb, "More than one object found matching objectGUID %s\n",
+                                      GUID_string(tmp_ctx, guid));
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       } else {
+               struct ldb_message *target_msg = target_res->msgs[0];
+
+               dsdb_dn->dn = talloc_steal(dsdb_dn, target_msg->dn);
+
+               /* Get the object's state (i.e. Not Deleted, Tombstone, etc) */
+               replmd_deletion_state(module, target_msg,
+                                     &target_deletion_state, NULL);
+
+               /*
+                * Check for deleted objects as per MS-DRSR 4.1.10.6.14
+                * ProcessLinkValue(). Link updates should not be sent for
+                * recycled and tombstone objects (deleting the links should
+                * happen when we delete the object). This probably means our
+                * copy of the target object isn't up to date.
+                */
+               if (target_deletion_state >= OBJECT_RECYCLED) {
+
+                       /*
+                        * target object is deleted. Check whether to ignore the
+                        * link, fail the replication, or add a partial link
+                        */
+                       ret = replmd_allow_missing_target(module, tmp_ctx,
+                                                         dsdb_dn->dn, source_dn,
+                                                         is_obj_commit, guid,
+                                                         la_entry->dsdb_repl_flags,
+                                                         ignore_link, "Deleted");
+               }
+       }
+
+       talloc_free(tmp_ctx);
+       return ret;
+}
+
+/**
+ * Extracts the key details about the source object for a
+ * linked-attribute entry.
+ * This returns the following details:
+ * @param ret_attr the schema details for the linked attribute
+ * @param source_msg the search result for the source object
+ */
+static int replmd_get_la_entry_source(struct ldb_module *module,
+                                     struct la_entry *la_entry,
+                                     TALLOC_CTX *mem_ctx,
+                                     const struct dsdb_attribute **ret_attr,
+                                     struct ldb_message **source_msg)
+{
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
        int ret;
        const struct dsdb_attribute *attr;
-       struct dsdb_dn *dsdb_dn;
-       uint64_t seq_num = 0;
-       struct ldb_message_element *old_el;
-       WERROR status;
-       time_t t = time(NULL);
        struct ldb_result *res;
-       struct ldb_result *target_res;
        const char *attrs[4];
-       const char *attrs2[] = { "isDeleted", "isRecycled", NULL };
-       struct parsed_dn *pdn_list, *pdn, *next;
-       struct GUID guid = GUID_zero();
-       NTSTATUS ntstatus;
-       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
-
-       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
-       enum deletion_state target_deletion_state = OBJECT_NOT_DELETED;
 
 /*
 linked_attributes[0]:
@@ -6813,256 +7513,483 @@ linked_attributes[0]:
                                       la->attid,
                                       GUID_buf_string(&la->identifier->guid,
                                                       &guid_str));
-               talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       /*
+        * All attributes listed here must be dealt with in some way
+        * by replmd_process_linked_attribute() otherwise in the case
+        * of isDeleted: FALSE the modify will fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
        attrs[0] = attr->lDAPDisplayName;
        attrs[1] = "isDeleted";
        attrs[2] = "isRecycled";
        attrs[3] = NULL;
 
-       /* get the existing message from the db for the object with
-          this GUID, returning attribute being modified. We will then
-          use this msg as the basis for a modify call */
-       ret = dsdb_module_search(module, tmp_ctx, &res, NULL, LDB_SCOPE_SUBTREE, attrs,
+       /*
+        * get the existing message from the db for the object with
+        * this GUID, returning attribute being modified. We will then
+        * use this msg as the basis for a modify call
+        */
+       ret = dsdb_module_search(module, mem_ctx, &res, NULL, LDB_SCOPE_SUBTREE, attrs,
                                 DSDB_FLAG_NEXT_MODULE |
                                 DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
                                 DSDB_SEARCH_SHOW_RECYCLED |
                                 DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT |
                                 DSDB_SEARCH_REVEAL_INTERNALS,
-                                parent,
-                                "objectGUID=%s", GUID_string(tmp_ctx, &la->identifier->guid));
+                                NULL,
+                                "objectGUID=%s", GUID_string(mem_ctx, &la->identifier->guid));
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
        if (res->count != 1) {
                ldb_asprintf_errstring(ldb, "DRS linked attribute for GUID %s - DN not found",
-                                      GUID_string(tmp_ctx, &la->identifier->guid));
-               talloc_free(tmp_ctx);
+                                      GUID_string(mem_ctx, &la->identifier->guid));
                return LDB_ERR_NO_SUCH_OBJECT;
        }
-       msg = res->msgs[0];
 
-       /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.13
-        * ProcessLinkValue, because link updates are not applied to
-        * recycled and tombstone objects.  We don't have to delete
-        * any existing link, that should have happened when the
-        * object deletion was replicated or initiated.
-        */
-
-       replmd_deletion_state(module, msg, &deletion_state, NULL);
+       *source_msg = res->msgs[0];
+       *ret_attr = attr;
 
-       if (deletion_state >= OBJECT_RECYCLED) {
-               talloc_free(tmp_ctx);
+       return LDB_SUCCESS;
+}
+
+/**
+ * Verifies the target object is known for a linked attribute
+ */
+static int replmd_verify_link_target(struct replmd_replicated_request *ar,
+                                    TALLOC_CTX *mem_ctx,
+                                    struct la_entry *la_entry,
+                                    struct ldb_message *src_msg,
+                                    const struct dsdb_attribute *attr)
+{
+       int ret = LDB_SUCCESS;
+       struct ldb_module *module = ar->module;
+       struct dsdb_dn *tgt_dsdb_dn = NULL;
+       struct GUID guid = GUID_zero();
+       bool dummy;
+       WERROR status;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
+
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx,
+                                     la->value.blob, &tgt_dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(src_msg->dn),
+                                      win_errstr(status));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /*
+        * We can skip the target object checks if we're only syncing critical
+        * objects, or we know the target is up-to-date. If either case, we
+        * still continue even if the target doesn't exist
+        */
+       if ((la_entry->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
+                                         DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
+
+               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la_entry,
+                                                src_msg->dn, false, &guid,
+                                                &dummy);
+       }
+
+       /*
+        * When we fail to find the target object, the error code we pass
+        * back here is really important. It flags back to the callers to
+        * retry this request with DRSUAPI_DRS_GET_TGT
+        */
+       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+               ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_RECYCLED_TARGET);
+       }
+
+       return ret;
+}
+
+/**
+ * Finds the current active Parsed-DN value for a single-valued linked
+ * attribute, if one exists.
+ * @param ret_pdn assigned the active Parsed-DN, or NULL if none was found
+ * @returns LDB_SUCCESS (regardless of whether a match was found), unless
+ * an error occurred
+ */
+static int replmd_get_active_singleval_link(struct ldb_module *module,
+                                           TALLOC_CTX *mem_ctx,
+                                           struct parsed_dn pdn_list[],
+                                           unsigned int count,
+                                           const struct dsdb_attribute *attr,
+                                           struct parsed_dn **ret_pdn)
+{
+       unsigned int i;
+
+       *ret_pdn = NULL;
+
+       if (!(attr->ldb_schema_attribute->flags & LDB_ATTR_FLAG_SINGLE_VALUE)) {
+
+               /* nothing to do for multi-valued linked attributes */
                return LDB_SUCCESS;
        }
 
-       old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
-       if (old_el == NULL) {
-               ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
+       for (i = 0; i < count; i++) {
+               int ret = LDB_SUCCESS;
+               struct parsed_dn *pdn = &pdn_list[i];
+
+               /* skip any inactive links */
+               if (dsdb_dn_is_deleted_val(pdn->v)) {
+                       continue;
+               }
+
+               /* we've found an active value for this attribute */
+               *ret_pdn = pdn;
+
+               if (pdn->dsdb_dn == NULL) {
+                       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+                       ret = really_parse_trusted_dn(mem_ctx, ldb, pdn,
+                                                     attr->syntax->ldap_oid);
+               }
+
+               return ret;
+       }
+
+       /* no active link found */
+       return LDB_SUCCESS;
+}
+
+/**
+ * @returns true if the replication linked attribute info is newer than we
+ * already have in our DB
+ * @param pdn the existing linked attribute info in our DB
+ * @param la the new linked attribute info received during replication
+ */
+static bool replmd_link_update_is_newer(struct parsed_dn *pdn,
+                                       struct drsuapi_DsReplicaLinkedAttribute *la)
+{
+       /* see if this update is newer than what we have already */
+       struct GUID invocation_id = GUID_zero();
+       uint32_t version = 0;
+       NTTIME change_time = 0;
+
+       if (pdn == NULL) {
+
+               /* no existing info so update is newer */
+               return true;
+       }
+
+       dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
+       dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
+       dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
+
+       return replmd_update_is_newer(&invocation_id,
+                                     &la->meta_data.originating_invocation_id,
+                                     version,
+                                     la->meta_data.version,
+                                     change_time,
+                                     la->meta_data.originating_change_time);
+}
+
+/**
+ * Marks an existing linked attribute value as deleted in the DB
+ * @param pdn the parsed-DN of the target-value to delete
+ */
+static int replmd_delete_link_value(struct ldb_module *module,
+                                   struct replmd_private *replmd_private,
+                                   TALLOC_CTX *mem_ctx,
+                                   struct ldb_dn *src_obj_dn,
+                                   const struct dsdb_schema *schema,
+                                   const struct dsdb_attribute *attr,
+                                   uint64_t seq_num,
+                                   bool is_active,
+                                   struct GUID *target_guid,
+                                   struct dsdb_dn *target_dsdb_dn,
+                                   struct ldb_val *output_val)
+{
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       time_t t;
+       NTTIME now;
+       const struct GUID *invocation_id = NULL;
+       int ret;
+
+       t = time(NULL);
+       unix_to_nt_time(&now, t);
+
+       invocation_id = samdb_ntds_invocation_id(ldb);
+       if (invocation_id == NULL) {
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* if the existing link is active, remove its backlink */
+       if (is_active) {
+
+               /*
+                * NOTE WELL: After this we will never (at runtime) be
+                * able to find this forward link (for instant
+                * removal) if/when the link target is deleted.
+                *
+                * We have dbcheck rules to cover this and cope otherwise
+                * by filtering at runtime (i.e. in the extended_dn module).
+                */
+               ret = replmd_add_backlink(module, replmd_private, schema,
+                                         src_obj_dn, target_guid, false,
+                                         attr, NULL);
                if (ret != LDB_SUCCESS) {
-                       ldb_module_oom(module);
-                       talloc_free(tmp_ctx);
-                       return LDB_ERR_OPERATIONS_ERROR;
+                       return ret;
                }
-       } else {
-               old_el->flags = LDB_FLAG_MOD_REPLACE;
        }
 
-       /* parse the existing links */
-       ret = get_parsed_dns_trusted(module, replmd_private, tmp_ctx, old_el, &pdn_list,
-                                    attr->syntax->ldap_oid, parent);
+       /* mark the existing value as deleted */
+       ret = replmd_update_la_val(mem_ctx, output_val, target_dsdb_dn,
+                                  target_dsdb_dn, invocation_id, seq_num,
+                                  seq_num, now, true);
+       return ret;
+}
+
+/**
+ * Checks for a conflict in single-valued link attributes, and tries to
+ * resolve the problem if possible.
+ *
+ * Single-valued links should only ever have one active value. If we already
+ * have an active link value, and during replication we receive an active link
+ * value for a different target DN, then we need to resolve this inconsistency
+ * and determine which value should be active. If the received info is better/
+ * newer than the existing link attribute, then we need to set our existing
+ * link as deleted. If the received info is worse/older, then we should continue
+ * to add it, but set it as an inactive link.
+ *
+ * Note that this is a corner-case that is unlikely to happen (but if it does
+ * happen, we don't want it to break replication completely).
+ *
+ * @param pdn_being_modified the parsed DN corresponding to the received link
+ * target (note this is NULL if the link does not already exist in our DB)
+ * @param pdn_list all the source object's Parsed-DNs for this attribute, i.e.
+ * any existing active or inactive values for the attribute in our DB.
+ * @param dsdb_dn the target DN for the received link attribute
+ * @param add_as_inactive gets set to true if the received link is worse than
+ * the existing link - it should still be added, but as an inactive link.
+ */
+static int replmd_check_singleval_la_conflict(struct ldb_module *module,
+                                             struct replmd_private *replmd_private,
+                                             TALLOC_CTX *mem_ctx,
+                                             struct ldb_dn *src_obj_dn,
+                                             struct drsuapi_DsReplicaLinkedAttribute *la,
+                                             struct dsdb_dn *dsdb_dn,
+                                             struct parsed_dn *pdn_being_modified,
+                                             struct parsed_dn *pdn_list,
+                                             struct ldb_message_element *old_el,
+                                             const struct dsdb_schema *schema,
+                                             const struct dsdb_attribute *attr,
+                                             uint64_t seq_num,
+                                             bool *add_as_inactive)
+{
+       struct parsed_dn *active_pdn = NULL;
+       bool update_is_newer = false;
+       int ret;
+
+       /*
+        * check if there's a conflict for single-valued links, i.e. an active
+        * linked attribute already exists, but it has a different target value
+        */
+       ret = replmd_get_active_singleval_link(module, mem_ctx, pdn_list,
+                                              old_el->num_values, attr,
+                                              &active_pdn);
 
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
-       status = dsdb_dn_la_from_blob(ldb, attr, schema, tmp_ctx, la->value.blob, &dsdb_dn);
-       if (!W_ERROR_IS_OK(status)) {
-               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
-                                      old_el->name, ldb_dn_get_linearized(msg->dn), win_errstr(status));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
+       /*
+        * If no active value exists (or the received info is for the currently
+        * active value), then no conflict exists
+        */
+       if (active_pdn == NULL || active_pdn == pdn_being_modified) {
+               return LDB_SUCCESS;
        }
 
-       ntstatus = dsdb_get_extended_dn_guid(dsdb_dn->dn, &guid, "GUID");
-       if (!NT_STATUS_IS_OK(ntstatus) && !active) {
+       DBG_WARNING("Link conflict for %s attribute on %s\n",
+                   attr->lDAPDisplayName, ldb_dn_get_linearized(src_obj_dn));
+
+       /* Work out how to resolve the conflict based on which info is better */
+       update_is_newer = replmd_link_update_is_newer(active_pdn, la);
+
+       if (update_is_newer) {
+               DBG_WARNING("Using received value %s, over existing target %s\n",
+                           ldb_dn_get_linearized(dsdb_dn->dn),
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn));
+
                /*
-                * This strange behaviour (allowing a NULL/missing
-                * GUID) originally comes from:
-                *
-                * commit e3054ce0fe0f8f62d2f5b2a77893e7a1479128bd
-                * Author: Andrew Tridgell <tridge@samba.org>
-                * Date:   Mon Dec 21 21:21:55 2009 +1100
-                *
-                *  s4-drs: cope better with NULL GUIDS from DRS
-                *
-                *  It is valid to get a NULL GUID over DRS for a deleted forward link. We
-                *  need to match by DN if possible when seeing if we should update an
-                *  existing link.
-                *
-                *  Pair-Programmed-With: Andrew Bartlett <abartlet@samba.org>
+                * Delete our existing active link. The received info will then
+                * be added (through normal link processing) as the active value
                 */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              src_obj_dn, schema, attr,
+                                              seq_num, true, &active_pdn->guid,
+                                              active_pdn->dsdb_dn,
+                                              active_pdn->v);
 
-               ret = dsdb_module_search_dn(module, tmp_ctx, &target_res,
-                                           dsdb_dn->dn, attrs2,
-                                           DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_RECYCLED |
-                                           DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
-                                           DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
-                                           parent);
-       } else if (!NT_STATUS_IS_OK(ntstatus)) {
-               ldb_asprintf_errstring(ldb, "Failed to find GUID in linked attribute blob for %s on %s from %s",
-                                      old_el->name,
-                                      ldb_dn_get_linearized(dsdb_dn->dn),
-                                      ldb_dn_get_linearized(msg->dn));
-               talloc_free(tmp_ctx);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+       } else {
+               DBG_WARNING("Using existing target %s, over received value %s\n",
+                           ldb_dn_get_linearized(active_pdn->dsdb_dn->dn),
+                           ldb_dn_get_linearized(dsdb_dn->dn));
+
+               /*
+                * we want to keep our existing active link and add the
+                * received link as inactive
+                */
+               *add_as_inactive = true;
+       }
+
+       return LDB_SUCCESS;
+}
+
+/*
+  process one linked attribute structure
+ */
+static int replmd_process_linked_attribute(struct ldb_module *module,
+                                          TALLOC_CTX *mem_ctx,
+                                          struct replmd_private *replmd_private,
+                                          struct ldb_message *msg,
+                                          const struct dsdb_attribute *attr,
+                                          struct la_entry *la_entry,
+                                          struct ldb_request *parent)
+{
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
+       int ret;
+       struct dsdb_dn *dsdb_dn = NULL;
+       uint64_t seq_num = 0;
+       struct ldb_message_element *old_el;
+       time_t t = time(NULL);
+       struct parsed_dn *pdn_list, *pdn, *next;
+       struct GUID guid = GUID_zero();
+       bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
+       bool ignore_link;
+       struct dsdb_dn *old_dsdb_dn = NULL;
+       struct ldb_val *val_to_update = NULL;
+       bool add_as_inactive = false;
+       WERROR status;
+
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx,
+                                     la->value.blob, &dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(msg->dn),
+                                      win_errstr(status));
                return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
+       if (old_el == NULL) {
+               ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
+               if (ret != LDB_SUCCESS) {
+                       ldb_module_oom(module);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
        } else {
-               ret = dsdb_module_search(module, tmp_ctx, &target_res,
-                                        NULL, LDB_SCOPE_SUBTREE,
-                                        attrs2,
-                                        DSDB_FLAG_NEXT_MODULE |
-                                        DSDB_SEARCH_SHOW_RECYCLED |
-                                        DSDB_SEARCH_SEARCH_ALL_PARTITIONS |
-                                        DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
-                                        parent,
-                                        "objectGUID=%s",
-                                        GUID_string(tmp_ctx, &guid));
+               old_el->flags = LDB_FLAG_MOD_REPLACE;
        }
 
+       /* parse the existing links */
+       ret = get_parsed_dns_trusted(module, replmd_private, mem_ctx, old_el, &pdn_list,
+                                    attr->syntax->ldap_oid, parent);
+
        if (ret != LDB_SUCCESS) {
-               ldb_asprintf_errstring(ldb_module_get_ctx(module), "Failed to re-resolve GUID %s: %s\n",
-                                      GUID_string(tmp_ctx, &guid),
-                                      ldb_errstring(ldb_module_get_ctx(module)));
-               talloc_free(tmp_ctx);
                return ret;
        }
 
-       if (target_res->count == 0) {
-               DEBUG(2,(__location__ ": WARNING: Failed to re-resolve GUID %s - using %s\n",
-                        GUID_string(tmp_ctx, &guid),
-                        ldb_dn_get_linearized(dsdb_dn->dn)));
-       } else if (target_res->count != 1) {
-               ldb_asprintf_errstring(ldb_module_get_ctx(module), "More than one object found matching objectGUID %s\n",
-                                      GUID_string(tmp_ctx, &guid));
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       } else {
-               target_msg = target_res->msgs[0];
-               dsdb_dn->dn = talloc_steal(dsdb_dn, target_msg->dn);
+       ret = replmd_check_target_exists(module, dsdb_dn, la_entry, msg->dn,
+                                        true, &guid, &ignore_link);
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
        }
 
        /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.13
-        * ProcessLinkValue, because link updates are not applied to
-        * recycled and tombstone objects.  We don't have to delete
-        * any existing link, that should have happened when the
-        * object deletion was replicated or initiated.
+        * there are some cases where the target object doesn't exist, but it's
+        * OK to ignore the linked attribute
         */
-       replmd_deletion_state(module, target_msg,
-                             &target_deletion_state, NULL);
-
-       if (target_deletion_state >= OBJECT_RECYCLED) {
-               talloc_free(tmp_ctx);
-               return LDB_SUCCESS;
+       if (ignore_link) {
+               return ret;
        }
 
        /* see if this link already exists */
        ret = parsed_dn_find(ldb, pdn_list, old_el->num_values,
                             &guid,
                             dsdb_dn->dn,
-                            dsdb_dn->extra_part,
+                            dsdb_dn->extra_part, 0,
                             &pdn, &next,
                             attr->syntax->ldap_oid,
                             true);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
+       if (!replmd_link_update_is_newer(pdn, la)) {
+               DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
+                        old_el->name, ldb_dn_get_linearized(msg->dn),
+                        GUID_string(mem_ctx, &la->meta_data.originating_invocation_id)));
+               return LDB_SUCCESS;
+       }
 
-       if (pdn != NULL) {
-               /* see if this update is newer than what we have already */
-               struct GUID invocation_id = GUID_zero();
-               uint32_t version = 0;
-               uint32_t originating_usn = 0;
-               NTTIME change_time = 0;
-               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
-
-               dsdb_get_extended_dn_guid(pdn->dsdb_dn->dn, &invocation_id, "RMD_INVOCID");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &version, "RMD_VERSION");
-               dsdb_get_extended_dn_uint32(pdn->dsdb_dn->dn, &originating_usn, "RMD_ORIGINATING_USN");
-               dsdb_get_extended_dn_nttime(pdn->dsdb_dn->dn, &change_time, "RMD_CHANGETIME");
-
-               if (!replmd_update_is_newer(&invocation_id,
-                                           &la->meta_data.originating_invocation_id,
-                                           version,
-                                           la->meta_data.version,
-                                           change_time,
-                                           la->meta_data.originating_change_time)) {
-                       DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
-                                old_el->name, ldb_dn_get_linearized(msg->dn),
-                                GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
-                       talloc_free(tmp_ctx);
-                       return LDB_SUCCESS;
-               }
+       /* get a seq_num for this change */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
 
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       /*
+        * check for single-valued link conflicts, i.e. an active linked
+        * attribute already exists, but it has a different target value
+        */
+       if (active) {
+               ret = replmd_check_singleval_la_conflict(module, replmd_private,
+                                                        mem_ctx, msg->dn, la,
+                                                        dsdb_dn, pdn, pdn_list,
+                                                        old_el, schema, attr,
+                                                        seq_num,
+                                                        &add_as_inactive);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
+       }
+
+       if (pdn != NULL) {
+               uint32_t rmd_flags = dsdb_dn_rmd_flags(pdn->dsdb_dn->dn);
 
                if (!(rmd_flags & DSDB_RMD_FLAG_DELETED)) {
                        /* remove the existing backlink */
                        ret = replmd_add_backlink(module, replmd_private,
                                                  schema, 
                                                  msg->dn,
-                                                 &guid, false, attr,
+                                                 &pdn->guid, false, attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
                                return ret;
                        }
                }
 
-               ret = replmd_update_la_val(tmp_ctx, pdn->v, dsdb_dn, pdn->dsdb_dn,
-                                          &la->meta_data.originating_invocation_id,
-                                          la->meta_data.originating_usn, seq_num,
-                                          la->meta_data.originating_change_time,
-                                          la->meta_data.version,
-                                          !active);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+               val_to_update = pdn->v;
+               old_dsdb_dn = pdn->dsdb_dn;
 
-               if (active) {
-                       /* add the new backlink */
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
-               }
        } else {
                unsigned offset;
-               /* get a seq_num for this change */
-               ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
-               if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
+
                /*
                 * We know where the new one needs to be, from the *next
                 * pointer into pdn_list.
@@ -7071,7 +7998,7 @@ linked_attributes[0]:
                        offset = old_el->num_values;
                } else {
                        if (next->dsdb_dn == NULL) {
-                               ret = really_parse_trusted_dn(tmp_ctx, ldb, next,
+                               ret = really_parse_trusted_dn(mem_ctx, ldb, next,
                                                              attr->syntax->ldap_oid);
                                if (ret != LDB_SUCCESS) {
                                        return ret;
@@ -7079,7 +8006,6 @@ linked_attributes[0]:
                        }
                        offset = next - pdn_list;
                        if (offset > old_el->num_values) {
-                               talloc_free(tmp_ctx);
                                return LDB_ERR_OPERATIONS_ERROR;
                        }
                }
@@ -7098,71 +8024,74 @@ linked_attributes[0]:
 
                old_el->num_values++;
 
-               ret = replmd_build_la_val(tmp_ctx, &old_el->values[offset], dsdb_dn,
-                                         &la->meta_data.originating_invocation_id,
-                                         la->meta_data.originating_usn, seq_num,
-                                         la->meta_data.originating_change_time,
-                                         la->meta_data.version,
-                                         !active);
+               val_to_update = &old_el->values[offset];
+               old_dsdb_dn = NULL;
+       }
+
+       /* set the link attribute's value to the info that was received */
+       ret = replmd_set_la_val(mem_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
+                               &la->meta_data.originating_invocation_id,
+                               la->meta_data.originating_usn, seq_num,
+                               la->meta_data.originating_change_time,
+                               la->meta_data.version,
+                               !active);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       if (add_as_inactive) {
+
+               /* Set the new link as inactive/deleted to avoid conflicts */
+               ret = replmd_delete_link_value(module, replmd_private, old_el,
+                                              msg->dn, schema, attr, seq_num,
+                                              false, &guid, dsdb_dn,
+                                              val_to_update);
+
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
 
-               if (active) {
-                       ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, 
-                                                 msg->dn,
-                                                 &guid, true, attr,
-                                                 parent);
-                       if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
-                               return ret;
-                       }
+       } else if (active) {
+
+               /* if the new link is active, then add the new backlink */
+               ret = replmd_add_backlink(module, replmd_private,
+                                         schema,
+                                         msg->dn,
+                                         &guid, true, attr,
+                                         parent);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
        }
 
        /* we only change whenChanged and uSNChanged if the seq_num
           has changed */
+       ldb_msg_remove_attr(msg, "whenChanged");
+       ldb_msg_remove_attr(msg, "uSNChanged");
        ret = add_time_element(msg, "whenChanged", t);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                ldb_operr(ldb);
                return ret;
        }
 
        ret = add_uint64_element(ldb, msg, "uSNChanged", seq_num);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                ldb_operr(ldb);
                return ret;
        }
 
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
        if (old_el == NULL) {
-               talloc_free(tmp_ctx);
                return ldb_operr(ldb);
        }
 
        ret = dsdb_check_single_valued_link(attr, old_el);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
        old_el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
 
-       ret = linked_attr_modify(module, msg, parent);
-       if (ret != LDB_SUCCESS) {
-               ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s'\n%s\n",
-                         ldb_errstring(ldb),
-                         ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       talloc_free(tmp_ctx);
-
        return ret;
 }
 
@@ -7203,6 +8132,106 @@ static int replmd_start_transaction(struct ldb_module *module)
        return ldb_next_start_trans(module);
 }
 
+/**
+ * Processes a group of linked attributes that apply to the same source-object
+ * and attribute-ID
+ */
+static int replmd_process_la_group(struct ldb_module *module,
+                                  struct replmd_private *replmd_private,
+                                  struct la_group *la_group)
+{
+       struct la_entry *la = NULL;
+       struct la_entry *prev = NULL;
+       int ret;
+       TALLOC_CTX *tmp_ctx = talloc_new(la_group);
+       struct la_entry *first_la = DLIST_TAIL(la_group->la_entries);
+       struct ldb_message *msg = NULL;
+       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       const struct dsdb_attribute *attr = NULL;
+
+       /*
+        * get the attribute being modified and the search result for the
+        * source object
+        */
+       ret = replmd_get_la_entry_source(module, first_la, tmp_ctx, &attr,
+                                        &msg);
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * Check for deleted objects per MS-DRSR 4.1.10.6.14
+        * ProcessLinkValue, because link updates are not applied to
+        * recycled and tombstone objects.  We don't have to delete
+        * any existing link, that should have happened when the
+        * object deletion was replicated or initiated.
+        *
+        * This needs isDeleted and isRecycled to be included as
+        * attributes in the search and so in msg if set.
+        */
+       replmd_deletion_state(module, msg, &deletion_state, NULL);
+
+       if (deletion_state >= OBJECT_RECYCLED) {
+               TALLOC_FREE(tmp_ctx);
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * Now that we know the deletion_state, remove the extra
+        * attributes added for that purpose.  We need to do this
+        * otherwise in the case of isDeleted: FALSE the modify will
+        * fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
+       ldb_msg_remove_attr(msg, "isDeleted");
+       ldb_msg_remove_attr(msg, "isRecycled");
+
+       /* go through and process the link targets for this source object */
+       for (la = DLIST_TAIL(la_group->la_entries); la; la=prev) {
+               prev = DLIST_PREV(la);
+               DLIST_REMOVE(la_group->la_entries, la);
+               ret = replmd_process_linked_attribute(module, tmp_ctx,
+                                                     replmd_private,
+                                                     msg, attr, la, NULL);
+               if (ret != LDB_SUCCESS) {
+                       replmd_txn_cleanup(replmd_private);
+                       return ret;
+               }
+
+               if ((++replmd_private->num_processed % 8192) == 0) {
+                       DBG_NOTICE("Processed %u/%u linked attributes\n",
+                                  replmd_private->num_processed,
+                                  replmd_private->total_links);
+               }
+       }
+
+       /* apply the link changes to the source object */
+       ret = linked_attr_modify(module, msg, NULL);
+       if (ret != LDB_SUCCESS) {
+               ldb_debug(ldb, LDB_DEBUG_WARNING,
+                         "Failed to apply linked attribute change '%s'\n%s\n",
+                         ldb_errstring(ldb),
+                         ldb_ldif_message_redacted_string(ldb,
+                                                          tmp_ctx,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg));
+               TALLOC_FREE(tmp_ctx);
+               return ret;
+       }
+
+       TALLOC_FREE(tmp_ctx);
+       return LDB_SUCCESS;
+}
+
 /*
   on prepare commit we loop over our queued la_context structures and
   apply each of them
@@ -7211,21 +8240,31 @@ static int replmd_prepare_commit(struct ldb_module *module)
 {
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-       struct la_entry *la, *prev;
+       struct la_group *la_group, *prev;
        int ret;
 
+       if (replmd_private->la_list != NULL) {
+               DBG_NOTICE("Processing linked attributes\n");
+       }
+
        /*
         * Walk the list of linked attributes from DRS replication.
         *
         * We walk backwards, to do the first entry first, as we
         * added the entries with DLIST_ADD() which puts them at the
         * start of the list
+        *
+        * Links are grouped together so we process links for the same
+        * source object in one go.
         */
-       for (la = DLIST_TAIL(replmd_private->la_list); la; la=prev) {
-               prev = DLIST_PREV(la);
-               DLIST_REMOVE(replmd_private->la_list, la);
-               ret = replmd_process_linked_attribute(module, replmd_private,
-                                                     la, NULL);
+       for (la_group = DLIST_TAIL(replmd_private->la_list);
+            la_group != NULL;
+            la_group = prev) {
+
+               prev = DLIST_PREV(la_group);
+               DLIST_REMOVE(replmd_private->la_list, la_group);
+               ret = replmd_process_la_group(module, replmd_private,
+                                             la_group);
                if (ret != LDB_SUCCESS) {
                        replmd_txn_cleanup(replmd_private);
                        return ret;