s4:dsdb Set 'notification' after the success of a change.
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / repl_meta_data.c
index c4699366455fddd1eed6370b280ab38626c3d11b..a0dd111bdf4e83dddf6e6c706ee04e544a98c969 100644 (file)
@@ -6,22 +6,18 @@
    Copyright (C) Andrew Tridgell 2005
    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
 
-     ** NOTE! The following LGPL license applies to the ldb
-     ** library. This does NOT imply that all of Samba is released
-     ** under the LGPL
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
    
-   This library is free software; you can redistribute it and/or
-   modify it under the terms of the GNU Lesser General Public
-   License as published by the Free Software Foundation; either
-   version 3 of the License, or (at your option) any later version.
-
-   This library is distributed in the hope that it will be useful,
+   This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   Lesser General Public License for more details.
-
-   You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 /*
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 #include "param/param.h"
 #include "libcli/security/dom_sid.h"
-#include "dlinklist.h"
+#include "lib/util/dlinklist.h"
 
 struct replmd_private {
+       TALLOC_CTX *la_ctx;
        struct la_entry *la_list;
+       uint32_t num_ncs;
+       struct nc_entry {
+               struct ldb_dn *dn;
+               struct GUID guid;
+               uint64_t mod_usn;
+       } *ncs;
 };
 
 struct la_entry {
@@ -73,8 +76,192 @@ struct replmd_replicated_request {
        uint32_t index_current;
 
        struct ldb_message *search_msg;
+
+       uint64_t seq_num;
 };
 
+
+/*
+  initialise the module
+  allocate the private structure and build the list
+  of partition DNs for use by replmd_notify()
+ */
+static int replmd_init(struct ldb_module *module)
+{
+       struct replmd_private *replmd_private;
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       replmd_private = talloc_zero(module, struct replmd_private);
+       if (replmd_private == NULL) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       ldb_module_set_private(module, replmd_private);
+
+       return ldb_next_init(module);
+}
+
+
+static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
+{
+       return ldb_dn_compare(n1->dn, n2->dn);
+}
+
+/*
+  build the list of partition DNs for use by replmd_notify()
+ */
+static int replmd_load_NCs(struct ldb_module *module)
+{
+       const char *attrs[] = { "namingContexts", NULL };
+       struct ldb_result *res = NULL;
+       int i, ret;
+       TALLOC_CTX *tmp_ctx;
+       struct ldb_context *ldb;
+       struct ldb_message_element *el;
+       struct replmd_private *replmd_private = 
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+
+       if (replmd_private->ncs != NULL) {
+               return LDB_SUCCESS;
+       }
+
+       ldb = ldb_module_get_ctx(module);
+       tmp_ctx = talloc_new(module);
+
+       /* load the list of naming contexts */
+       ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""), 
+                        LDB_SCOPE_BASE, attrs, NULL);
+       if (ret != LDB_SUCCESS ||
+           res->count != 1) {
+               DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       el = ldb_msg_find_element(res->msgs[0], "namingContexts");
+       if (el == NULL) {
+               DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
+               return LDB_ERR_OPERATIONS_ERROR;                
+       }
+
+       replmd_private->num_ncs = el->num_values;
+       replmd_private->ncs = talloc_array(replmd_private, struct nc_entry, 
+                                          replmd_private->num_ncs);
+       if (replmd_private->ncs == NULL) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       for (i=0; i<replmd_private->num_ncs; i++) {
+               replmd_private->ncs[i].dn = 
+                       ldb_dn_from_ldb_val(replmd_private->ncs, 
+                                           ldb, &el->values[i]);
+               replmd_private->ncs[i].mod_usn = 0;
+       }
+
+       talloc_free(res);
+
+       /* now find the GUIDs of each of those DNs */
+       for (i=0; i<replmd_private->num_ncs; i++) {
+               const char *attrs2[] = { "objectGUID", NULL };
+               ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
+                                LDB_SCOPE_BASE, attrs2, NULL);
+               if (ret != LDB_SUCCESS ||
+                   res->count != 1) {
+                       /* this happens when the schema is first being
+                          setup */
+                       talloc_free(replmd_private->ncs);
+                       replmd_private->ncs = NULL;
+                       replmd_private->num_ncs = 0;
+                       talloc_free(tmp_ctx);
+                       return LDB_SUCCESS;
+               }
+               replmd_private->ncs[i].guid = 
+                       samdb_result_guid(res->msgs[0], "objectGUID");
+               talloc_free(res);
+       }       
+
+       /* sort the NCs into order, most to least specific */
+       qsort(replmd_private->ncs, replmd_private->num_ncs,
+             sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
+
+       
+       talloc_free(tmp_ctx);
+       
+       return LDB_SUCCESS;
+}
+
+
+/*
+ * notify the repl task that a object has changed. The notifies are
+ * gathered up in the replmd_private structure then written to the
+ * @REPLCHANGED object in each partition during the prepare_commit
+ */
+static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
+{
+       int ret, i;
+       struct replmd_private *replmd_private = 
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+
+       ret = replmd_load_NCs(module);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+       if (replmd_private->num_ncs == 0) {
+               return LDB_SUCCESS;
+       }
+
+       for (i=0; i<replmd_private->num_ncs; i++) {
+               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
+                       break;
+               }
+       }
+       if (i == replmd_private->num_ncs) {
+               DEBUG(0,(__location__ ": DN not within known NCs '%s'\n", 
+                        ldb_dn_get_linearized(dn)));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       if (uSN > replmd_private->ncs[i].mod_usn) {
+           replmd_private->ncs[i].mod_usn = uSN;
+       }
+
+       return LDB_SUCCESS;
+}
+
+
+/*
+ * update a @REPLCHANGED record in each partition if there have been
+ * any writes of replicated data in the partition
+ */
+static int replmd_notify_store(struct ldb_module *module)
+{
+       int i;
+       struct replmd_private *replmd_private = 
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+       struct ldb_context *ldb = ldb_module_get_ctx(module);
+
+       for (i=0; i<replmd_private->num_ncs; i++) {
+               int ret;
+
+               if (replmd_private->ncs[i].mod_usn == 0) {
+                       /* this partition has not changed in this
+                          transaction */
+                       continue;
+               }
+
+               ret = dsdb_save_partition_usn(ldb, replmd_private->ncs[i].dn, 
+                                             replmd_private->ncs[i].mod_usn);
+               if (ret != LDB_SUCCESS) {
+                       DEBUG(0,(__location__ ": Failed to save partition uSN for %s\n",
+                                ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
+                       return ret;
+               }
+       }
+
+       return LDB_SUCCESS;
+}
+
+
 /*
   created a replmd_replicated_request context
  */
@@ -178,11 +365,33 @@ static int replmd_replPropertyMetaData1_attid_sort(const struct replPropertyMeta
        return m1->attid - m2->attid;
 }
 
-static void replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
-                                                const uint32_t *rdn_attid)
+static int replmd_replPropertyMetaDataCtr1_sort(struct replPropertyMetaDataCtr1 *ctr1,
+                                               const struct dsdb_schema *schema,
+                                               struct ldb_dn *dn)
 {
+       const char *rdn_name;
+       const struct dsdb_attribute *rdn_sa;
+
+       rdn_name = ldb_dn_get_rdn_name(dn);
+       if (!rdn_name) {
+               DEBUG(0,(__location__ ": No rDN for %s?\n", ldb_dn_get_linearized(dn)));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       rdn_sa = dsdb_attribute_by_lDAPDisplayName(schema, rdn_name);
+       if (rdn_sa == NULL) {
+               DEBUG(0,(__location__ ": No sa found for rDN %s for %s\n", rdn_name, ldb_dn_get_linearized(dn)));
+               return LDB_ERR_OPERATIONS_ERROR;                
+       }
+
+       DEBUG(6,("Sorting rpmd with attid exception %u rDN=%s DN=%s\n", 
+                rdn_sa->attributeID_id, rdn_name, ldb_dn_get_linearized(dn)));
+
        ldb_qsort(ctr1->array, ctr1->count, sizeof(struct replPropertyMetaData1),
-                 discard_const_p(void, rdn_attid), (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
+                 discard_const_p(void, &rdn_sa->attributeID_id), 
+                 (ldb_qsort_cmp_fn_t)replmd_replPropertyMetaData1_attid_sort);
+
+       return LDB_SUCCESS;
 }
 
 static int replmd_ldb_message_element_attid_sort(const struct ldb_message_element *e1,
@@ -222,6 +431,7 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
 {
        struct ldb_context *ldb;
        struct replmd_replicated_request *ac;
+       int ret;
 
        ac = talloc_get_type(req->context, struct replmd_replicated_request);
        ldb = ldb_module_get_ctx(ac->module);
@@ -243,6 +453,11 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
                                        LDB_ERR_OPERATIONS_ERROR);
        }
 
+       ret = replmd_notify(ac->module, req->op.add.message->dn, ac->seq_num);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
        return ldb_module_done(ac->req, ares->controls,
                                ares->response, LDB_SUCCESS);
 }
@@ -250,23 +465,32 @@ static int replmd_op_callback(struct ldb_request *req, struct ldb_reply *ares)
 static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 {
        struct ldb_context *ldb;
+        struct ldb_control *control;
+        struct ldb_control **saved_controls;
        struct replmd_replicated_request *ac;
        const struct dsdb_schema *schema;
        enum ndr_err_code ndr_err;
        struct ldb_request *down_req;
        struct ldb_message *msg;
-       const struct dsdb_attribute *rdn_attr = NULL;
+        const DATA_BLOB *guid_blob;
        struct GUID guid;
        struct ldb_val guid_value;
        struct replPropertyMetaDataBlob nmd;
        struct ldb_val nmd_value;
-       uint64_t seq_num;
        const struct GUID *our_invocation_id;
        time_t t = time(NULL);
        NTTIME now;
        char *time_str;
        int ret;
        uint32_t i, ni=0;
+       bool allow_add_guid = false;
+       bool remove_current_guid = false;
+
+        /* check if there's a show relax control (used by provision to say 'I know what I'm doing') */
+        control = ldb_request_get_control(req, LDB_CONTROL_RELAX_OID);
+       if (control) {
+               allow_add_guid = 1;
+       }
 
        /* do not manipulate our control entries */
        if (ldb_dn_is_special(req->op.add.message->dn)) {
@@ -280,7 +504,8 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        schema = dsdb_get_schema(ldb);
        if (!schema) {
                ldb_debug_set(ldb, LDB_DEBUG_FATAL,
-                             "replmd_modify: no dsdb_schema loaded");
+                             "replmd_add: no dsdb_schema loaded");
+               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
                return LDB_ERR_CONSTRAINT_VIOLATION;
        }
 
@@ -291,26 +516,43 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
 
        ac->schema = schema;
 
-       if (ldb_msg_find_element(req->op.add.message, "objectGUID") != NULL) {
-               ldb_debug_set(ldb, LDB_DEBUG_ERROR,
+        guid_blob = ldb_msg_find_ldb_val(req->op.add.message, "objectGUID");
+       if ( guid_blob != NULL ) {
+               if( !allow_add_guid ) {
+                       ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                              "replmd_add: it's not allowed to add an object with objectGUID\n");
-               return LDB_ERR_UNWILLING_TO_PERFORM;
+                       talloc_free(ac);
+                       return LDB_ERR_UNWILLING_TO_PERFORM;
+               } else {
+                       NTSTATUS status = GUID_from_data_blob(guid_blob,&guid);
+                       if ( !NT_STATUS_IS_OK(status)) {
+                                       ldb_debug_set(ldb, LDB_DEBUG_ERROR,
+                                     "replmd_add: Unable to parse as a GUID the attribute objectGUID\n");
+                               talloc_free(ac);
+                               return LDB_ERR_UNWILLING_TO_PERFORM;
+                       }
+                       /* we remove this attribute as it can be a string and will not be treated 
+                       correctly and then we will readd it latter on in the good format*/
+                       remove_current_guid = true;
+               }
+       } else {
+               /* a new GUID */
+               guid = GUID_random();
        }
 
        /* Get a sequence number from the backend */
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &ac->seq_num);
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
-       /* a new GUID */
-       guid = GUID_random();
-
        /* get our invocationId */
        our_invocation_id = samdb_ntds_invocation_id(ldb);
        if (!our_invocation_id) {
                ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                              "replmd_add: unable to find invocationId\n");
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -318,6 +560,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        msg = ldb_msg_copy_shallow(ac, req->op.add.message);
        if (msg == NULL) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -325,8 +568,13 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        unix_to_nt_time(&now, t);
        time_str = ldb_timestring(msg, t);
        if (!time_str) {
+               ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
+       if (remove_current_guid) {
+               ldb_msg_remove_attr(msg,"objectGUID");
+       }
 
        /* 
         * remove autogenerated attributes
@@ -337,13 +585,23 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        ldb_msg_remove_attr(msg, "uSNChanged");
        ldb_msg_remove_attr(msg, "replPropertyMetaData");
 
+       if (!ldb_msg_find_element(req->op.add.message, "instanceType")) {
+               ret = ldb_msg_add_fmt(msg, "instanceType", "%u", INSTANCE_TYPE_WRITE);
+               if (ret != LDB_SUCCESS) {
+                       ldb_oom(ldb);
+                       talloc_free(ac);
+                       return ret;
+               }
+       }
+
        /*
         * readd replicated attributes
         */
        ret = ldb_msg_add_string(msg, "whenCreated", time_str);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
 
        /* build the replication meta_data */
@@ -355,6 +613,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                               nmd.ctr.ctr1.count);
        if (!nmd.ctr.ctr1.array) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -370,10 +629,11 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                        ldb_debug_set(ldb, LDB_DEBUG_ERROR,
                                      "replmd_add: attribute '%s' not defined in schema\n",
                                      e->name);
+                       talloc_free(ac);
                        return LDB_ERR_NO_SUCH_ATTRIBUTE;
                }
 
-               if ((sa->systemFlags & 0x00000001) || (sa->systemFlags & 0x00000004)) {
+               if ((sa->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (sa->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
                        /* if the attribute is not replicated (0x00000001)
                         * or constructed (0x00000004) it has no metadata
                         */
@@ -384,13 +644,9 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                m->version                      = 1;
                m->originating_change_time      = now;
                m->originating_invocation_id    = *our_invocation_id;
-               m->originating_usn              = seq_num;
-               m->local_usn                    = seq_num;
+               m->originating_usn              = ac->seq_num;
+               m->local_usn                    = ac->seq_num;
                ni++;
-
-               if (ldb_attr_cmp(e->name, ldb_dn_get_rdn_name(msg->dn))) {
-                       rdn_attr = sa;
-               }
        }
 
        /* fix meta data count */
@@ -399,7 +655,11 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        /*
         * sort meta data array, and move the rdn attribute entry to the end
         */
-       replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_attr->attributeID_id);
+       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, schema, msg->dn);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
+               return ret;
+       }
 
        /* generated NDR encoded values */
        ndr_err = ndr_push_struct_blob(&guid_value, msg, 
@@ -408,6 +668,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                       (ndr_push_flags_fn_t)ndr_push_GUID);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
        ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
@@ -416,6 +677,7 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                       (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
        if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                ldb_oom(ldb);
+               talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
@@ -425,27 +687,32 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
        ret = ldb_msg_add_value(msg, "objectGUID", &guid_value, NULL);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
        ret = ldb_msg_add_string(msg, "whenChanged", time_str);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNCreated", ac->seq_num);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
-       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", seq_num);
+       ret = samdb_msg_add_uint64(ldb, msg, msg, "uSNChanged", ac->seq_num);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
        ret = ldb_msg_add_value(msg, "replPropertyMetaData", &nmd_value, NULL);
        if (ret != LDB_SUCCESS) {
                ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               talloc_free(ac);
+               return ret;
        }
 
        /*
@@ -459,9 +726,16 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
                                ac, replmd_op_callback,
                                req);
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
 
+               /* if a control is there remove if from the modified request */
+       if (control && !save_controls(control, down_req, &saved_controls)) {
+               talloc_free(ac);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
        /* go on with the call chain */
        return ldb_next_request(module, down_req);
 }
@@ -475,10 +749,9 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                                      struct ldb_message_element *el,
                                      struct replPropertyMetaDataBlob *omd,
                                      struct dsdb_schema *schema,
-                                     uint64_t seq_num,
+                                     uint64_t *seq_num,
                                      const struct GUID *our_invocation_id,
-                                     NTTIME now,
-                                     bool *modified)
+                                     NTTIME now)
 {
        int i;
        const struct dsdb_attribute *a;
@@ -491,6 +764,10 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
+       if ((a->systemFlags & DS_FLAG_ATTR_NOT_REPLICATED) || (a->systemFlags & DS_FLAG_ATTR_IS_CONSTRUCTED)) {
+               return LDB_SUCCESS;
+       }
+
        for (i=0; i<omd->ctr.ctr1.count; i++) {
                if (a->attributeID_id == omd->ctr.ctr1.array[i].attid) break;
        }
@@ -503,17 +780,28 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
                omd->ctr.ctr1.count++;
+               ZERO_STRUCT(omd->ctr.ctr1.array[i]);
+       }
+
+       /* Get a new sequence number from the backend. We only do this
+        * if we have a change that requires a new
+        * replPropertyMetaData element 
+        */
+       if (*seq_num == 0) {
+               int ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, seq_num);
+               if (ret != LDB_SUCCESS) {
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
        }
 
        md1 = &omd->ctr.ctr1.array[i];
-       md1->version                   = 1;
+       md1->version++;
        md1->attid                     = a->attributeID_id;
        md1->originating_change_time   = now;
        md1->originating_invocation_id = *our_invocation_id;
-       md1->originating_usn           = seq_num;
-       md1->local_usn                 = seq_num;
+       md1->originating_usn           = *seq_num;
+       md1->local_usn                 = *seq_num;
        
-       *modified = true;
        return LDB_SUCCESS;
 }
 
