replmd: Split up replmd_verify_linked_attribute() into src/target checks
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index c7aacd83615c5a3d26eda63a68e0dcab38334724..5bf819382f225dbd6d94df567922f5773247a8eb 100644 (file)
@@ -65,7 +65,7 @@ static const NTTIME DELETED_OBJECT_CONTAINER_CHANGE_TIME = 2650466015990000000UL
 
 struct replmd_private {
        TALLOC_CTX *la_ctx;
-       struct la_entry *la_list;
+       struct la_group *la_list;
        struct nc_entry {
                struct nc_entry *prev, *next;
                struct ldb_dn *dn;
@@ -75,6 +75,22 @@ struct replmd_private {
        struct ldb_dn *schema_dn;
        bool originating_updates;
        bool sorted_links;
+       uint32_t total_links;
+       uint32_t num_processed;
+};
+
+/*
+ * groups link attributes together by source-object and attribute-ID,
+ * to improve processing efficiency (i.e. for 'member' attribute, which
+ * could have 100s or 1000s of links).
+ * Note this grouping is best effort - the same source object could still
+ * correspond to several la_groups (a lot depends on the order DRS sends
+ * the links in). The groups currently don't span replication chunks (which
+ * caps the size to ~1500 links by default).
+ */
+struct la_group {
+       struct la_group *next, *prev;
+       struct la_entry *la_entries;
 };
 
 struct la_entry {
@@ -112,6 +128,8 @@ struct replmd_replicated_request {
        bool is_urgent;
 
        bool isDeleted;
+
+       bool fix_link_sid;
 };
 
 static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar);
@@ -120,8 +138,16 @@ static int replmd_check_upgrade_links(struct ldb_context *ldb,
                                      struct parsed_dn *dns, uint32_t count,
                                      struct ldb_message_element *el,
                                      const char *ldap_oid);
-static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
-                                         struct la_entry *la);
+static int replmd_verify_link_target(struct replmd_replicated_request *ar,
+                                    TALLOC_CTX *mem_ctx,
+                                    struct la_entry *la_entry,
+                                    struct ldb_message *src_msg,
+                                    const struct dsdb_attribute *attr);
+static int replmd_get_la_entry_source(struct ldb_module *module,
+                                     struct la_entry *la_entry,
+                                     TALLOC_CTX *mem_ctx,
+                                     const struct dsdb_attribute **ret_attr,
+                                     struct ldb_message **source_msg);
 static int replmd_set_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct dsdb_dn *dsdb_dn,
                             struct dsdb_dn *old_dsdb_dn, const struct GUID *invocation_id,
                             uint64_t usn, uint64_t local_usn, NTTIME nttime,
@@ -2039,8 +2065,12 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                        /* we got a DN without a GUID - go find the GUID */
                        int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
-                               ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
-                                                      ldb_dn_get_linearized(dn));
+                               char *dn_str = NULL;
+                               dn_str = ldb_dn_get_extended_linearized(mem_ctx,
+                                                                       (dn), 1);
+                               ldb_asprintf_errstring(ldb,
+                                               "Unable to find GUID for DN %s\n",
+                                               dn_str);
                                if (ret == LDB_ERR_NO_SUCH_OBJECT &&
                                    LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
                                    ldb_attr_cmp(el->name, "member") == 0) {
@@ -2398,12 +2428,11 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
  */
 static int replmd_modify_la_add(struct ldb_module *module,
                                struct replmd_private *replmd_private,
-                               const struct dsdb_schema *schema,
+                               struct replmd_replicated_request *ac,
                                struct ldb_message *msg,
                                struct ldb_message_element *el,
                                struct ldb_message_element *old_el,
                                const struct dsdb_attribute *schema_attr,
-                               uint64_t seq_num,
                                time_t t,
                                struct ldb_dn *msg_dn,
                                struct ldb_request *parent)
@@ -2416,17 +2445,10 @@ static int replmd_modify_la_add(struct ldb_module *module,
        unsigned old_num_values = old_el ? old_el->num_values : 0;
        unsigned num_values = 0;
        unsigned max_num_values;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        NTTIME now;
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /* get the DNs to be added, fully parsed.
         *
         * We need full parsing because they came off the wire and we don't
@@ -2453,7 +2475,7 @@ static int replmd_modify_la_add(struct ldb_module *module,
        max_num_values = old_num_values + el->num_values;
        if (max_num_values < old_num_values) {
                DEBUG(0, ("we seem to have overflow in replmd_modify_la_add. "
-                         "old values: %u, new values: %u, sum: %u",
+                         "old values: %u, new values: %u, sum: %u\n",
                          old_num_values, el->num_values, max_num_values));
                talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
@@ -2489,6 +2511,109 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        return err;
                }
 
+               if (ac->fix_link_sid) {
+                       char *fixed_dnstring = NULL;
+                       struct dom_sid tmp_sid = { 0, };
+                       DATA_BLOB sid_blob = data_blob_null;
+                       enum ndr_err_code ndr_err;
+                       NTSTATUS status;
+                       int num;
+
+                       if (exact == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       if (dns[i].dsdb_dn->dn_format != DSDB_NORMAL_DN) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       /*
+                        * Only "<GUID=...><SID=...>" is allowed.
+                        *
+                        * We get the GUID to just to find the old
+                        * value and the SID in order to add it
+                        * to the found value.
+                        */
+
+                       num = ldb_dn_get_comp_num(dns[i].dsdb_dn->dn);
+                       if (num != 0) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       num = ldb_dn_get_extended_comp_num(dns[i].dsdb_dn->dn);
+                       if (num != 2) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       status = dsdb_get_extended_dn_sid(exact->dsdb_dn->dn,
+                                                         &tmp_sid, "SID");
+                       if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
+                               /* this is what we expect */
+                       } else if (NT_STATUS_IS_OK(status)) {
+                               struct GUID_txt_buf guid_str;
+                               ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+                                                      "i[%u] SID NOT MISSING... Attribute %s already "
+                                                      "exists for target GUID %s, SID %s, DN: %s",
+                                                      i, el->name,
+                                                      GUID_buf_string(&exact->guid,
+                                                                      &guid_str),
+                                                      dom_sid_string(tmp_ctx, &tmp_sid),
+                                                      dsdb_dn_get_extended_linearized(tmp_ctx,
+                                                              exact->dsdb_dn, 1));
+                               talloc_free(tmp_ctx);
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       } else {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       status = dsdb_get_extended_dn_sid(dns[i].dsdb_dn->dn,
+                                                         &tmp_sid, "SID");
+                       if (!NT_STATUS_IS_OK(status)) {
+                               struct GUID_txt_buf guid_str;
+                               ldb_asprintf_errstring(ldb,
+                                                      "NO SID PROVIDED... Attribute %s already "
+                                                      "exists for target GUID %s",
+                                                      el->name,
+                                                      GUID_buf_string(&exact->guid,
+                                                                      &guid_str));
+                               talloc_free(tmp_ctx);
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+
+                       ndr_err = ndr_push_struct_blob(&sid_blob, tmp_ctx, &tmp_sid,
+                                                      (ndr_push_flags_fn_t)ndr_push_dom_sid);
+                       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       ret = ldb_dn_set_extended_component(exact->dsdb_dn->dn, "SID", &sid_blob);
+                       data_blob_free(&sid_blob);
+                       if (ret != LDB_SUCCESS) {
+                               talloc_free(tmp_ctx);
+                               return ret;
+                       }
+
+                       fixed_dnstring = dsdb_dn_get_extended_linearized(
+                                       new_values, exact->dsdb_dn, 1);
+                       if (fixed_dnstring == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_operr(ldb);
+                       }
+
+                       /*
+                        * We just replace the existing value...
+                        */
+                       *exact->v = data_blob_string_const(fixed_dnstring);
+
+                       continue;
+               }
+
                if (exact != NULL) {
                        /*
                         * We are trying to add one that exists, which is only
@@ -2522,15 +2647,16 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        ret = replmd_update_la_val(new_values, exact->v,
                                                   dns[i].dsdb_dn,
                                                   exact->dsdb_dn,
-                                                  invocation_id, seq_num,
-                                                  seq_num, now, false);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
 
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema,
+                                                 ac->schema,
                                                  msg_dn,
                                                  &dns[i].guid, 
                                                  true,
@@ -2572,14 +2698,14 @@ static int replmd_modify_la_add(struct ldb_module *module,
                }
 
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_dn,
+                                         ac->schema, msg_dn,
                                          &dns[i].guid,
                                          true, schema_attr,
                                          parent);
                /* Make the new linked attribute ldb_val. */
                ret = replmd_build_la_val(new_values, &new_values[num_values],
-                                         dns[i].dsdb_dn, invocation_id,
-                                         seq_num, now);
+                                         dns[i].dsdb_dn, &ac->our_invocation_id,
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2618,12 +2744,11 @@ static int replmd_modify_la_add(struct ldb_module *module,
  */
 static int replmd_modify_la_delete(struct ldb_module *module,
                                   struct replmd_private *replmd_private,
-                                  const struct dsdb_schema *schema,
+                                  struct replmd_replicated_request *ac,
                                   struct ldb_message *msg,
                                   struct ldb_message_element *el,
                                   struct ldb_message_element *old_el,
                                   const struct dsdb_attribute *schema_attr,
-                                  uint64_t seq_num,
                                   time_t t,
                                   struct ldb_dn *msg_dn,
                                   struct ldb_request *parent)
@@ -2637,16 +2762,10 @@ static int replmd_modify_la_delete(struct ldb_module *module,
        bool vanish_links = false;
        unsigned int num_to_delete = el->num_values;
        uint32_t rmd_flags;
-       const struct GUID *invocation_id;
        NTTIME now;
 
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        if (old_el == NULL || old_el->num_values == 0) {
                /* there is nothing to delete... */
                if (num_to_delete == 0) {
@@ -2710,7 +2829,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                                }
                        }
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema, msg_dn, &p->guid,
+                                                 ac->schema, msg_dn, &p->guid,
                                                  false, schema_attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
@@ -2728,8 +2847,9 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
                        ret = replmd_update_la_val(old_el->values, p->v,
                                                   p->dsdb_dn, p->dsdb_dn,
-                                                  invocation_id, seq_num,
-                                                  seq_num, now, true);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, true);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
@@ -2791,7 +2911,7 @@ static int replmd_modify_la_delete(struct ldb_module *module,
                        /* remove the backlink */
                        ret = replmd_add_backlink(module,
                                                  replmd_private,
-                                                 schema, 
+                                                 ac->schema,
                                                  msg_dn,
                                                  &p->guid,
                                                  false, schema_attr,
@@ -2825,14 +2945,15 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
                ret = replmd_update_la_val(old_el->values, exact->v,
                                           exact->dsdb_dn, exact->dsdb_dn,
-                                          invocation_id, seq_num, seq_num,
+                                          &ac->our_invocation_id,
+                                          ac->seq_num, ac->seq_num,
                                           now, true);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                }
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_dn,
+                                         ac->schema, msg_dn,
                                          &p->guid,
                                          false, schema_attr,
                                          parent);
@@ -2844,11 +2965,23 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
        if (vanish_links) {
                unsigned j = 0;
+               struct ldb_val *tmp_vals = NULL;
+
+               tmp_vals = talloc_array(tmp_ctx, struct ldb_val,
+                                       old_el->num_values);
+               if (tmp_vals == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_oom(module);
+               }
                for (i = 0; i < old_el->num_values; i++) {
-                       if (old_dns[i].v != NULL) {
-                               old_el->values[j] = *old_dns[i].v;
-                               j++;
+                       if (old_dns[i].v == NULL) {
+                               continue;
                        }
+                       tmp_vals[j] = *old_dns[i].v;
+                       j++;
+               }
+               for (i = 0; i < j; i++) {
+                       old_el->values[i] = tmp_vals[i];
                }
                old_el->num_values = j;
        }
@@ -2870,12 +3003,11 @@ static int replmd_modify_la_delete(struct ldb_module *module,
  */
 static int replmd_modify_la_replace(struct ldb_module *module,
                                    struct replmd_private *replmd_private,
-                                   const struct dsdb_schema *schema,
+                                   struct replmd_replicated_request *ac,
                                    struct ldb_message *msg,
                                    struct ldb_message_element *el,
                                    struct ldb_message_element *old_el,
                                    const struct dsdb_attribute *schema_attr,
-                                   uint64_t seq_num,
                                    time_t t,
                                    struct ldb_dn *msg_dn,
                                    struct ldb_request *parent)
@@ -2884,7 +3016,6 @@ static int replmd_modify_la_replace(struct ldb_module *module,
        struct parsed_dn *dns, *old_dns;
        TALLOC_CTX *tmp_ctx = talloc_new(msg);
        int ret;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_val *new_values = NULL;
        const char *ldap_oid = schema_attr->syntax->ldap_oid;
@@ -2895,11 +3026,6 @@ static int replmd_modify_la_replace(struct ldb_module *module,
 
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /*
         * The replace operation is unlike the replace and delete cases in that
         * we need to look at every existing link to see whether it is being
@@ -2999,8 +3125,8 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                ret = replmd_update_la_val(new_values, old_p->v,
                                                           old_p->dsdb_dn,
                                                           old_p->dsdb_dn,
-                                                          invocation_id,
-                                                          seq_num, seq_num,
+                                                          &ac->our_invocation_id,
+                                                          ac->seq_num, ac->seq_num,
                                                           now, true);
                                if (ret != LDB_SUCCESS) {
                                        talloc_free(tmp_ctx);
@@ -3008,7 +3134,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                                }
 
                                ret = replmd_add_backlink(module, replmd_private,
-                                                         schema, 
+                                                         ac->schema,
                                                          msg_dn,
                                                          &old_p->guid, false,
                                                          schema_attr,
@@ -3033,8 +3159,8 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        ret = replmd_update_la_val(new_values, old_p->v,
                                                   new_p->dsdb_dn,
                                                   old_p->dsdb_dn,
-                                                  invocation_id,
-                                                  seq_num, seq_num,
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
                                                   now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
@@ -3044,7 +3170,7 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        rmd_flags = dsdb_dn_rmd_flags(old_p->dsdb_dn->dn);
                        if ((rmd_flags & DSDB_RMD_FLAG_DELETED) != 0) {
                                ret = replmd_add_backlink(module, replmd_private,
-                                                         schema, 
+                                                         ac->schema,
                                                          msg_dn,
                                                          &new_p->guid, true,
                                                          schema_attr,
@@ -3066,14 +3192,14 @@ static int replmd_modify_la_replace(struct ldb_module *module,
                        ret = replmd_build_la_val(new_values,
                                                  new_p->v,
                                                  new_p->dsdb_dn,
-                                                 invocation_id,
-                                                 seq_num, now);
+                                                 &ac->our_invocation_id,
+                                                 ac->seq_num, now);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema,
+                                                 ac->schema,
                                                  msg_dn,
                                                  &new_p->guid, true,
                                                  schema_attr,
@@ -3104,8 +3230,9 @@ static int replmd_modify_la_replace(struct ldb_module *module,
  */
 static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                                               struct replmd_private *replmd_private,
+                                              struct replmd_replicated_request *ac,
                                               struct ldb_message *msg,
-                                              uint64_t seq_num, time_t t,
+                                              time_t t,
                                               struct ldb_request *parent)
 {
        struct ldb_result *res;
@@ -3114,8 +3241,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_message *old_msg;
 
-       const struct dsdb_schema *schema;
-
        if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
                /*
                 * Nothing special is required for modifying or vanishing links
@@ -3149,10 +3274,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        if (ret != LDB_SUCCESS) {
                return ret;
        }
-       schema = dsdb_get_schema(ldb, res);
-       if (!schema) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
 
        old_msg = res->msgs[0];
 
@@ -3161,7 +3282,7 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                struct ldb_message_element *old_el, *new_el;
                unsigned int mod_type = LDB_FLAG_MOD_TYPE(el->flags);
                const struct dsdb_attribute *schema_attr
-                       = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
+                       = dsdb_attribute_by_lDAPDisplayName(ac->schema, el->name);
                if (!schema_attr) {
                        ldb_asprintf_errstring(ldb,
                                               "%s: attribute %s is not a valid attribute in schema",
@@ -3172,9 +3293,22 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
-                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
-                               continue;
+                       if (parent) {
+                               struct ldb_control *ctrl;
+
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_REPLMD_VANISH_LINKS);
+                               if (ctrl != NULL) {
+                                       ctrl->critical = false;
+                                       continue;
+                               }
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_DBCHECK);
+                               if (ctrl != NULL) {
+                                       continue;
+                               }
                        }
+
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
@@ -3184,22 +3318,22 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
-                                                      schema, msg, el, old_el,
-                                                      schema_attr, seq_num, t,
+                                                      ac, msg, el, old_el,
+                                                      schema_attr, t,
                                                       old_msg->dn,
                                                       parent);
                        break;
                case LDB_FLAG_MOD_DELETE:
                        ret = replmd_modify_la_delete(module, replmd_private,
-                                                     schema, msg, el, old_el,
-                                                     schema_attr, seq_num, t,
+                                                     ac, msg, el, old_el,
+                                                     schema_attr, t,
                                                      old_msg->dn,
                                                      parent);
                        break;
                case LDB_FLAG_MOD_ADD:
                        ret = replmd_modify_la_add(module, replmd_private,
-                                                  schema, msg, el, old_el,
-                                                  schema_attr, seq_num, t,
+                                                  ac, msg, el, old_el,
+                                                  schema_attr, t,
                                                   old_msg->dn,
                                                   parent);
                        break;
@@ -3304,6 +3438,8 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
        struct ldb_control *fix_links_control = NULL;
+       struct ldb_control *fix_dn_name_control = NULL;
+       struct ldb_control *fix_dn_sid_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -3362,6 +3498,69 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                return ldb_next_request(module, req);
        }
 
+       fix_dn_name_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_LINK_DN_NAME);
+       if (fix_dn_name_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 2) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_DELETE) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].flags != LDB_FLAG_MOD_ADD) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (ldb_attr_cmp(req->op.mod.message->elements[0].name,
+                                req->op.mod.message->elements[1].name) != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->dn_format == DSDB_INVALID_DN) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               /*
+                * If we are run from dbcheck and we are not updating
+                * a link (as these would need to be sorted and so
+                * can't go via such a simple update, then do not
+                * trigger replicated updates and a new USN from this
+                * change, it wasn't a real change, just a new
+                * (correct) string DN
+                */
+
+               fix_dn_name_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
        guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
@@ -3386,6 +3585,44 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       fix_dn_sid_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_LINK_DN_SID);
+       if (fix_dn_sid_control != NULL) {
+               const struct dsdb_attribute *sa = NULL;
+
+               if (msg->num_elements != 1) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (msg->elements[0].flags != LDB_FLAG_MOD_ADD) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (msg->elements[0].num_values != 1) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(ac->schema,
+                               msg->elements[0].name);
+               if (sa == NULL) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->dn_format != DSDB_NORMAL_DN) {
+                       talloc_free(ac);
+                       return ldb_module_operr(module);
+               }
+
+               fix_dn_sid_control->critical = false;
+               ac->fix_link_sid = true;
+
+               goto handle_linked_attribs;
+       }
+
        ldb_msg_remove_attr(msg, "whenChanged");
        ldb_msg_remove_attr(msg, "uSNChanged");
 
@@ -3406,8 +3643,9 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                return ret;
        }
 
+ handle_linked_attribs:
        ret = replmd_modify_handle_linked_attribs(module, replmd_private,
-                                                 msg, ac->seq_num, t, req);
+                                                 ac, msg, t, req);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
@@ -3792,7 +4030,8 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
                                            msg->dn, attrs,
                                            DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_EXTENDED_DN,
+                                           DSDB_SEARCH_SHOW_EXTENDED_DN |
+                                           DSDB_SEARCH_SHOW_RECYCLED,
                                            parent);
 
                if (ret != LDB_SUCCESS) {
@@ -3946,7 +4185,12 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                "*",
                NULL
        };
-       unsigned int i, el_count = 0;
+       static const struct ldb_val true_val = {
+               .data = discard_const_p(uint8_t, "TRUE"),
+               .length = 4
+       };
+       
+       unsigned int i;
        uint32_t dsdb_flags = 0;
        struct replmd_private *replmd_private;
        enum deletion_state deletion_state, next_deletion_state;
@@ -4097,6 +4341,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        guid = samdb_result_guid(old_msg, "objectGUID");
 
        if (deletion_state == OBJECT_NOT_DELETED) {
+               struct ldb_message_element *is_deleted_el;
 
                ret = replmd_make_deleted_child_dn(tmp_ctx,
                                                   ldb,
@@ -4109,14 +4354,15 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                        return ret;
                }
 
-               ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
+               ret = ldb_msg_add_value(msg, "isDeleted", &true_val,
+                                       &is_deleted_el);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, __location__
                                               ": Failed to add isDeleted string to the msg");
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               is_deleted_el->flags = LDB_FLAG_MOD_REPLACE;
        } else {
                /*
                 * No matter what has happened with other renames etc, try again to
@@ -4156,6 +4402,10 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
             - preserved if in above list, or is rDN
          - remove all linked attribs from this object
          - remove all links from other objects to this object
+           (note we use the backlinks to do this, so we won't find one-way
+            links that still point to this object, or deactivated two-way
+            links, i.e. 'member' after the user has been removed from the
+            group)
          - add lastKnownParent
          - update replPropertyMetaData?
 
@@ -4165,6 +4415,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        if (deletion_state == OBJECT_NOT_DELETED) {
                struct ldb_dn *parent_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
                char *parent_dn_str = NULL;
+               struct ldb_message_element *p_el;
 
                /* we need the storage form of the parent GUID */
                ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
@@ -4208,7 +4459,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               p_el = ldb_msg_find_element(msg,
+                                           "lastKnownParent");
+               if (p_el == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_operr(module);
+               }
+               p_el->flags = LDB_FLAG_MOD_REPLACE;
 
                if (next_deletion_state == OBJECT_DELETED) {
                        ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
@@ -4220,7 +4477,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
+                       p_el = ldb_msg_find_element(msg,
+                                                   "msDS-LastKnownRDN");
+                       if (p_el == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_module_operr(module);
+                       }
+                       p_el->flags = LDB_FLAG_MOD_ADD;
                }
        }
 
@@ -4249,14 +4512,17 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                 * not activated and what ever the forest level is.
                 */
                if (dsdb_attribute_by_lDAPDisplayName(schema, "isRecycled") != NULL) {
-                       ret = ldb_msg_add_string(msg, "isRecycled", "TRUE");
+                       struct ldb_message_element *is_recycled_el;
+
+                       ret = ldb_msg_add_value(msg, "isRecycled", &true_val,
+                                               &is_recycled_el);
                        if (ret != LDB_SUCCESS) {
                                DEBUG(0,(__location__ ": Failed to add isRecycled string to the msg\n"));
                                ldb_module_oom(module);
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+                       is_recycled_el->flags = LDB_FLAG_MOD_REPLACE;
                }
 
                replmd_private = talloc_get_type(ldb_module_get_private(module),
@@ -4274,20 +4540,30 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                /* don't remove the rDN */
                                continue;
                        }
+
                        if (sa->linkID & 1) {
                                /*
-                                 we have a backlink in this object
-                                 that needs to be removed. We're not
-                                 allowed to remove it directly
-                                 however, so we instead setup a
-                                 modify to delete the corresponding
-                                 forward link
+                                * we have a backlink in this object
+                                * that needs to be removed. We're not
+                                * allowed to remove it directly
+                                * however, so we instead setup a
+                                * modify to delete the corresponding
+                                * forward link
                                 */
                                ret = replmd_delete_remove_link(module, schema,
                                                                replmd_private,
                                                                old_dn, &guid,
                                                                el, sa, req);
-                               if (ret != LDB_SUCCESS) {
+                               if (ret == LDB_SUCCESS) {
+                                       /*
+                                        * now we continue, which means we
+                                        * won't remove this backlink
+                                        * directly
+                                        */
+                                       continue;
+                               }
+
+                               if (ret != LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
@@ -4300,11 +4576,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
-                               /* now we continue, which means we
-                                  won't remove this backlink
-                                  directly
-                               */
-                               continue;
+
+                               /*
+                                * Otherwise vanish the link, we are
+                                * out of sync and the controlling
+                                * object does not have the source
+                                * link any more
+                                */
+
+                               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
                        } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
@@ -4553,6 +4834,7 @@ static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
 {
        struct ldb_val deleted_child_rdn_val;
        struct GUID_txt_buf guid_str;
+       int ret;
        bool retb;
 
        GUID_buf_string(&guid, &guid_str);
@@ -4619,10 +4901,13 @@ static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
               sizeof(guid_str.buf));
 
        /* Now set the value into the RDN, without parsing it */
-       ldb_dn_set_component(dn, 0, rdn_name,
-                            deleted_child_rdn_val);
+       ret = ldb_dn_set_component(
+               dn,
+               0,
+               rdn_name,
+               deleted_child_rdn_val);
 
-       return LDB_SUCCESS;
+       return ret;
 }
 
 
@@ -4686,7 +4971,7 @@ static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
   their current values. This has the effect of changing these
   attributes to have been last updated by the current DC. This is
   needed to ensure that renames performed as part of conflict
-  resolution are propogated to other DCs
+  resolution are propagated to other DCs
  */
 static int replmd_name_modify(struct replmd_replicated_request *ar,
                              struct ldb_request *req, struct ldb_dn *dn)
@@ -4891,6 +5176,7 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                       "Conflict adding object '%s' from incoming replication as we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -5067,6 +5353,9 @@ failed:
         * replication will stop with an error, but there is not much
         * else we can do.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
        return ldb_module_done(ar->req, NULL, NULL,
                               ret);
 }
@@ -5529,6 +5818,7 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                                       "Conflict adding object '%s' from incoming replication but we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
@@ -5676,8 +5966,10 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                        goto failed;
        }
-failed:
 
+       talloc_free(tmp_ctx);
+       return ret;
+failed:
        /*
         * On failure make the caller get the error
         * This means replication will stop with an error,
@@ -5685,6 +5977,9 @@ failed:
         * LDB_ERR_ENTRY_ALREADY_EXISTS case this is exactly what is
         * needed.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
 
        talloc_free(tmp_ctx);
        return ret;
@@ -6257,6 +6552,21 @@ static int replmd_replicated_apply_search_callback(struct ldb_request *req,
        return LDB_SUCCESS;
 }
 
+/**
+ * Returns true if we can group together processing this link attribute,
+ * i.e. it has the same source-object and attribute ID as other links
+ * already in the group
+ */
+static bool la_entry_matches_group(struct la_entry *la_entry,
+                                  struct la_group *la_group)
+{
+       struct la_entry *prev = la_group->la_entries;
+
+       return (la_entry->la->attid == prev->la->attid &&
+               GUID_equal(&la_entry->la->identifier->guid,
+                          &prev->la->identifier->guid));
+}
+
 /**
  * Stores the linked attributes received in the replication chunk - these get
  * applied at the end of the transaction. We also check that each linked
@@ -6269,7 +6579,11 @@ static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
        struct ldb_module *module = ar->module;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct la_group *la_group = NULL;
        struct ldb_context *ldb;
+       TALLOC_CTX *tmp_ctx = NULL;
+       struct ldb_message *src_msg = NULL;
+       const struct dsdb_attribute *attr = NULL;
 
        ldb = ldb_module_get_ctx(module);
 
@@ -6279,6 +6593,8 @@ static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
        for (i = 0; i < ar->objs->linked_attributes_count; i++) {
                struct la_entry *la_entry;
 
+               tmp_ctx = talloc_new(ar);
+
                if (replmd_private->la_ctx == NULL) {
                        replmd_private->la_ctx = talloc_new(replmd_private);
                }
@@ -6301,13 +6617,45 @@ static int replmd_store_linked_attributes(struct replmd_replicated_request *ar)
                talloc_steal(la_entry->la, la_entry->la->identifier);
                talloc_steal(la_entry->la, la_entry->la->value.blob);
 
-               ret = replmd_verify_linked_attribute(ar, la_entry);
+               /* verify the source object exists for the link */
+               ret = replmd_get_la_entry_source(module, la_entry, tmp_ctx,
+                                                &attr, &src_msg);
+
+               /*
+                * When we fail to find the source object, the error
+                * code we pass back here is really important. It flags
+                * back to the callers to retry this request with
+                * DRSUAPI_DRS_GET_ANC. This case should never happen
+                * if we're replicating from a Samba DC, but it is
+                * needed to talk to a Windows DC
+                */
+               if (ret == LDB_ERR_NO_SUCH_OBJECT) {
+                       WERROR err = WERR_DS_DRA_MISSING_PARENT;
+                       ret = replmd_replicated_request_werror(ar, err);
+                       break;
+               }
 
+               ret = replmd_verify_link_target(ar, tmp_ctx, la_entry,
+                                               src_msg, attr);
                if (ret != LDB_SUCCESS) {
                        break;
                }
 
-               DLIST_ADD(replmd_private->la_list, la_entry);
+               /* group the links together by source-object for efficiency */
+               if (la_group == NULL ||
+                   !la_entry_matches_group(la_entry, la_group)) {
+
+                       la_group = talloc_zero(replmd_private->la_ctx,
+                                              struct la_group);
+                       if (la_group == NULL) {
+                               ldb_oom(ldb);
+                               return LDB_ERR_OPERATIONS_ERROR;
+                       }
+                       DLIST_ADD(replmd_private->la_list, la_group);
+               }
+               DLIST_ADD(la_group->la_entries, la_entry);
+               replmd_private->total_links++;
+               TALLOC_FREE(tmp_ctx);
        }
 
        return ret;
