s4-drs: calculate and send a uptodateness_vector with replication requests
authorAndrew Tridgell <tridge@samba.org>
Sat, 9 Jan 2010 03:29:39 +0000 (14:29 +1100)
committerAndrew Tridgell <tridge@samba.org>
Sat, 9 Jan 2010 07:56:29 +0000 (18:56 +1100)
This stops us getting objects changes twice if they came via an
indirect path.

source4/dsdb/repl/drepl_out_helpers.c
source4/dsdb/repl/drepl_partitions.c

index 5666a660ada4bf3880a6e63ed088510ca88f6d98..a4f5d1faec643c9d6a9b44c1494e410f497013aa 100644 (file)
@@ -261,6 +261,7 @@ static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req)
        struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
        struct rpc_request *rreq;
        struct drsuapi_DsGetNCChanges *r;
        struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
        struct rpc_request *rreq;
        struct drsuapi_DsGetNCChanges *r;
+       struct drsuapi_DsReplicaCursorCtrEx *uptodateness_vector;
 
        r = talloc(state, struct drsuapi_DsGetNCChanges);
        if (tevent_req_nomem(r, req)) {
 
        r = talloc(state, struct drsuapi_DsGetNCChanges);
        if (tevent_req_nomem(r, req)) {
@@ -280,6 +281,12 @@ static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req)
                return;
        }
 
                return;
        }
 
+       if (partition->uptodatevector_ex.count == 0) {
+               uptodateness_vector = NULL;
+       } else {
+               uptodateness_vector = &partition->uptodatevector_ex;
+       }
+
        r->in.bind_handle       = &drsuapi->bind_handle;
        if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
                r->in.level                             = 8;
        r->in.bind_handle       = &drsuapi->bind_handle;
        if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
                r->in.level                             = 8;
@@ -287,7 +294,7 @@ static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req)
                r->in.req->req8.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
                r->in.req->req8.naming_context          = &partition->nc;
                r->in.req->req8.highwatermark           = rf1->highwatermark;
                r->in.req->req8.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
                r->in.req->req8.naming_context          = &partition->nc;
                r->in.req->req8.highwatermark           = rf1->highwatermark;
-               r->in.req->req8.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
+               r->in.req->req8.uptodateness_vector     = uptodateness_vector;
                r->in.req->req8.replica_flags           = rf1->replica_flags;
                r->in.req->req8.max_object_count        = 133;
                r->in.req->req8.max_ndr_size            = 1336811;
                r->in.req->req8.replica_flags           = rf1->replica_flags;
                r->in.req->req8.max_object_count        = 133;
                r->in.req->req8.max_ndr_size            = 1336811;
@@ -303,7 +310,7 @@ static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req)
                r->in.req->req5.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
                r->in.req->req5.naming_context          = &partition->nc;
                r->in.req->req5.highwatermark           = rf1->highwatermark;
                r->in.req->req5.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
                r->in.req->req5.naming_context          = &partition->nc;
                r->in.req->req5.highwatermark           = rf1->highwatermark;
-               r->in.req->req5.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
+               r->in.req->req5.uptodateness_vector     = uptodateness_vector;
                r->in.req->req5.replica_flags           = rf1->replica_flags;
                r->in.req->req5.max_object_count        = 133;
                r->in.req->req5.max_ndr_size            = 1336770;
                r->in.req->req5.replica_flags           = rf1->replica_flags;
                r->in.req->req5.max_object_count        = 133;
                r->in.req->req5.max_ndr_size            = 1336770;
@@ -311,6 +318,10 @@ static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req)
                r->in.req->req5.fsmo_info               = state->op->fsmo_info;
        }
 
                r->in.req->req5.fsmo_info               = state->op->fsmo_info;
        }
 