@@ -522,14 +810,13 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
  * object. This is needed for DRS replication, as the merge on the
  * client is based on this object 
  */
-static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
-                             uint64_t seq_num)
+static int replmd_update_rpmd(struct ldb_module *module, 
+                             struct ldb_message *msg, uint64_t *seq_num)
 {
        const struct ldb_val *omd_value;
        enum ndr_err_code ndr_err;
        struct replPropertyMetaDataBlob omd;
        int i;
-       bool modified = false;
        struct dsdb_schema *schema;
        time_t t = time(NULL);
        NTTIME now;
@@ -537,19 +824,23 @@ static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
        int ret;
        const char *attrs[] = { "replPropertyMetaData" , NULL };
        struct ldb_result *res;
+       struct ldb_context *ldb;
+
+       ldb = ldb_module_get_ctx(module);
 
        our_invocation_id = samdb_ntds_invocation_id(ldb);
        if (!our_invocation_id) {
-               ldb_debug_set(ldb, LDB_DEBUG_ERROR,
-                             __location__ ": replmd_update_rpmd: unable to find invocationId\n");
-               return LDB_ERR_OPERATIONS_ERROR;
+               /* this happens during an initial vampire while
+                  updating the schema */
+               DEBUG(5,("No invocationID - skipping replPropertyMetaData update\n"));
+               return LDB_SUCCESS;
        }
 
        unix_to_nt_time(&now, t);
 
        /* search for the existing replPropertyMetaDataBlob */
-       ret = ldb_search(ldb, msg, &res, msg->dn, LDB_SCOPE_BASE, attrs, NULL);
-       if (ret != LDB_SUCCESS || res->count < 1) {
+       ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, attrs);
+       if (ret != LDB_SUCCESS || res->count != 1) {
                DEBUG(0,(__location__ ": Object %s failed to find replPropertyMetaData\n",
                         ldb_dn_get_linearized(msg->dn)));
                return LDB_ERR_OPERATIONS_ERROR;
@@ -582,13 +873,17 @@ static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
 
        for (i=0; i<msg->num_elements; i++) {
                ret = replmd_update_rpmd_element(ldb, msg, &msg->elements[i], &omd, schema, seq_num, 
-                                                our_invocation_id, now, &modified);
+                                                our_invocation_id, now);
                if (ret != LDB_SUCCESS) {
                        return ret;
                }
        }
 
-       if (modified) {
+       /*
+        * replmd_update_rpmd_element has done an update if the
+        * seq_num is set
+        */
+       if (*seq_num != 0) {
                struct ldb_val *md_value;
                struct ldb_message_element *el;
 
@@ -598,6 +893,11 @@ static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
                        return LDB_ERR_OPERATIONS_ERROR;
                }
 
+               ret = replmd_replPropertyMetaDataCtr1_sort(&omd.ctr.ctr1, schema, msg->dn);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
+               }
+
                ndr_err = ndr_push_struct_blob(md_value, msg, 
                                               lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
                                               &omd,
@@ -630,9 +930,10 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        const struct dsdb_schema *schema;
        struct ldb_request *down_req;
        struct ldb_message *msg;
-       int ret;
+       struct ldb_result *res;
        time_t t = time(NULL);
-       uint64_t seq_num;
+       uint64_t seq_num = 0;
+       int ret;
 
        /* do not manipulate our control entries */
        if (ldb_dn_is_special(req->op.mod.message->dn)) {
@@ -647,6 +948,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        if (!schema) {
                ldb_debug_set(ldb, LDB_DEBUG_FATAL,
                              "replmd_modify: no dsdb_schema loaded");
+               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
                return LDB_ERR_CONSTRAINT_VIOLATION;
        }
 
@@ -660,13 +962,12 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
        /* we have to copy the message as the caller might have it as a const */
        msg = ldb_msg_copy_shallow(ac, req->op.mod.message);
        if (msg == NULL) {
+               ldb_oom(ldb);
                talloc_free(ac);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
        /* TODO:
-        * - get the whole old object
-        * - if the old object doesn't exist report an error
         * - give an error when a readonly attribute should
         *   be modified
         * - merge the changed into the old object
@@ -675,29 +976,19 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
         *   attribute was changed
         */
 
-       /* Get a sequence number from the backend */
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       ret = dsdb_search_dn_with_deleted(ldb, msg, &res, msg->dn, NULL);
        if (ret != LDB_SUCCESS) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
-
-       ret = replmd_update_rpmd(ldb, msg, seq_num);
-       if (ret != LDB_SUCCESS) {
-               return ret;
-       }
-
-       if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
                talloc_free(ac);
-               return LDB_ERR_OPERATIONS_ERROR;
+               return ret;
        }
 
-       if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
+       ret = replmd_update_rpmd(module, msg, &ac->seq_num);
+       if (ret != LDB_SUCCESS) {
                talloc_free(ac);
-               return LDB_ERR_OPERATIONS_ERROR;
+               return ret;
        }
 
        /* TODO:
-        * - sort the attributes by attid with replmd_ldb_message_sort()
         * - replace the old object with the newly constructed one
         */
 
@@ -707,14 +998,125 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
                                ac, replmd_op_callback,
                                req);
        if (ret != LDB_SUCCESS) {
+               talloc_free(ac);
                return ret;
        }
        talloc_steal(down_req, msg);
 
+       /* we only change whenChanged and uSNChanged if the seq_num
+          has changed */
+       if (seq_num != 0) {
+               if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
+                       talloc_free(ac);
+                       return ret;
+               }
+
+               if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
+                       talloc_free(ac);
+                       return ret;
+               }
+       }
+
        /* go on with the call chain */
        return ldb_next_request(module, down_req);
 }
 
