s4-repl: added a preiodic notification check to the repl task
[ira/wip.git] / source4 / dsdb / repl / drepl_out_helpers.c
index d79d94fc12a7d7ec71134609c346ce2e8339fe0b..168aacdde9844bc5ca6920b0b5a6de58a1e90294 100644 (file)
 #include "lib/messaging/irpc.h"
 #include "dsdb/repl/drepl_service.h"
 #include "lib/ldb/include/ldb_errors.h"
-#include "lib/util/dlinklist.h"
+#include "../lib/util/dlinklist.h"
 #include "librpc/gen_ndr/ndr_misc.h"
 #include "librpc/gen_ndr/ndr_drsuapi.h"
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 #include "libcli/composite/composite.h"
 #include "auth/gensec/gensec.h"
+#include "param/param.h"
 
 struct dreplsrv_out_drsuapi_state {
        struct composite_context *creq;
@@ -47,7 +48,7 @@ struct dreplsrv_out_drsuapi_state {
 
 static void dreplsrv_out_drsuapi_connect_recv(struct composite_context *creq);
 
-static struct composite_context *dreplsrv_out_drsuapi_send(struct dreplsrv_out_connection *conn)
+struct composite_context *dreplsrv_out_drsuapi_send(struct dreplsrv_out_connection *conn)
 {
        struct composite_context *c;
        struct composite_context *creq;
@@ -142,10 +143,19 @@ static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req)
                        info24 = &st->bind_r.out.bind_info->info.info24;
                        st->drsuapi->remote_info28.supported_extensions = info24->supported_extensions;
                        st->drsuapi->remote_info28.site_guid            = info24->site_guid;
-                       st->drsuapi->remote_info28.u1                   = info24->u1;
+                       st->drsuapi->remote_info28.pid                  = info24->pid;
                        st->drsuapi->remote_info28.repl_epoch           = 0;
                        break;
                }
+               case 48: {
+                       struct drsuapi_DsBindInfo48 *info48;
+                       info48 = &st->bind_r.out.bind_info->info.info48;
+                       st->drsuapi->remote_info28.supported_extensions = info48->supported_extensions;
+                       st->drsuapi->remote_info28.site_guid            = info48->site_guid;
+                       st->drsuapi->remote_info28.pid                  = info48->pid;
+                       st->drsuapi->remote_info28.repl_epoch           = info48->repl_epoch;
+                       break;
+               }
                case 28:
                        st->drsuapi->remote_info28 = st->bind_r.out.bind_info->info.info28;
                        break;
@@ -155,7 +165,7 @@ static void dreplsrv_out_drsuapi_bind_recv(struct rpc_request *req)
        composite_done(c);
 }
 