@@ -7102,26 +7450,23 @@ static int replmd_check_target_exists(struct ldb_module *module,
 }
 
 /**
- * Extracts the key details about the source/target object for a
+ * Extracts the key details about the source object for a
  * linked-attribute entry.
  * This returns the following details:
  * @param ret_attr the schema details for the linked attribute
  * @param source_msg the search result for the source object
- * @param target_dsdb_dn the unpacked DN info for the target object
  */
-static int replmd_extract_la_entry_details(struct ldb_module *module,
-                                          struct la_entry *la_entry,
-                                          TALLOC_CTX *mem_ctx,
-                                          const struct dsdb_attribute **ret_attr,
-                                          struct ldb_message **source_msg,
-                                          struct dsdb_dn **target_dsdb_dn)
+static int replmd_get_la_entry_source(struct ldb_module *module,
+                                     struct la_entry *la_entry,
+                                     TALLOC_CTX *mem_ctx,
+                                     const struct dsdb_attribute **ret_attr,
+                                     struct ldb_message **source_msg)
 {
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
        int ret;
        const struct dsdb_attribute *attr;
-       WERROR status;
        struct ldb_result *res;
        const char *attrs[4];
 
@@ -7171,6 +7516,19 @@ linked_attributes[0]:
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       /*
+        * All attributes listed here must be dealt with in some way
+        * by replmd_process_linked_attribute() otherwise in the case
+        * of isDeleted: FALSE the modify will fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
        attrs[0] = attr->lDAPDisplayName;
        attrs[1] = "isDeleted";
        attrs[2] = "isRecycled";
@@ -7199,54 +7557,39 @@ linked_attributes[0]:
        }
 
        *source_msg = res->msgs[0];
-
-       /* the value blob for the attribute holds the target object DN */
-       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx, la->value.blob, target_dsdb_dn);
-       if (!W_ERROR_IS_OK(status)) {
-               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
-                                      attr->lDAPDisplayName,
-                                      ldb_dn_get_linearized(res->msgs[0]->dn),
-                                      win_errstr(status));
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        *ret_attr = attr;
 
        return LDB_SUCCESS;
 }
 
 /**
- * Verifies the source and target objects are known for a linked attribute
+ * Verifies the target object is known for a linked attribute
  */