+#if 0
+       NDR_PRINT_IN_DEBUG(drsuapi_DsGetNCChanges, r);
+#endif
+
        rreq = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
        if (tevent_req_nomem(rreq, req)) {
                return;
        rreq = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
        if (tevent_req_nomem(rreq, req)) {
                return;
index f5c8a701a9dba717ad536d9db88d382cab2e0c21..aba7735440a007b02c2b118a03dc5ea77681be12 100644 (file)
@@ -188,6 +188,65 @@ static WERROR dreplsrv_partition_add_source_dsa(struct dreplsrv_service *s,
        return WERR_OK;
 }
 
        return WERR_OK;
 }
 
+/*
+  convert from one udv format to the other
+ */
+static WERROR udv_convert(TALLOC_CTX *mem_ctx,
+                         const struct replUpToDateVectorCtr2 *udv,
+                         struct drsuapi_DsReplicaCursorCtrEx *udv_ex)
+{
+       int i;
+
+       udv_ex->version = 2;
+       udv_ex->reserved1 = 0;
+       udv_ex->reserved2 = 0;
+       udv_ex->count = udv->count;
+       udv_ex->cursors = talloc_array(mem_ctx, struct drsuapi_DsReplicaCursor, udv->count);
+       W_ERROR_HAVE_NO_MEMORY(udv_ex->cursors);
+
+       for (i=0; i<udv->count; i++) {
+               udv_ex->cursors[i].source_dsa_invocation_id = udv->cursors[i].source_dsa_invocation_id;
+               udv_ex->cursors[i].highest_usn = udv->cursors[i].highest_usn;
+       }
+
+       return WERR_OK;
+}
+
+/*
+  add our local UDV element for the partition
+ */
+static WERROR add_local_udv(struct dreplsrv_service *s,
+                           struct dreplsrv_partition *p,
+                           const struct GUID *our_invocation_id,
+                           struct drsuapi_DsReplicaCursorCtrEx *udv)
+{
+       int ret;
+       uint64_t highest_usn;
+       int i;
+
+       ret = dsdb_load_partition_usn(s->samdb, p->dn, &highest_usn);
+       if (ret != LDB_SUCCESS) {
+               /* nothing to add */
+               return WERR_OK;
+       }
+
+       for (i=0; i<udv->count; i++) {
+               if (GUID_equal(our_invocation_id, &udv->cursors[i].source_dsa_invocation_id)) {
+                       udv->cursors[i].highest_usn = highest_usn;
+                       return WERR_OK;
+               }
+       }
+
+       udv->cursors = talloc_realloc(p, udv->cursors, struct drsuapi_DsReplicaCursor, udv->count+1);
+       W_ERROR_HAVE_NO_MEMORY(udv->cursors);
+
+       udv->cursors[udv->count].source_dsa_invocation_id = *our_invocation_id;
+       udv->cursors[udv->count].highest_usn = highest_usn;
+       udv->count++;
+
+       return WERR_OK;
+}
+
 static WERROR dreplsrv_refresh_partition(struct dreplsrv_service *s,
                                         struct dreplsrv_partition *p)
 {
 static WERROR dreplsrv_refresh_partition(struct dreplsrv_service *s,
                                         struct dreplsrv_partition *p)
 {
@@ -232,6 +291,11 @@ static WERROR dreplsrv_refresh_partition(struct dreplsrv_service *s,
                talloc_free(nc_sid);
        }
 
                talloc_free(nc_sid);
        }
 
+       talloc_free(p->uptodatevector.cursors);
+       talloc_free(p->uptodatevector_ex.cursors);
+       ZERO_STRUCT(p->uptodatevector);
+       ZERO_STRUCT(p->uptodatevector_ex);
+
        ouv_value = ldb_msg_find_ldb_val(r->msgs[0], "replUpToDateVector");
        if (ouv_value) {
                enum ndr_err_code ndr_err;
        ouv_value = ldb_msg_find_ldb_val(r->msgs[0], "replUpToDateVector");
        if (ouv_value) {
                enum ndr_err_code ndr_err;
@@ -251,14 +315,14 @@ static WERROR dreplsrv_refresh_partition(struct dreplsrv_service *s,
 
                p->uptodatevector.count         = ouv.ctr.ctr2.count;
                p->uptodatevector.reserved      = ouv.ctr.ctr2.reserved;
 
                p->uptodatevector.count         = ouv.ctr.ctr2.count;
                p->uptodatevector.reserved      = ouv.ctr.ctr2.reserved;
-               talloc_free(p->uptodatevector.cursors);
                p->uptodatevector.cursors       = talloc_steal(p, ouv.ctr.ctr2.cursors);
                p->uptodatevector.cursors       = talloc_steal(p, ouv.ctr.ctr2.cursors);
-       }
 
 
-       /*
-        * TODO: add our own uptodatevector cursor
-        */
+               status = udv_convert(p, &p->uptodatevector, &p->uptodatevector_ex);
+               W_ERROR_NOT_OK_RETURN(status);
 
 
+               status = add_local_udv(s, p, samdb_ntds_invocation_id(s->samdb), &p->uptodatevector_ex);
+               W_ERROR_NOT_OK_RETURN(status);
+       }
 
        orf_el = ldb_msg_find_element(r->msgs[0], "repsFrom");
        if (orf_el) {
 
        orf_el = ldb_msg_find_element(r->msgs[0], "repsFrom");
        if (orf_el) {