-static NTSTATUS dreplsrv_out_drsuapi_recv(struct composite_context *c)
+NTSTATUS dreplsrv_out_drsuapi_recv(struct composite_context *c)
 {
        NTSTATUS status;
        struct dreplsrv_out_drsuapi_state *st = talloc_get_type(c->private_data,
@@ -237,40 +247,42 @@ static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_sou
        r = talloc(st, struct drsuapi_DsGetNCChanges);
        if (composite_nomem(r, c)) return;
 
-       r->in.level = talloc(r, int32_t);
-       if (composite_nomem(r->in.level, c)) return;
-       r->out.level = talloc(r, int32_t);
-       if (composite_nomem(r->out.level, c)) return;
+       r->out.level_out = talloc(r, int32_t);
+       if (composite_nomem(r->out.level_out, c)) return;
+       r->in.req = talloc(r, union drsuapi_DsGetNCChangesRequest);
+       if (composite_nomem(r->in.req, c)) return;
+       r->out.ctr = talloc(r, union drsuapi_DsGetNCChangesCtr);
+       if (composite_nomem(r->out.ctr, c)) return;
 
        r->in.bind_handle       = &drsuapi->bind_handle;
        if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
-               *r->in.level                            = 8;
-               r->in.req.req8.destination_dsa_guid     = service->ntds_guid;
-               r->in.req.req8.source_dsa_invocation_id = rf1->source_dsa_invocation_id;
-               r->in.req.req8.naming_context           = &partition->nc;
-               r->in.req.req8.highwatermark            = rf1->highwatermark;
-               r->in.req.req8.uptodateness_vector      = NULL;/*&partition->uptodatevector_ex;*/
-               r->in.req.req8.replica_flags            = rf1->replica_flags;
-               r->in.req.req8.max_object_count         = 133;
-               r->in.req.req8.max_ndr_size             = 1336811;
-               r->in.req.req8.extended_op              = DRSUAPI_EXOP_NONE;
-               r->in.req.req8.fsmo_info                = 0;
-               r->in.req.req8.partial_attribute_set    = NULL;
-               r->in.req.req8.partial_attribute_set_ex = NULL;
-               r->in.req.req8.mapping_ctr.num_mappings = 0;
-               r->in.req.req8.mapping_ctr.mappings     = NULL;
+               r->in.level                             = 8;
+               r->in.req->req8.destination_dsa_guid    = service->ntds_guid;
+               r->in.req->req8.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
+               r->in.req->req8.naming_context          = &partition->nc;
+               r->in.req->req8.highwatermark           = rf1->highwatermark;
+               r->in.req->req8.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
+               r->in.req->req8.replica_flags           = rf1->replica_flags;
+               r->in.req->req8.max_object_count        = 133;
+               r->in.req->req8.max_ndr_size            = 1336811;
+               r->in.req->req8.extended_op             = DRSUAPI_EXOP_NONE;
+               r->in.req->req8.fsmo_info               = 0;
+               r->in.req->req8.partial_attribute_set   = NULL;
+               r->in.req->req8.partial_attribute_set_ex= NULL;
+               r->in.req->req8.mapping_ctr.num_mappings= 0;
+               r->in.req->req8.mapping_ctr.mappings    = NULL;
        } else {
-               *r->in.level                            = 5;
-               r->in.req.req5.destination_dsa_guid     = service->ntds_guid;
-               r->in.req.req5.source_dsa_invocation_id = rf1->source_dsa_invocation_id;
-               r->in.req.req5.naming_context           = &partition->nc;
-               r->in.req.req5.highwatermark            = rf1->highwatermark;
-               r->in.req.req5.uptodateness_vector      = NULL;/*&partition->uptodatevector_ex;*/
-               r->in.req.req5.replica_flags            = rf1->replica_flags;
-               r->in.req.req5.max_object_count         = 133;
-               r->in.req.req5.max_ndr_size             = 1336770;
-               r->in.req.req5.extended_op              = DRSUAPI_EXOP_NONE;
-               r->in.req.req5.fsmo_info                = 0;
+               r->in.level                             = 5;
+               r->in.req->req5.destination_dsa_guid    = service->ntds_guid;
+               r->in.req->req5.source_dsa_invocation_id= rf1->source_dsa_invocation_id;
+               r->in.req->req5.naming_context          = &partition->nc;
+               r->in.req->req5.highwatermark           = rf1->highwatermark;
+               r->in.req->req5.uptodateness_vector     = NULL;/*&partition->uptodatevector_ex;*/
+               r->in.req->req5.replica_flags           = rf1->replica_flags;
+               r->in.req->req5.max_object_count        = 133;
+               r->in.req->req5.max_ndr_size            = 1336770;
+               r->in.req->req5.extended_op             = DRSUAPI_EXOP_NONE;
+               r->in.req->req5.fsmo_info               = 0;
        }
 
        req = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
@@ -302,28 +314,50 @@ static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req)
                return;
        }
 
-       if (*r->out.level == 1) {
+       if (*r->out.level_out == 1) {
                ctr_level = 1;
-               ctr1 = &r->out.ctr.ctr1;
-       } else if (*r->out.level == 2) {
+               ctr1 = &r->out.ctr->ctr1;
+       } else if (*r->out.level_out == 2 &&
+                  r->out.ctr->ctr2.mszip1.ts) {
                ctr_level = 1;
-               ctr1 = r->out.ctr.ctr2.ctr.mszip1.ctr1;
-       } else if (*r->out.level == 6) {
+               ctr1 = &r->out.ctr->ctr2.mszip1.ts->ctr1;
+       } else if (*r->out.level_out == 6) {
                ctr_level = 6;
-               ctr6 = &r->out.ctr.ctr6;
-       } else if (*r->out.level == 7 &&
-                  r->out.ctr.ctr7.level == 6 &&
-                  r->out.ctr.ctr7.type == DRSUAPI_COMPRESSION_TYPE_MSZIP) {
+               ctr6 = &r->out.ctr->ctr6;
+       } else if (*r->out.level_out == 7 &&
+                  r->out.ctr->ctr7.level == 6 &&
+                  r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_MSZIP &&
+                  r->out.ctr->ctr7.ctr.mszip6.ts) {
                ctr_level = 6;
-               ctr6 = r->out.ctr.ctr7.ctr.mszip6.ctr6;
+               ctr6 = &r->out.ctr->ctr7.ctr.mszip6.ts->ctr6;
+       } else if (*r->out.level_out == 7 &&
+                  r->out.ctr->ctr7.level == 6 &&
+                  r->out.ctr->ctr7.type == DRSUAPI_COMPRESSION_TYPE_XPRESS &&
+                  r->out.ctr->ctr7.ctr.xpress6.ts) {
+               ctr_level = 6;
+               ctr6 = &r->out.ctr->ctr7.ctr.xpress6.ts->ctr6;
        } else {
                composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
                return;
        }
 
+       if (!ctr1 && !ctr6) {
+               composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
+               return;
+       }
+
+       if (ctr_level == 6) {
+               if (!W_ERROR_IS_OK(ctr6->drs_error)) {
+                       composite_error(c, werror_to_ntstatus(ctr6->drs_error));
+                       return;
+               }
+       }
+
        dreplsrv_op_pull_source_apply_changes_send(st, r, ctr_level, ctr1, ctr6);
 }
 
+static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st);
+
 static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
                                                       struct drsuapi_DsGetNCChanges *r,
                                                       uint32_t ctr_level,