-static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
-                                         struct la_entry *la)
+static int replmd_verify_link_target(struct replmd_replicated_request *ar,
+                                    TALLOC_CTX *mem_ctx,
+                                    struct la_entry *la_entry,
+                                    struct ldb_message *src_msg,
+                                    const struct dsdb_attribute *attr)
 {
        int ret = LDB_SUCCESS;
-       TALLOC_CTX *tmp_ctx = talloc_new(la);
        struct ldb_module *module = ar->module;
-       struct ldb_message *src_msg;
-       const struct dsdb_attribute *attr;
-       struct dsdb_dn *tgt_dsdb_dn;
+       struct dsdb_dn *tgt_dsdb_dn = NULL;
        struct GUID guid = GUID_zero();
        bool dummy;
+       WERROR status;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
 
-       ret = replmd_extract_la_entry_details(module, la, tmp_ctx, &attr,
-                                             &src_msg, &tgt_dsdb_dn);
-
-       /*
-        * When we fail to find the source object, the error code we pass
-        * back here is really important. It flags back to the callers to
-        * retry this request with DRSUAPI_DRS_GET_ANC. This case should
-        * never happen if we're replicating from a Samba DC, but it is
-        * needed to talk to a Windows DC
-        */
-       if (ret == LDB_ERR_NO_SUCH_OBJECT) {
-               ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_MISSING_PARENT);
-       }
-
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
-               return ret;
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx,
+                                     la->value.blob, &tgt_dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(src_msg->dn),
+                                      win_errstr(status));
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /*
@@ -7254,10 +7597,10 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
         * objects, or we know the target is up-to-date. If either case, we
         * still continue even if the target doesn't exist
         */
