s4:repl_meta_data: pass down struct replmd_replicated_request to replmd_modify_la_add()
[samba.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index bdc6aee944ebb6a8e0945081abed998532edf1bc..d5d9c4f8eeb171cafc5ca4a37bbc2f80b5bc9cb3 100644 (file)
@@ -2039,8 +2039,12 @@ static int get_parsed_dns(struct ldb_module *module, TALLOC_CTX *mem_ctx,
                        /* we got a DN without a GUID - go find the GUID */
                        int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
                        /* we got a DN without a GUID - go find the GUID */
                        int ret = dsdb_module_guid_by_dn(module, dn, &p->guid, parent);
                        if (ret != LDB_SUCCESS) {
-                               ldb_asprintf_errstring(ldb, "Unable to find GUID for DN %s\n",
-                                                      ldb_dn_get_linearized(dn));
+                               char *dn_str = NULL;
+                               dn_str = ldb_dn_get_extended_linearized(mem_ctx,
+                                                                       (dn), 1);
+                               ldb_asprintf_errstring(ldb,
+                                               "Unable to find GUID for DN %s\n",
+                                               dn_str);
                                if (ret == LDB_ERR_NO_SUCH_OBJECT &&
                                    LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
                                    ldb_attr_cmp(el->name, "member") == 0) {
                                if (ret == LDB_ERR_NO_SUCH_OBJECT &&
                                    LDB_FLAG_MOD_TYPE(el->flags) == LDB_FLAG_MOD_DELETE &&
                                    ldb_attr_cmp(el->name, "member") == 0) {
@@ -2398,12 +2402,11 @@ static int replmd_update_la_val(TALLOC_CTX *mem_ctx, struct ldb_val *v, struct d
  */
 static int replmd_modify_la_add(struct ldb_module *module,
                                struct replmd_private *replmd_private,
  */
 static int replmd_modify_la_add(struct ldb_module *module,
                                struct replmd_private *replmd_private,
-                               const struct dsdb_schema *schema,
+                               struct replmd_replicated_request *ac,
                                struct ldb_message *msg,
                                struct ldb_message_element *el,
                                struct ldb_message_element *old_el,
                                const struct dsdb_attribute *schema_attr,
                                struct ldb_message *msg,
                                struct ldb_message_element *el,
                                struct ldb_message_element *old_el,
                                const struct dsdb_attribute *schema_attr,
-                               uint64_t seq_num,
                                time_t t,
                                struct ldb_dn *msg_dn,
                                struct ldb_request *parent)
                                time_t t,
                                struct ldb_dn *msg_dn,
                                struct ldb_request *parent)
@@ -2416,17 +2419,10 @@ static int replmd_modify_la_add(struct ldb_module *module,
        unsigned old_num_values = old_el ? old_el->num_values : 0;
        unsigned num_values = 0;
        unsigned max_num_values;
        unsigned old_num_values = old_el ? old_el->num_values : 0;
        unsigned num_values = 0;
        unsigned max_num_values;
-       const struct GUID *invocation_id;
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        NTTIME now;
        unix_to_nt_time(&now, t);
 
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        NTTIME now;
        unix_to_nt_time(&now, t);
 
-       invocation_id = samdb_ntds_invocation_id(ldb);
-       if (!invocation_id) {
-               talloc_free(tmp_ctx);
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
        /* get the DNs to be added, fully parsed.
         *
         * We need full parsing because they came off the wire and we don't
        /* get the DNs to be added, fully parsed.
         *
         * We need full parsing because they came off the wire and we don't
@@ -2522,15 +2518,16 @@ static int replmd_modify_la_add(struct ldb_module *module,
                        ret = replmd_update_la_val(new_values, exact->v,
                                                   dns[i].dsdb_dn,
                                                   exact->dsdb_dn,
                        ret = replmd_update_la_val(new_values, exact->v,
                                                   dns[i].dsdb_dn,
                                                   exact->dsdb_dn,
-                                                  invocation_id, seq_num,
-                                                  seq_num, now, false);
+                                                  &ac->our_invocation_id,
+                                                  ac->seq_num, ac->seq_num,
+                                                  now, false);
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
 
                        ret = replmd_add_backlink(module, replmd_private,
                        if (ret != LDB_SUCCESS) {
                                talloc_free(tmp_ctx);
                                return ret;
                        }
 
                        ret = replmd_add_backlink(module, replmd_private,
-                                                 schema,
+                                                 ac->schema,
                                                  msg_dn,
                                                  &dns[i].guid, 
                                                  true,
                                                  msg_dn,
                                                  &dns[i].guid, 
                                                  true,
@@ -2572,14 +2569,14 @@ static int replmd_modify_la_add(struct ldb_module *module,
                }
 
                ret = replmd_add_backlink(module, replmd_private,
                }
 
                ret = replmd_add_backlink(module, replmd_private,
-                                         schema, msg_dn,
+                                         ac->schema, msg_dn,
                                          &dns[i].guid,
                                          true, schema_attr,
                                          parent);
                /* Make the new linked attribute ldb_val. */
                ret = replmd_build_la_val(new_values, &new_values[num_values],
                                          &dns[i].guid,
                                          true, schema_attr,
                                          parent);
                /* Make the new linked attribute ldb_val. */
                ret = replmd_build_la_val(new_values, &new_values[num_values],
-                                         dns[i].dsdb_dn, invocation_id,
-                                         seq_num, now);
+                                         dns[i].dsdb_dn, &ac->our_invocation_id,
+                                         ac->seq_num, now);
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
                if (ret != LDB_SUCCESS) {
                        talloc_free(tmp_ctx);
                        return ret;
@@ -2844,11 +2841,23 @@ static int replmd_modify_la_delete(struct ldb_module *module,
 
        if (vanish_links) {
                unsigned j = 0;
 
        if (vanish_links) {
                unsigned j = 0;
+               struct ldb_val *tmp_vals = NULL;
+
+               tmp_vals = talloc_array(tmp_ctx, struct ldb_val,
+                                       old_el->num_values);
+               if (tmp_vals == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_oom(module);
+               }
                for (i = 0; i < old_el->num_values; i++) {
                for (i = 0; i < old_el->num_values; i++) {
-                       if (old_dns[i].v != NULL) {
-                               old_el->values[j] = *old_dns[i].v;
-                               j++;
+                       if (old_dns[i].v == NULL) {
+                               continue;
                        }
                        }
+                       tmp_vals[j] = *old_dns[i].v;
+                       j++;
+               }
+               for (i = 0; i < j; i++) {
+                       old_el->values[i] = tmp_vals[i];
                }
                old_el->num_values = j;
        }
                }
                old_el->num_values = j;
        }
@@ -3104,8 +3113,9 @@ static int replmd_modify_la_replace(struct ldb_module *module,
  */
 static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                                               struct replmd_private *replmd_private,
  */
 static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                                               struct replmd_private *replmd_private,
+                                              struct replmd_replicated_request *ac,
                                               struct ldb_message *msg,
                                               struct ldb_message *msg,
-                                              uint64_t seq_num, time_t t,
+                                              time_t t,
                                               struct ldb_request *parent)
 {
        struct ldb_result *res;
                                               struct ldb_request *parent)
 {
        struct ldb_result *res;
@@ -3114,8 +3124,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_message *old_msg;
 
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        struct ldb_message *old_msg;
 
-       const struct dsdb_schema *schema;
-
        if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
                /*
                 * Nothing special is required for modifying or vanishing links
        if (dsdb_functional_level(ldb) == DS_DOMAIN_FUNCTION_2000) {
                /*
                 * Nothing special is required for modifying or vanishing links
@@ -3149,10 +3157,6 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
        if (ret != LDB_SUCCESS) {
                return ret;
        }
        if (ret != LDB_SUCCESS) {
                return ret;
        }
-       schema = dsdb_get_schema(ldb, res);
-       if (!schema) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
 
        old_msg = res->msgs[0];
 
 
        old_msg = res->msgs[0];
 
@@ -3161,7 +3165,7 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                struct ldb_message_element *old_el, *new_el;
                unsigned int mod_type = LDB_FLAG_MOD_TYPE(el->flags);
                const struct dsdb_attribute *schema_attr
                struct ldb_message_element *old_el, *new_el;
                unsigned int mod_type = LDB_FLAG_MOD_TYPE(el->flags);
                const struct dsdb_attribute *schema_attr
-                       = dsdb_attribute_by_lDAPDisplayName(schema, el->name);
+                       = dsdb_attribute_by_lDAPDisplayName(ac->schema, el->name);
                if (!schema_attr) {
                        ldb_asprintf_errstring(ldb,
                                               "%s: attribute %s is not a valid attribute in schema",
                if (!schema_attr) {
                        ldb_asprintf_errstring(ldb,
                                               "%s: attribute %s is not a valid attribute in schema",
@@ -3172,9 +3176,22 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
                        continue;
                }
                if ((schema_attr->linkID & 1) == 1) {
-                       if (parent && ldb_request_get_control(parent, DSDB_CONTROL_DBCHECK)) {
-                               continue;
+                       if (parent) {
+                               struct ldb_control *ctrl;
+
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_REPLMD_VANISH_LINKS);
+                               if (ctrl != NULL) {
+                                       ctrl->critical = false;
+                                       continue;
+                               }
+                               ctrl = ldb_request_get_control(parent,
+                                               DSDB_CONTROL_DBCHECK);
+                               if (ctrl != NULL) {
+                                       continue;
+                               }
                        }
                        }
+
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
                        /* Odd is for the target.  Illegal to modify */
                        ldb_asprintf_errstring(ldb,
                                               "attribute %s must not be modified directly, it is a linked attribute", el->name);
@@ -3184,22 +3201,22 @@ static int replmd_modify_handle_linked_attribs(struct ldb_module *module,
                switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
                switch (mod_type) {
                case LDB_FLAG_MOD_REPLACE:
                        ret = replmd_modify_la_replace(module, replmd_private,
-                                                      schema, msg, el, old_el,
-                                                      schema_attr, seq_num, t,
+                                                      ac->schema, msg, el, old_el,
+                                                      schema_attr, ac->seq_num, t,
                                                       old_msg->dn,
                                                       parent);
                        break;
                case LDB_FLAG_MOD_DELETE:
                        ret = replmd_modify_la_delete(module, replmd_private,
                                                       old_msg->dn,
                                                       parent);
                        break;
                case LDB_FLAG_MOD_DELETE:
                        ret = replmd_modify_la_delete(module, replmd_private,
-                                                     schema, msg, el, old_el,
-                                                     schema_attr, seq_num, t,
+                                                     ac->schema, msg, el, old_el,
+                                                     schema_attr, ac->seq_num, t,
                                                      old_msg->dn,
                                                      parent);
                        break;
                case LDB_FLAG_MOD_ADD:
                        ret = replmd_modify_la_add(module, replmd_private,
                                                      old_msg->dn,
                                                      parent);
                        break;
                case LDB_FLAG_MOD_ADD:
                        ret = replmd_modify_la_add(module, replmd_private,
-                                                  schema, msg, el, old_el,
-                                                  schema_attr, seq_num, t,
+                                                  ac, msg, el, old_el,
+                                                  schema_attr, t,
                                                   old_msg->dn,
                                                   parent);
                        break;
                                                   old_msg->dn,
                                                   parent);
                        break;
@@ -3303,6 +3320,8 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        unsigned int functional_level;
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
        unsigned int functional_level;
        const struct ldb_message_element *guid_el = NULL;
        struct ldb_control *sd_propagation_control;
+       struct ldb_control *fix_links_control = NULL;
+       struct ldb_control *fix_dn_name_control = NULL;
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
        struct replmd_private *replmd_private =
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
 
@@ -3328,6 +3347,102 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
 
        ldb = ldb_module_get_ctx(module);
 
 
        ldb = ldb_module_get_ctx(module);
 
+       fix_links_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_DUPLICATE_LINKS);
+       if (fix_links_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_REPLACE) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID == 0) {
+                       return ldb_module_operr(module);
+               }
+
+               fix_links_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
+       fix_dn_name_control = ldb_request_get_control(req,
+                                       DSDB_CONTROL_DBCHECK_FIX_LINK_DN_NAME);
+       if (fix_dn_name_control != NULL) {
+               struct dsdb_schema *schema = NULL;
+               const struct dsdb_attribute *sa = NULL;
+
+               if (req->op.mod.message->num_elements != 2) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].flags != LDB_FLAG_MOD_DELETE) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].flags != LDB_FLAG_MOD_ADD) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[0].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               if (req->op.mod.message->elements[1].num_values != 1) {
+                       return ldb_module_operr(module);
+               }
+
+               schema = dsdb_get_schema(ldb, req);
+               if (schema == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (ldb_attr_cmp(req->op.mod.message->elements[0].name,
+                                req->op.mod.message->elements[1].name) != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               sa = dsdb_attribute_by_lDAPDisplayName(schema,
+                               req->op.mod.message->elements[0].name);
+               if (sa == NULL) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->dn_format == DSDB_INVALID_DN) {
+                       return ldb_module_operr(module);
+               }
+
+               if (sa->linkID != 0) {
+                       return ldb_module_operr(module);
+               }
+
+               /*
+                * If we are run from dbcheck and we are not updating
+                * a link (as these would need to be sorted and so
+                * can't go via such a simple update, then do not
+                * trigger replicated updates and a new USN from this
+                * change, it wasn't a real change, just a new
+                * (correct) string DN
+                */
+
+               fix_dn_name_control->critical = false;
+               return ldb_next_request(module, req);
+       }
+
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
        guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_modify\n");
 
        guid_el = ldb_msg_find_element(req->op.mod.message, "objectGUID");
@@ -3373,7 +3488,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        }
 
        ret = replmd_modify_handle_linked_attribs(module, replmd_private,
        }
 
        ret = replmd_modify_handle_linked_attribs(module, replmd_private,
-                                                 msg, ac->seq_num, t, req);
+                                                 ac, msg, t, req);
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
        if (ret != LDB_SUCCESS) {
                talloc_free(ac);
                return ret;
@@ -3758,7 +3873,8 @@ static int replmd_delete_remove_link(struct ldb_module *module,
                ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
                                            msg->dn, attrs,
                                            DSDB_FLAG_NEXT_MODULE |
                ret = dsdb_module_search_dn(module, tmp_ctx, &link_res,
                                            msg->dn, attrs,
                                            DSDB_FLAG_NEXT_MODULE |
-                                           DSDB_SEARCH_SHOW_EXTENDED_DN,
+                                           DSDB_SEARCH_SHOW_EXTENDED_DN |
+                                           DSDB_SEARCH_SHOW_RECYCLED,
                                            parent);
 
                if (ret != LDB_SUCCESS) {
                                            parent);
 
                if (ret != LDB_SUCCESS) {
@@ -3912,7 +4028,12 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                "*",
                NULL
        };
                "*",
                NULL
        };
-       unsigned int i, el_count = 0;
+       static const struct ldb_val true_val = {
+               .data = discard_const_p(uint8_t, "TRUE"),
+               .length = 4
+       };
+       
+       unsigned int i;
        uint32_t dsdb_flags = 0;
        struct replmd_private *replmd_private;
        enum deletion_state deletion_state, next_deletion_state;
        uint32_t dsdb_flags = 0;
        struct replmd_private *replmd_private;
        enum deletion_state deletion_state, next_deletion_state;
@@ -4063,6 +4184,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        guid = samdb_result_guid(old_msg, "objectGUID");
 
        if (deletion_state == OBJECT_NOT_DELETED) {
        guid = samdb_result_guid(old_msg, "objectGUID");
 
        if (deletion_state == OBJECT_NOT_DELETED) {
+               struct ldb_message_element *is_deleted_el;
 
                ret = replmd_make_deleted_child_dn(tmp_ctx,
                                                   ldb,
 
                ret = replmd_make_deleted_child_dn(tmp_ctx,
                                                   ldb,
@@ -4075,14 +4197,15 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                        return ret;
                }
 
                        return ret;
                }
 
-               ret = ldb_msg_add_string(msg, "isDeleted", "TRUE");
+               ret = ldb_msg_add_value(msg, "isDeleted", &true_val,
+                                       &is_deleted_el);
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, __location__
                                               ": Failed to add isDeleted string to the msg");
                        talloc_free(tmp_ctx);
                        return ret;
                }
                if (ret != LDB_SUCCESS) {
                        ldb_asprintf_errstring(ldb, __location__
                                               ": Failed to add isDeleted string to the msg");
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               is_deleted_el->flags = LDB_FLAG_MOD_REPLACE;
        } else {
                /*
                 * No matter what has happened with other renames etc, try again to
        } else {
                /*
                 * No matter what has happened with other renames etc, try again to
@@ -4131,6 +4254,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
        if (deletion_state == OBJECT_NOT_DELETED) {
                struct ldb_dn *parent_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
                char *parent_dn_str = NULL;
        if (deletion_state == OBJECT_NOT_DELETED) {
                struct ldb_dn *parent_dn = ldb_dn_get_parent(tmp_ctx, old_dn);
                char *parent_dn_str = NULL;
+               struct ldb_message_element *p_el;
 
                /* we need the storage form of the parent GUID */
                ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
 
                /* we need the storage form of the parent GUID */
                ret = dsdb_module_search_dn(module, tmp_ctx, &parent_res,
@@ -4174,7 +4298,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                        talloc_free(tmp_ctx);
                        return ret;
                }
                        talloc_free(tmp_ctx);
                        return ret;
                }
-               msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+               p_el = ldb_msg_find_element(msg,
+                                           "lastKnownParent");
+               if (p_el == NULL) {
+                       talloc_free(tmp_ctx);
+                       return ldb_module_operr(module);
+               }
+               p_el->flags = LDB_FLAG_MOD_REPLACE;
 
                if (next_deletion_state == OBJECT_DELETED) {
                        ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
 
                if (next_deletion_state == OBJECT_DELETED) {
                        ret = ldb_msg_add_value(msg, "msDS-LastKnownRDN", rdn_value, NULL);
@@ -4186,7 +4316,13 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                talloc_free(tmp_ctx);
                                return ret;
                        }
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_ADD;
+                       p_el = ldb_msg_find_element(msg,
+                                                   "msDS-LastKnownRDN");
+                       if (p_el == NULL) {
+                               talloc_free(tmp_ctx);
+                               return ldb_module_operr(module);
+                       }
+                       p_el->flags = LDB_FLAG_MOD_ADD;
                }
        }
 
                }
        }
 
@@ -4215,14 +4351,17 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                 * not activated and what ever the forest level is.
                 */
                if (dsdb_attribute_by_lDAPDisplayName(schema, "isRecycled") != NULL) {
                 * not activated and what ever the forest level is.
                 */
                if (dsdb_attribute_by_lDAPDisplayName(schema, "isRecycled") != NULL) {
-                       ret = ldb_msg_add_string(msg, "isRecycled", "TRUE");
+                       struct ldb_message_element *is_recycled_el;
+
+                       ret = ldb_msg_add_value(msg, "isRecycled", &true_val,
+                                               &is_recycled_el);
                        if (ret != LDB_SUCCESS) {
                                DEBUG(0,(__location__ ": Failed to add isRecycled string to the msg\n"));
                                ldb_module_oom(module);
                                talloc_free(tmp_ctx);
                                return ret;
                        }
                        if (ret != LDB_SUCCESS) {
                                DEBUG(0,(__location__ ": Failed to add isRecycled string to the msg\n"));
                                ldb_module_oom(module);
                                talloc_free(tmp_ctx);
                                return ret;
                        }
-                       msg->elements[el_count++].flags = LDB_FLAG_MOD_REPLACE;
+                       is_recycled_el->flags = LDB_FLAG_MOD_REPLACE;
                }
 
                replmd_private = talloc_get_type(ldb_module_get_private(module),
                }
 
                replmd_private = talloc_get_type(ldb_module_get_private(module),
@@ -4240,6 +4379,7 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                /* don't remove the rDN */
                                continue;
                        }
                                /* don't remove the rDN */
                                continue;
                        }
+
                        if (sa->linkID & 1) {
                                /*
                                  we have a backlink in this object
                        if (sa->linkID & 1) {
                                /*
                                  we have a backlink in this object
@@ -4253,7 +4393,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                                                replmd_private,
                                                                old_dn, &guid,
                                                                el, sa, req);
                                                                replmd_private,
                                                                old_dn, &guid,
                                                                el, sa, req);
-                               if (ret != LDB_SUCCESS) {
+                               if (ret == LDB_SUCCESS) {
+                                       /*
+                                        * now we continue, which means we
+                                        * won't remove this backlink
+                                        * directly
+                                        */
+                                       continue;
+                               }
+
+                               if (ret != LDB_ERR_NO_SUCH_ATTRIBUTE) {
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
                                        const char *old_dn_str
                                                = ldb_dn_get_linearized(old_dn);
                                        ldb_asprintf_errstring(ldb,
@@ -4266,11 +4415,16 @@ static int replmd_delete_internals(struct ldb_module *module, struct ldb_request
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
                                        talloc_free(tmp_ctx);
                                        return LDB_ERR_OPERATIONS_ERROR;
                                }
-                               /* now we continue, which means we
-                                  won't remove this backlink
-                                  directly
-                               */
-                               continue;
+
+                               /*
+                                * Otherwise vanish the link, we are
+                                * out of sync and the controlling
+                                * object does not have the source
+                                * link any more
+                                */
+
+                               dsdb_flags |= DSDB_REPLMD_VANISH_LINKS;
+
                        } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
                        } else if (sa->linkID == 0) {
                                if (ldb_attr_in_list(preserved_attrs, el->name)) {
                                        continue;
@@ -4519,6 +4673,7 @@ static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
 {
        struct ldb_val deleted_child_rdn_val;
        struct GUID_txt_buf guid_str;
 {
        struct ldb_val deleted_child_rdn_val;
        struct GUID_txt_buf guid_str;
+       int ret;
        bool retb;
 
        GUID_buf_string(&guid, &guid_str);
        bool retb;
 
        GUID_buf_string(&guid, &guid_str);
@@ -4531,7 +4686,16 @@ static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-
+       /*
+        * TODO: Per MS-ADTS 3.1.1.5.5 Delete Operation
+        * we should truncate this value to ensure the RDN is not more than 255 chars.
+        *
+        * However we MS-ADTS 3.1.1.5.1.2 Naming Constraints indicates that:
+        *
+        * "Naming constraints are not enforced for replicated
+        * updates." so this is safe and we don't have to work out not
+        * splitting a UTF8 char right now.
+        */
        deleted_child_rdn_val = ldb_val_dup(tmp_ctx, rdn_value);
 
        /*
        deleted_child_rdn_val = ldb_val_dup(tmp_ctx, rdn_value);
 
        /*
@@ -4576,10 +4740,13 @@ static int replmd_make_prefix_child_dn(TALLOC_CTX *tmp_ctx,
               sizeof(guid_str.buf));
 
        /* Now set the value into the RDN, without parsing it */
               sizeof(guid_str.buf));
 
        /* Now set the value into the RDN, without parsing it */
-       ldb_dn_set_component(dn, 0, rdn_name,
-                            deleted_child_rdn_val);
+       ret = ldb_dn_set_component(
+               dn,
+               0,
+               rdn_name,
+               deleted_child_rdn_val);
 
 
-       return LDB_SUCCESS;
+       return ret;
 }
 
 
 }
 
 
@@ -4643,7 +4810,7 @@ static int replmd_make_deleted_child_dn(TALLOC_CTX *tmp_ctx,
   their current values. This has the effect of changing these
   attributes to have been last updated by the current DC. This is
   needed to ensure that renames performed as part of conflict
   their current values. This has the effect of changing these
   attributes to have been last updated by the current DC. This is
   needed to ensure that renames performed as part of conflict
-  resolution are propogated to other DCs
+  resolution are propagated to other DCs
  */
 static int replmd_name_modify(struct replmd_replicated_request *ar,
                              struct ldb_request *req, struct ldb_dn *dn)
  */
 static int replmd_name_modify(struct replmd_replicated_request *ar,
                              struct ldb_request *req, struct ldb_dn *dn)
@@ -4848,6 +5015,7 @@ static int replmd_op_possible_conflict_callback(struct ldb_request *req, struct
                                       "Conflict adding object '%s' from incoming replication as we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
                                       "Conflict adding object '%s' from incoming replication as we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
                goto failed;
        }
 
@@ -5024,6 +5192,9 @@ failed:
         * replication will stop with an error, but there is not much
         * else we can do.
         */
         * replication will stop with an error, but there is not much
         * else we can do.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
        return ldb_module_done(ar->req, NULL, NULL,
                               ret);
 }
        return ldb_module_done(ar->req, NULL, NULL,
                               ret);
 }
@@ -5247,7 +5418,7 @@ static int replmd_replicated_apply_search_for_parent_callback(struct ldb_request
        {
                struct ldb_message *parent_msg = ares->message;
                struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
        {
                struct ldb_message *parent_msg = ares->message;
                struct ldb_message *msg = ar->objs->objects[ar->index_current].msg;
-               struct ldb_dn *parent_dn;
+               struct ldb_dn *parent_dn = NULL;
                int comp_num;
 
                if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
                int comp_num;
 
                if (!ldb_msg_check_string_attribute(msg, "isDeleted", "TRUE")
@@ -5486,6 +5657,7 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                                       "Conflict adding object '%s' from incoming replication but we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
                                       "Conflict adding object '%s' from incoming replication but we are read only for the partition.  \n"
                                       " - We must fail the operation until a master for this partition resolves the conflict",
                                       ldb_dn_get_linearized(conflict_dn));
+               ret = LDB_ERR_OPERATIONS_ERROR;
                goto failed;
        }
 
                goto failed;
        }
 
@@ -5633,8 +5805,10 @@ static int replmd_replicated_handle_rename(struct replmd_replicated_request *ar,
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                        goto failed;
        }
                         ldb_errstring(ldb_module_get_ctx(ar->module))));
                        goto failed;
        }