@@ -353,6 +387,7 @@ static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_s
                linked_attributes               = NULL;
                rf1.highwatermark               = ctr1->new_highwatermark;
                uptodateness_vector             = NULL; /* TODO: map it */
+               more_data                       = ctr1->more_data;
                break;
        case 6:
                mapping_ctr                     = &ctr6->mapping_ctr;
@@ -362,6 +397,7 @@ static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_s
                linked_attributes               = ctr6->linked_attributes;
                rf1.highwatermark               = ctr6->new_highwatermark;
                uptodateness_vector             = ctr6->uptodateness_vector;
+               more_data                       = ctr6->more_data;
                break;
        default:
                composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
@@ -392,20 +428,17 @@ static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_s
         * TODO: update our uptodatevector!
         */
 
-       /*
-        * if the tmp_highest_usn is higher than highest_usn
-        * there's more to pull from this source_dsa
-        */
-       if (rf1.highwatermark.tmp_highest_usn > rf1.highwatermark.highest_usn) {
-               more_data = true;
-       }
-
        if (more_data) {
                dreplsrv_op_pull_source_get_changes_send(st);
                return;
        }
 
-       composite_done(c);
+       /* now we need to update the repsTo record for this partition
+          on the server. These records are initially established when
+          we join the domain, but they quickly expire.  We do it here
+          so we can use the already established DRSUAPI pipe
+       */
+       dreplsrv_update_refs_send(st);
 }
 
 WERROR dreplsrv_op_pull_source_recv(struct composite_context *c)
@@ -417,3 +450,79 @@ WERROR dreplsrv_op_pull_source_recv(struct composite_context *c)
        talloc_free(c);
        return ntstatus_to_werror(status);
 }
+
+/*
+  receive a UpdateRefs reply
+ */
+static void dreplsrv_update_refs_recv(struct rpc_request *req)
+{
+       struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private_data,
+                                                  struct dreplsrv_op_pull_source_state);
+       struct composite_context *c = st->creq;
+       struct drsuapi_DsReplicaUpdateRefs *r = talloc_get_type(req->ndr.struct_ptr,
+                                                               struct drsuapi_DsReplicaUpdateRefs);
+
+       c->status = dcerpc_ndr_request_recv(req);
+       if (!composite_is_ok(c)) {
+               DEBUG(0,("UpdateRefs failed with %s\n", 
+                        nt_errstr(c->status)));
+               return;
+       }
+
+       if (!W_ERROR_IS_OK(r->out.result)) {
+               DEBUG(0,("UpdateRefs failed with %s for %s %s\n", 
+                        win_errstr(r->out.result),
+                        r->in.req.req1.dest_dsa_dns_name,
+                        r->in.req.req1.naming_context->dn));
+               composite_error(c, werror_to_ntstatus(r->out.result));
+               return;
+       }
+
+       DEBUG(4,("UpdateRefs OK for %s %s\n", 
+                r->in.req.req1.dest_dsa_dns_name,
+                r->in.req.req1.naming_context->dn));
+
+       composite_done(c);
+}
+
+/*
+  send a UpdateRefs request to refresh our repsTo record on the server
+ */
+static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st)
+{
+       struct composite_context *c = st->creq;
+       struct dreplsrv_service *service = st->op->service;
+       struct dreplsrv_partition *partition = st->op->source_dsa->partition;
+       struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
+       struct rpc_request *req;
+       struct drsuapi_DsReplicaUpdateRefs *r;
+       char *ntds_guid_str;
+       char *ntds_dns_name;
+
+       r = talloc(st, struct drsuapi_DsReplicaUpdateRefs);
+       if (composite_nomem(r, c)) return;
+
+       ntds_guid_str = GUID_string(r, &service->ntds_guid);
+       if (composite_nomem(ntds_guid_str, c)) return;
+
+       /* lp_realm() is not really right here */
+       ntds_dns_name = talloc_asprintf(r, "%s._msdcs.%s",
+                                       ntds_guid_str,
+                                       lp_realm(service->task->lp_ctx));
+       if (composite_nomem(ntds_dns_name, c)) return;
+
+       r->in.bind_handle       = &drsuapi->bind_handle;
+       r->in.level             = 1;
+       r->in.req.req1.naming_context     = &partition->nc;
+       r->in.req.req1.dest_dsa_dns_name  = ntds_dns_name;
+       r->in.req.req1.dest_dsa_guid      = service->ntds_guid;
+       r->in.req.req1.options            = 
+               DRSUAPI_DS_REPLICA_UPDATE_ADD_REFERENCE |
+               DRSUAPI_DS_REPLICA_UPDATE_DELETE_REFERENCE;
+       if (!lp_parm_bool(service->task->lp_ctx, NULL, "repl", "RODC", false)) {
+               r->in.req.req1.options |= DRSUAPI_DS_REPLICA_UPDATE_WRITEABLE;
+       }
+
+       req = dcerpc_drsuapi_DsReplicaUpdateRefs_send(drsuapi->pipe, r, r);
+       composite_continue_rpc(c, req, dreplsrv_update_refs_recv, st);
+}