-       if ((la->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
-                                   DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
+       if ((la_entry->dsdb_repl_flags & (DSDB_REPL_FLAG_OBJECT_SUBSET |
+                                         DSDB_REPL_FLAG_TARGETS_UPTODATE)) == 0) {
 
-               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la,
+               ret = replmd_check_target_exists(module, tgt_dsdb_dn, la_entry,
                                                 src_msg->dn, false, &guid,
                                                 &dummy);
        }
@@ -7271,7 +7614,6 @@ static int replmd_verify_linked_attribute(struct replmd_replicated_request *ar,
                ret = replmd_replicated_request_werror(ar, WERR_DS_DRA_RECYCLED_TARGET);
        }
 
-       talloc_free(tmp_ctx);
        return ret;
 }
 
@@ -7390,6 +7732,14 @@ static int replmd_delete_link_value(struct ldb_module *module,
        /* if the existing link is active, remove its backlink */
        if (is_active) {
 
+               /*
+                * NOTE WELL: After this we will never (at runtime) be
+                * able to find this forward link (for instant
+                * removal) if/when the link target is deleted.
+                *
+                * We have dbcheck rules to cover this and cope otherwise
+                * by filtering at runtime (i.e. in the extended_dn module).
+                */
                ret = replmd_add_backlink(module, replmd_private, schema,
                                          src_obj_dn, target_guid, false,
                                          attr, NULL);
@@ -7509,18 +7859,18 @@ static int replmd_check_singleval_la_conflict(struct ldb_module *module,
   process one linked attribute structure
  */
 static int replmd_process_linked_attribute(struct ldb_module *module,
+                                          TALLOC_CTX *mem_ctx,
                                           struct replmd_private *replmd_private,
+                                          struct ldb_message *msg,
+                                          const struct dsdb_attribute *attr,
                                           struct la_entry *la_entry,
                                           struct ldb_request *parent)
 {
        struct drsuapi_DsReplicaLinkedAttribute *la = la_entry->la;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
-       struct ldb_message *msg;
-       TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
-       const struct dsdb_schema *schema = dsdb_get_schema(ldb, tmp_ctx);
+       const struct dsdb_schema *schema = dsdb_get_schema(ldb, mem_ctx);
        int ret;
-       const struct dsdb_attribute *attr;
-       struct dsdb_dn *dsdb_dn;
+       struct dsdb_dn *dsdb_dn = NULL;
        uint64_t seq_num = 0;
        struct ldb_message_element *old_el;
        time_t t = time(NULL);
@@ -7528,35 +7878,20 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        struct GUID guid = GUID_zero();
        bool active = (la->flags & DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE)?true:false;
        bool ignore_link;
-       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
        struct dsdb_dn *old_dsdb_dn = NULL;
        struct ldb_val *val_to_update = NULL;
        bool add_as_inactive = false;
+       WERROR status;
 
-       /*
-        * get the attribute being modified, the search result for the source object,
-        * and the target object's DN details
-        */
-       ret = replmd_extract_la_entry_details(module, la_entry, tmp_ctx, &attr,
-                                             &msg, &dsdb_dn);
-
-       if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       /*
-        * Check for deleted objects per MS-DRSR 4.1.10.6.14
-        * ProcessLinkValue, because link updates are not applied to
-        * recycled and tombstone objects.  We don't have to delete
-        * any existing link, that should have happened when the
-        * object deletion was replicated or initiated.
-        */
-       replmd_deletion_state(module, msg, &deletion_state, NULL);
-
-       if (deletion_state >= OBJECT_RECYCLED) {
-               talloc_free(tmp_ctx);
-               return LDB_SUCCESS;
+       /* the value blob for the attribute holds the target object DN */
+       status = dsdb_dn_la_from_blob(ldb, attr, schema, mem_ctx,
+                                     la->value.blob, &dsdb_dn);
+       if (!W_ERROR_IS_OK(status)) {
+               ldb_asprintf_errstring(ldb, "Failed to parsed linked attribute blob for %s on %s - %s\n",
+                                      attr->lDAPDisplayName,
+                                      ldb_dn_get_linearized(msg->dn),
+                                      win_errstr(status));
+               return LDB_ERR_OPERATIONS_ERROR;
        }
 
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
@@ -7564,7 +7899,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
                if (ret != LDB_SUCCESS) {
                        ldb_module_oom(module);
-                       talloc_free(tmp_ctx);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
        } else {
@@ -7572,11 +7906,10 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        }
 
        /* parse the existing links */
-       ret = get_parsed_dns_trusted(module, replmd_private, tmp_ctx, old_el, &pdn_list,
+       ret = get_parsed_dns_trusted(module, replmd_private, mem_ctx, old_el, &pdn_list,
                                     attr->syntax->ldap_oid, parent);
 
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7584,7 +7917,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                         true, &guid, &ignore_link);
 
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7593,7 +7925,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
         * OK to ignore the linked attribute
         */
        if (ignore_link) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7606,22 +7937,19 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                             attr->syntax->ldap_oid,
                             true);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
        if (!replmd_link_update_is_newer(pdn, la)) {
                DEBUG(3,("Discarding older DRS linked attribute update to %s on %s from %s\n",
                         old_el->name, ldb_dn_get_linearized(msg->dn),
-                        GUID_string(tmp_ctx, &la->meta_data.originating_invocation_id)));
-               talloc_free(tmp_ctx);
+                        GUID_string(mem_ctx, &la->meta_data.originating_invocation_id)));
                return LDB_SUCCESS;
        }
 
        /* get a seq_num for this change */
        ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7631,13 +7959,12 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
         */
        if (active) {
                ret = replmd_check_singleval_la_conflict(module, replmd_private,
-                                                        tmp_ctx, msg->dn, la,
+                                                        mem_ctx, msg->dn, la,
                                                         dsdb_dn, pdn, pdn_list,
                                                         old_el, schema, attr,
                                                         seq_num,
                                                         &add_as_inactive);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
        }
@@ -7653,7 +7980,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                                  &pdn->guid, false, attr,
                                                  parent);
                        if (ret != LDB_SUCCESS) {
-                               talloc_free(tmp_ctx);
                                return ret;
                        }
                }
@@ -7672,7 +7998,7 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                        offset = old_el->num_values;
                } else {
                        if (next->dsdb_dn == NULL) {
-                               ret = really_parse_trusted_dn(tmp_ctx, ldb, next,
+                               ret = really_parse_trusted_dn(mem_ctx, ldb, next,
                                                              attr->syntax->ldap_oid);
                                if (ret != LDB_SUCCESS) {
                                        return ret;
@@ -7680,7 +8006,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                        }
                        offset = next - pdn_list;
                        if (offset > old_el->num_values) {
-                               talloc_free(tmp_ctx);
                                return LDB_ERR_OPERATIONS_ERROR;
                        }
                }
@@ -7704,14 +8029,13 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        }
 
        /* set the link attribute's value to the info that was received */
-       ret = replmd_set_la_val(tmp_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
+       ret = replmd_set_la_val(mem_ctx, val_to_update, dsdb_dn, old_dsdb_dn,
                                &la->meta_data.originating_invocation_id,
                                la->meta_data.originating_usn, seq_num,
                                la->meta_data.originating_change_time,
                                la->meta_data.version,
                                !active);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
@@ -7724,7 +8048,6 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                               val_to_update);
 
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
 
@@ -7737,55 +8060,38 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                                          &guid, true, attr,
                                          parent);
                if (ret != LDB_SUCCESS) {
-                       talloc_free(tmp_ctx);
                        return ret;
                }
        }
 
        /* we only change whenChanged and uSNChanged if the seq_num
           has changed */