-failed:
 
 
+       talloc_free(tmp_ctx);
+       return ret;
+failed:
        /*
         * On failure make the caller get the error
         * This means replication will stop with an error,
        /*
         * On failure make the caller get the error
         * This means replication will stop with an error,
@@ -5642,6 +5816,9 @@ failed:
         * LDB_ERR_ENTRY_ALREADY_EXISTS case this is exactly what is
         * needed.
         */
         * LDB_ERR_ENTRY_ALREADY_EXISTS case this is exactly what is
         * needed.
         */
+       if (ret == LDB_SUCCESS) {
+               ret = LDB_ERR_OPERATIONS_ERROR;
+       }
 
        talloc_free(tmp_ctx);
        return ret;
 
        talloc_free(tmp_ctx);
        return ret;
@@ -7128,6 +7305,19 @@ linked_attributes[0]:
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       /*
+        * All attributes listed here must be dealt with in some way
+        * by replmd_process_linked_attribute() otherwise in the case
+        * of isDeleted: FALSE the modify will fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
        attrs[0] = attr->lDAPDisplayName;
        attrs[1] = "isDeleted";
        attrs[2] = "isRecycled";
        attrs[0] = attr->lDAPDisplayName;
        attrs[1] = "isDeleted";
        attrs[2] = "isRecycled";
@@ -7508,6 +7698,9 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
         * recycled and tombstone objects.  We don't have to delete
         * any existing link, that should have happened when the
         * object deletion was replicated or initiated.
         * recycled and tombstone objects.  We don't have to delete
         * any existing link, that should have happened when the
         * object deletion was replicated or initiated.
+        *
+        * This needs isDeleted and isRecycled to be included as
+        * attributes in the search and so in msg if set.
         */
        replmd_deletion_state(module, msg, &deletion_state, NULL);
 
         */
        replmd_deletion_state(module, msg, &deletion_state, NULL);
 
@@ -7516,6 +7709,24 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
                return LDB_SUCCESS;
        }
 
                return LDB_SUCCESS;
        }
 
+       /*
+        * Now that we know the deletion_state, remove the extra
+        * attributes added for that purpose.  We need to do this
+        * otherwise in the case of isDeleted: FALSE the modify will
+        * fail with:
+        *
+        * Failed to apply linked attribute change 'attribute 'isDeleted':
+        * invalid modify flags on
+        * 'CN=g1_1527570609273,CN=Users,DC=samba,DC=example,DC=com':
+        * 0x0'
+        *
+        * This is becaue isDeleted is a Boolean, so FALSE is a
+        * legitimate value (set by Samba's deletetest.py)
+        */
+
+       ldb_msg_remove_attr(msg, "isDeleted");
+       ldb_msg_remove_attr(msg, "isRecycled");
+
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
        if (old_el == NULL) {
                ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);
        old_el = ldb_msg_find_element(msg, attr->lDAPDisplayName);
        if (old_el == NULL) {
                ret = ldb_msg_add_empty(msg, attr->lDAPDisplayName, LDB_FLAG_MOD_REPLACE, &old_el);