+
+/*
+  handle a rename request
+
+  On a rename we need to do an extra ldb_modify which sets the
+  whenChanged and uSNChanged attributes
+ */
+static int replmd_rename(struct ldb_module *module, struct ldb_request *req)
+{
+       struct ldb_context *ldb;
+       int ret, i;
+       time_t t = time(NULL);
+       uint64_t seq_num = 0;
+       struct ldb_message *msg;
+       struct replmd_private *replmd_private = 
+               talloc_get_type(ldb_module_get_private(module), struct replmd_private);
+
+       /* do not manipulate our control entries */
+       if (ldb_dn_is_special(req->op.mod.message->dn)) {
+               return ldb_next_request(module, req);
+       }
+
+       ldb = ldb_module_get_ctx(module);
+
+       ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_rename\n");
+
+       /* Get a sequence number from the backend */
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       msg = ldb_msg_new(req);
+       if (msg == NULL) {
+               ldb_oom(ldb);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       msg->dn = req->op.rename.olddn;
+
+       if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
+               talloc_free(msg);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
+
+       if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
+               talloc_free(msg);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       msg->elements[1].flags = LDB_FLAG_MOD_REPLACE;
+
+       ret = ldb_modify(ldb, msg);
+       talloc_free(msg);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       ret = replmd_load_NCs(module);
+       if (ret != 0) {
+               return ret;
+       }
+
+       /* now update the highest uSNs of the partitions that are
+          affected. Note that two partitions could be changing */
+       for (i=0; i<replmd_private->num_ncs; i++) {
+               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
+                                       req->op.rename.olddn) == 0) {
+                       break;
+               }
+       }
+       if (i == replmd_private->num_ncs) {
+               DEBUG(0,(__location__ ": rename olddn outside tree? %s\n",
+                        ldb_dn_get_linearized(req->op.rename.olddn)));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       replmd_private->ncs[i].mod_usn = seq_num;
+
+       for (i=0; i<replmd_private->num_ncs; i++) {
+               if (ldb_dn_compare_base(replmd_private->ncs[i].dn, 
+                                       req->op.rename.newdn) == 0) {
+                       break;
+               }
+       }
+       if (i == replmd_private->num_ncs) {
+               DEBUG(0,(__location__ ": rename newdn outside tree? %s\n",
+                        ldb_dn_get_linearized(req->op.rename.newdn)));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       replmd_private->ncs[i].mod_usn = seq_num;
+       
+       /* go on with the call chain */
+       return ldb_next_request(module, req);
+}
+
+
 static int replmd_replicated_request_error(struct replmd_replicated_request *ar, int ret)
 {
        return ret;
@@ -815,6 +1217,24 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
                return replmd_replicated_request_error(ar, ret);
        }
 
+       ret = replmd_notify(ar->module, msg->dn, seq_num);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       /* remove any message elements that have zero values */
+       for (i=0; i<msg->num_elements; i++) {
+               if (msg->elements[i].num_values == 0) {
+                       DEBUG(4,(__location__ ": Removing attribute %s with num_values==0\n",
+                                msg->elements[i].name));
+                       memmove(&msg->elements[i], 
+                               &msg->elements[i+1], 
+                               sizeof(msg->elements[i])*(msg->num_elements - (i+1)));
+                       msg->num_elements--;
+                       i--;
+               }
+       }
+       
        /*
         * the meta data array is already sorted by the caller
         */
@@ -950,11 +1370,6 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                }
        }
 