+       ldb_msg_remove_attr(msg, "whenChanged");
+       ldb_msg_remove_attr(msg, "uSNChanged");
        ret = add_time_element(msg, "whenChanged", t);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                ldb_operr(ldb);
                return ret;
        }
 
        ret = add_uint64_element(ldb, msg, "uSNChanged", seq_num);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                ldb_operr(ldb);
                return ret;
        }
 
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
        if (old_el == NULL) {
-               talloc_free(tmp_ctx);
                return ldb_operr(ldb);
        }
 
        ret = dsdb_check_single_valued_link(attr, old_el);
        if (ret != LDB_SUCCESS) {
-               talloc_free(tmp_ctx);
                return ret;
        }
 
        old_el->flags |= LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK;
 
-       ret = linked_attr_modify(module, msg, parent);
-       if (ret != LDB_SUCCESS) {
-               ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s'\n%s\n",
-                         ldb_errstring(ldb),
-                         ldb_ldif_message_redacted_string(ldb,
-                                                          tmp_ctx,
-                                                          LDB_CHANGETYPE_MODIFY,
-                                                          msg));
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       talloc_free(tmp_ctx);
-
        return ret;
 }
 
@@ -7826,6 +8132,106 @@ static int replmd_start_transaction(struct ldb_module *module)
        return ldb_next_start_trans(module);
 }
 
+/**
+ * Processes a group of linked attributes that apply to the same source-object
+ * and attribute-ID
+ */
+static int replmd_process_la_group(struct ldb_module *module,
+                                  struct replmd_private *replmd_private,
+                                  struct la_group *la_group)
+{
+       struct la_entry *la = NULL;
+       struct la_entry *prev = NULL;
+       int ret;
+       TALLOC_CTX *tmp_ctx = talloc_new(la_group);
+       struct la_entry *first_la = DLIST_TAIL(la_group->la_entries);
+       struct ldb_message *msg = NULL;
+       enum deletion_state deletion_state = OBJECT_NOT_DELETED;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+       const struct dsdb_attribute *attr = NULL;
+
+       /*
+        * get the attribute being modified and the search result for the
+        * source object
+        */
+       ret = replmd_get_la_entry_source(module, first_la, tmp_ctx, &attr,
+                                        &msg);
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /*
+        * Check for deleted objects per MS-DRSR 4.1.10.6.14
+        * ProcessLinkValue, because link updates are not applied to
+        * recycled and tombstone objects.  We don't have to delete
+        * any existing link, that should have happened when the
+        * object deletion was replicated or initiated.
+        *
+        * This needs isDeleted and isRecycled to be included as
+        * attributes in the search and so in msg if set.
+        */
+       replmd_deletion_state(module, msg, &deletion_state, NULL);
+
+       if (deletion_state >= OBJECT_RECYCLED) {
+               TALLOC_FREE(tmp_ctx);
+               return LDB_SUCCESS;
+       }
+
+       /*
+        * Now that we know the deletion_state, remove the extra
+        * attributes added for that purpose.  We need to do this
+        * otherwise in the case of isDeleted: FALSE the modify will
+        * fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
+       ldb_msg_remove_attr(msg, "isDeleted");
+       ldb_msg_remove_attr(msg, "isRecycled");
+
+       /* go through and process the link targets for this source object */
+       for (la = DLIST_TAIL(la_group->la_entries); la; la=prev) {
+               prev = DLIST_PREV(la);
+               DLIST_REMOVE(la_group->la_entries, la);
+               ret = replmd_process_linked_attribute(module, tmp_ctx,
+                                                     replmd_private,
+                                                     msg, attr, la, NULL);
+               if (ret != LDB_SUCCESS) {
+                       replmd_txn_cleanup(replmd_private);
+                       return ret;
+               }
+
+               if ((++replmd_private->num_processed % 8192) == 0) {
+                       DBG_NOTICE("Processed %u/%u linked attributes\n",
+                                  replmd_private->num_processed,
+                                  replmd_private->total_links);
+               }
+       }
+
+       /* apply the link changes to the source object */
+       ret = linked_attr_modify(module, msg, NULL);
+       if (ret != LDB_SUCCESS) {
+               ldb_debug(ldb, LDB_DEBUG_WARNING,
+                         "Failed to apply linked attribute change '%s'\n%s\n",
+                         ldb_errstring(ldb),
+                         ldb_ldif_message_redacted_string(ldb,
+                                                          tmp_ctx,
+                                                          LDB_CHANGETYPE_MODIFY,
+                                                          msg));
+               TALLOC_FREE(tmp_ctx);
+               return ret;
+       }
+
+       TALLOC_FREE(tmp_ctx);
+       return LDB_SUCCESS;
+}
+
 /*
   on prepare commit we loop over our queued la_context structures and
   apply each of them
@@ -7834,21 +8240,31 @@ static int replmd_prepare_commit(struct ldb_module *module)
 {
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-       struct la_entry *la, *prev;
+       struct la_group *la_group, *prev;
        int ret;
 
+       if (replmd_private->la_list != NULL) {
+               DBG_NOTICE("Processing linked attributes\n");
+       }
+
        /*
         * Walk the list of linked attributes from DRS replication.
         *
         * We walk backwards, to do the first entry first, as we
         * added the entries with DLIST_ADD() which puts them at the
         * start of the list
+        *
+        * Links are grouped together so we process links for the same
+        * source object in one go.
         */
-       for (la = DLIST_TAIL(replmd_private->la_list); la; la=prev) {
-               prev = DLIST_PREV(la);
-               DLIST_REMOVE(replmd_private->la_list, la);
-               ret = replmd_process_linked_attribute(module, replmd_private,
-                                                     la, NULL);
+       for (la_group = DLIST_TAIL(replmd_private->la_list);
+            la_group != NULL;
+            la_group = prev) {
+
+               prev = DLIST_PREV(la_group);
+               DLIST_REMOVE(replmd_private->la_list, la_group);
+               ret = replmd_process_la_group(module, replmd_private,
+                                             la_group);
                if (ret != LDB_SUCCESS) {
                        replmd_txn_cleanup(replmd_private);
                        return ret;