-       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
-       if (ret != LDB_SUCCESS) {
-               return replmd_replicated_request_error(ar, ret);
-       }
-
        /* find existing meta data */
        omd_value = ldb_msg_find_ldb_val(ar->search_msg, "replPropertyMetaData");
        if (omd_value) {
@@ -989,8 +1404,6 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        for (i=0; i < rmd->ctr.ctr1.count; i++) {
                bool found = false;
 
-               rmd->ctr.ctr1.array[i].local_usn = seq_num;
-
                for (j=0; j < ni; j++) {
                        int cmp;
 
@@ -1033,22 +1446,9 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
         *
         * sort the new meta data array
         */
-       {
-               struct replPropertyMetaData1 *rdn_p;
-               uint32_t rdn_idx = omd.ctr.ctr1.count - 1;
-
-               rdn_p = &nmd.ctr.ctr1.array[rdn_idx];
-               replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, &rdn_p->attid);
-       }
-
-       /* create the meta data value */
-       ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
-                                      lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
-                                      &nmd,
-                                      (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
-               return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+       ret = replmd_replPropertyMetaDataCtr1_sort(&nmd.ctr.ctr1, ar->schema, msg->dn);
+       if (ret != LDB_SUCCESS) {
+               return ret;
        }
 
        /*
@@ -1065,6 +1465,25 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
        ldb_debug(ldb, LDB_DEBUG_TRACE, "replmd_replicated_apply_merge[%u]: replace %u attributes\n",
                  ar->index_current, msg->num_elements);
 
+       ret = ldb_sequence_number(ldb, LDB_SEQ_NEXT, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
+       for (i=0; i<ni; i++) {
+               nmd.ctr.ctr1.array[i].local_usn = seq_num;
+       }
+
+       /* create the meta data value */
+       ndr_err = ndr_push_struct_blob(&nmd_value, msg, 
+                                      lp_iconv_convenience(ldb_get_opaque(ldb, "loadparm")),
+                                      &nmd,
+                                      (ndr_push_flags_fn_t)ndr_push_replPropertyMetaDataBlob);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               NTSTATUS nt_status = ndr_map_error2ntstatus(ndr_err);
+               return replmd_replicated_request_werror(ar, ntstatus_to_werror(nt_status));
+       }
+
        /*
         * when we know that we'll modify the record, add the whenChanged, uSNChanged
         * and replPopertyMetaData attributes
@@ -1089,6 +1508,11 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
                msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
        }
 
+       ret = replmd_notify(ar->module, msg->dn, seq_num);
+       if (ret != LDB_SUCCESS) {
+               return replmd_replicated_request_error(ar, ret);
+       }
+
        if (DEBUGLVL(4)) {
                char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
                DEBUG(4, ("DRS replication modify message:\n%s\n", s));
@@ -1185,6 +1609,13 @@ static int replmd_replicated_apply_next(struct replmd_replicated_request *ar)
                                   ar,
                                   replmd_replicated_apply_search_callback,
                                   ar->req);
+
+       ret = ldb_request_add_control(search_req, LDB_CONTROL_SHOW_DELETED_OID, true, NULL);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+       
+
        if (ret != LDB_SUCCESS) return replmd_replicated_request_error(ar, ret);
 
        return ldb_next_request(ar->module, search_req);
@@ -1218,12 +1649,6 @@ static int replmd_replicated_uptodate_modify_callback(struct ldb_request *req,
        return ldb_module_done(ar->req, NULL, NULL, LDB_SUCCESS);
 }
 
-static int replmd_drsuapi_DsReplicaCursor2_compare(const struct drsuapi_DsReplicaCursor2 *c1,
-                                                  const struct drsuapi_DsReplicaCursor2 *c2)
-{
-       return GUID_compare(&c1->source_dsa_invocation_id, &c2->source_dsa_invocation_id);
-}
-
 static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *ar)
 {
        struct ldb_context *ldb;
@@ -1378,7 +1803,7 @@ static int replmd_replicated_uptodate_modify(struct replmd_replicated_request *a
         */
        qsort(nuv.ctr.ctr2.cursors, nuv.ctr.ctr2.count,
              sizeof(struct drsuapi_DsReplicaCursor2),
-             (comparison_fn_t)replmd_drsuapi_DsReplicaCursor2_compare);
+             (comparison_fn_t)drsuapi_DsReplicaCursor2_compare);
 
        /*
         * create the change ldb_message
@@ -1621,6 +2046,7 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        if (!ar->schema) {
                ldb_debug_set(ldb, LDB_DEBUG_FATAL, "replmd_ctx_init: no loaded schema found\n");
                talloc_free(ar);
+               DEBUG(0,(__location__ ": %s\n", ldb_errstring(ldb)));
                return LDB_ERR_CONSTRAINT_VIOLATION;
        }
 
@@ -1668,18 +2094,17 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
        for (i=0; i<ar->objs->linked_attributes_count; i++) {
                struct la_entry *la_entry;
 
-               if (replmd_private == NULL) {
-                       DEBUG(0,(__location__ ": repl_meta_data not called from within a transaction\n"));
-                       return LDB_ERR_OPERATIONS_ERROR;
+               if (replmd_private->la_ctx == NULL) {
+                       replmd_private->la_ctx = talloc_new(replmd_private);
                }
-
-               la_entry = talloc(replmd_private, struct la_entry);
+               la_entry = talloc(replmd_private->la_ctx, struct la_entry);
                if (la_entry == NULL) {
                        ldb_oom(ldb);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
                la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
                if (la_entry->la == NULL) {
+                       talloc_free(la_entry);
                        ldb_oom(ldb);
                        return LDB_ERR_OPERATIONS_ERROR;
                }
@@ -1709,10 +2134,11 @@ static int replmd_process_linked_attribute(struct ldb_module *module,
        struct ldb_message_element *ret_el;
        TALLOC_CTX *tmp_ctx = talloc_new(la_entry);
        enum ndr_err_code ndr_err;
-       char *target_dn;
        struct ldb_request *mod_req;
        int ret;
        const struct dsdb_attribute *attr;
+       struct ldb_dn *target_dn;
+       uint64_t seq_num = 0;
 
 /*
 linked_attributes[0]:                                                     
@@ -1799,7 +2225,9 @@ linked_attributes[0]:
                talloc_free(tmp_ctx);
                return ret;
        }
-       ret_el->values = talloc_array(msg, struct ldb_val, 1);
+       /* we allocate two entries here, in case we need a remove/add
+          pair */
+       ret_el->values = talloc_array(msg, struct ldb_val, 2);
        if (!ret_el->values) {
                ldb_oom(ldb);
                talloc_free(tmp_ctx);
@@ -1807,16 +2235,37 @@ linked_attributes[0]:
        }
        ret_el->num_values = 1;
 
-       target_dn = talloc_asprintf(tmp_ctx, "<GUID=%s>;<SID=%s>;%s",
-                                   GUID_string(tmp_ctx, &target.guid),
-                                   dom_sid_string(tmp_ctx, &target.sid),
-                                   target.dn);
-       if (target_dn == NULL) {
-               ldb_oom(ldb);
+       ret = dsdb_find_dn_by_guid(ldb, tmp_ctx, GUID_string(tmp_ctx, &target.guid), &target_dn);
+       if (ret != LDB_SUCCESS) {
+               DEBUG(0,(__location__ ": Failed to map GUID %s to DN\n", GUID_string(tmp_ctx, &target.guid)));
                talloc_free(tmp_ctx);
                return LDB_ERR_OPERATIONS_ERROR;
        }
-       ret_el->values[0] = data_blob_string_const(target_dn);
+
+       ret_el->values[0].data = (uint8_t *)ldb_dn_get_extended_linearized(tmp_ctx, target_dn, 1);
+       ret_el->values[0].length = strlen((char *)ret_el->values[0].data);
+
+       ret = replmd_update_rpmd(module, msg, &seq_num);
+       if (ret != LDB_SUCCESS) {
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       /* we only change whenChanged and uSNChanged if the seq_num
+          has changed */
+       if (seq_num != 0) {
+               time_t t = time(NULL);
+
+               if (add_time_element(msg, "whenChanged", t) != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+
+               if (add_uint64_element(msg, "uSNChanged", seq_num) != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+       }
 
        ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
                                msg,
@@ -1844,10 +2293,48 @@ linked_attributes[0]:
                ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
        }
 
+       if (ret == LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
+               /* the link destination exists, we need to update it
+                * by deleting the old one for the same DN then adding
+                * the new one */
+               msg->elements = talloc_realloc(msg, msg->elements,
+                                              struct ldb_message_element,
+                                              msg->num_elements+1);
+               if (msg->elements == NULL) {
+                       ldb_oom(ldb);
+                       talloc_free(tmp_ctx);
+                       return LDB_ERR_OPERATIONS_ERROR;
+               }
+               /* this relies on the backend matching the old entry
+                  only by the DN portion of the extended DN */
+               msg->elements[1] = msg->elements[0];
+               msg->elements[0].flags = LDB_FLAG_MOD_DELETE;
+               msg->num_elements++;
+
+               ret = ldb_build_mod_req(&mod_req, ldb, tmp_ctx,
+                                       msg,
+                                       NULL,
+                                       NULL, 
+                                       ldb_op_default_callback,
+                                       NULL);
+               if (ret != LDB_SUCCESS) {
+                       talloc_free(tmp_ctx);
+                       return ret;
+               }
+
+               /* Run the new request */
+               ret = ldb_next_request(module, mod_req);
+               
+               if (ret == LDB_SUCCESS) {
+                       ret = ldb_wait(mod_req->handle, LDB_WAIT_ALL);
+               }
+       }
+
        if (ret != LDB_SUCCESS) {
                ldb_debug(ldb, LDB_DEBUG_WARNING, "Failed to apply linked attribute change '%s' %s\n",
                          ldb_errstring(ldb),
                          ldb_ldif_message_string(ldb, tmp_ctx, LDB_CHANGETYPE_MODIFY, msg));
+               ret = LDB_SUCCESS;
        }
        
        talloc_free(tmp_ctx);
@@ -1875,15 +2362,17 @@ static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
 static int replmd_start_transaction(struct ldb_module *module)
 {
        /* create our private structure for this transaction */
+       int i;
        struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
                                                                struct replmd_private);
-       talloc_free(replmd_private);
-       replmd_private = talloc(module, struct replmd_private);
-       if (replmd_private == NULL) {
-               return LDB_ERR_OPERATIONS_ERROR;
-       }
+       talloc_free(replmd_private->la_ctx);
        replmd_private->la_list = NULL;
-       ldb_module_set_private(module, replmd_private);
+       replmd_private->la_ctx = NULL;
+
+       for (i=0; i<replmd_private->num_ncs; i++) {
+               replmd_private->ncs[i].mod_usn = 0;
+       }
+
        return ldb_next_start_trans(module);
 }
 
@@ -1895,23 +2384,35 @@ static int replmd_prepare_commit(struct ldb_module *module)
 {
        struct replmd_private *replmd_private = 
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-       struct la_entry *la;
+       struct la_entry *la, *prev;
+       int ret;
 
        /* walk the list backwards, to do the first entry first, as we
         * added the entries with DLIST_ADD() which puts them at the
         * start of the list */
        for (la = replmd_private->la_list; la && la->next; la=la->next) ;
 
-       for (; la; la=la->prev) {
-               int ret;
+       for (; la; la=prev) {
+               prev = la->prev;
+               DLIST_REMOVE(replmd_private->la_list, la);
                ret = replmd_process_linked_attribute(module, la);
                if (ret != LDB_SUCCESS) {
+                       talloc_free(replmd_private->la_ctx);
+                       replmd_private->la_list = NULL;
+                       replmd_private->la_ctx = NULL;
                        return ret;
                }
        }
 
-       talloc_free(replmd_private);
-       ldb_module_set_private(module, NULL);
+       talloc_free(replmd_private->la_ctx);
+       replmd_private->la_list = NULL;
+       replmd_private->la_ctx = NULL;
+
+       /* possibly change @REPLCHANGED */
+       ret = replmd_notify_store(module);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
        
        return ldb_next_prepare_commit(module);
 }
@@ -1920,17 +2421,20 @@ static int replmd_del_transaction(struct ldb_module *module)
 {
        struct replmd_private *replmd_private = 
                talloc_get_type(ldb_module_get_private(module), struct replmd_private);
-       talloc_free(replmd_private);
-       ldb_module_set_private(module, NULL);
+       talloc_free(replmd_private->la_ctx);
+       replmd_private->la_list = NULL;
+       replmd_private->la_ctx = NULL;
        return ldb_next_del_trans(module);
 }
 
 
 _PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
        .name          = "repl_meta_data",
-       .add           = replmd_add,
-       .modify        = replmd_modify,
-       .extended      = replmd_extended,
+       .init_context      = replmd_init,
+       .add               = replmd_add,
+       .modify            = replmd_modify,
+       .rename            = replmd_rename,
+       .extended          = replmd_extended,
        .start_transaction = replmd_start_transaction,
        .prepare_commit    = replmd_prepare_commit,
        .del_transaction   = replmd_del_transaction,