s4:drepl_out_helpers: don't look at the internals of 'struct rpc_request'
[ira/wip.git] / source4 / dsdb / repl / drepl_notify.c
index 73280917c52770c27936004bb4c32df637af1027..c3ca5ed31018b5b147d951d48078d304ad358a0a 100644 (file)
 #include "librpc/gen_ndr/ndr_drsuapi.h"
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 #include "libcli/composite/composite.h"
+#include "../lib/util/tevent_ntstatus.h"
 
 
 struct dreplsrv_op_notify_state {
-       struct composite_context *creq;
-
-       struct dreplsrv_out_connection *conn;
-
-       struct dreplsrv_drsuapi_connection *drsuapi;
-
-       struct drsuapi_DsBindInfoCtr bind_info_ctr;
-       struct drsuapi_DsBind bind_r;
        struct dreplsrv_notify_operation *op;
+       void *ndr_struct_ptr;
 };
 
+static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq);
+
 /*
-  receive a DsReplicaSync reply
+  start the ReplicaSync async call
  */
-static void dreplsrv_op_notify_replica_sync_recv(struct rpc_request *req)
+static struct tevent_req *dreplsrv_op_notify_send(TALLOC_CTX *mem_ctx,
+                                                 struct tevent_context *ev,
+                                                 struct dreplsrv_notify_operation *op)
 {
-       struct dreplsrv_op_notify_state *st = talloc_get_type(req->async.private_data,
-                                                             struct dreplsrv_op_notify_state);
-       struct composite_context *c = st->creq;
-       struct drsuapi_DsReplicaSync *r = talloc_get_type(req->ndr.struct_ptr,
-                                                         struct drsuapi_DsReplicaSync);
+       struct tevent_req *req;
+       struct dreplsrv_op_notify_state *state;
+       struct tevent_req *subreq;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct dreplsrv_op_notify_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->op = op;
 
-       c->status = dcerpc_ndr_request_recv(req);
-       if (!composite_is_ok(c)) return;
+       subreq = dreplsrv_out_drsuapi_send(state,
+                                          ev,
+                                          op->source_dsa->conn);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, dreplsrv_op_notify_connect_done, req);
 
-       if (!W_ERROR_IS_OK(r->out.result)) {
-               composite_error(c, werror_to_ntstatus(r->out.result));
+       return req;
+}
+
+static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req);
+
+static void dreplsrv_op_notify_connect_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                                         struct tevent_req);
+       NTSTATUS status;
+
+       status = dreplsrv_out_drsuapi_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (tevent_req_nterror(req, status)) {
                return;
        }
 
-       composite_done(c);
+       dreplsrv_op_notify_replica_sync_trigger(req);
 }
 
-/*
-  send a DsReplicaSync
-*/
-static void dreplsrv_op_notify_replica_sync_send(struct dreplsrv_op_notify_state *st)
+static void dreplsrv_op_notify_replica_sync_done(struct rpc_request *rreq);
+
+static void dreplsrv_op_notify_replica_sync_trigger(struct tevent_req *req)
 {
-       struct composite_context *c = st->creq;
-       struct dreplsrv_partition *partition = st->op->source_dsa->partition;
-       struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
-       struct rpc_request *req;
+       struct dreplsrv_op_notify_state *state =
+               tevent_req_data(req,
+               struct dreplsrv_op_notify_state);
+       struct dreplsrv_partition *partition = state->op->source_dsa->partition;
+       struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
+       struct rpc_request *rreq;
        struct drsuapi_DsReplicaSync *r;
 
-       r = talloc_zero(st, struct drsuapi_DsReplicaSync);
-       if (composite_nomem(r, c)) return;
-
+       r = talloc_zero(state, struct drsuapi_DsReplicaSync);
+       if (tevent_req_nomem(r, req)) {
+               return;
+       }
+       r->in.req = talloc_zero(r, union drsuapi_DsReplicaSyncRequest);
+       if (tevent_req_nomem(r, req)) {
+               return;
+       }
        r->in.bind_handle       = &drsuapi->bind_handle;
        r->in.level = 1;
-       r->in.req.req1.naming_context = &partition->nc;
-       r->in.req.req1.source_dsa_guid = st->op->service->ntds_guid;
-       r->in.req.req1.options = 
-               DRSUAPI_DS_REPLICA_SYNC_ASYNCHRONOUS_OPERATION |
-               DRSUAPI_DS_REPLICA_SYNC_WRITEABLE |
-               DRSUAPI_DS_REPLICA_SYNC_ALL_SOURCES;
-       
-
-       req = dcerpc_drsuapi_DsReplicaSync_send(drsuapi->pipe, r, r);
-       composite_continue_rpc(c, req, dreplsrv_op_notify_replica_sync_recv, st);
-}
-
-/*
-  called when we have an established connection
- */
-static void dreplsrv_op_notify_connect_recv(struct composite_context *creq)
-{
-       struct dreplsrv_op_notify_state *st = talloc_get_type(creq->async.private_data,
-                                                             struct dreplsrv_op_notify_state);
-       struct composite_context *c = st->creq;
+       r->in.req->req1.naming_context = &partition->nc;
+       r->in.req->req1.source_dsa_guid = state->op->service->ntds_guid;
+       r->in.req->req1.options =
+               DRSUAPI_DRS_ASYNC_OP |
+               DRSUAPI_DRS_UPDATE_NOTIFICATION |
+               DRSUAPI_DRS_WRIT_REP;
+
+       if (state->op->is_urgent) {
+               r->in.req->req1.options |= DRSUAPI_DRS_SYNC_URGENT;
+       }
 
-       c->status = dreplsrv_out_drsuapi_recv(creq);
-       if (!composite_is_ok(c)) return;
+       state->ndr_struct_ptr = r;
 
-       dreplsrv_op_notify_replica_sync_send(st);
+       rreq = dcerpc_drsuapi_DsReplicaSync_send(drsuapi->pipe, r, r);
+       if (tevent_req_nomem(rreq, req)) {
+               return;
+       }
+       composite_continue_rpc(NULL, rreq, dreplsrv_op_notify_replica_sync_done, req);
 }
 
-/*
-  start the ReplicaSync async call
- */
-static struct composite_context *dreplsrv_op_notify_send(struct dreplsrv_notify_operation *op)
+static void dreplsrv_op_notify_replica_sync_done(struct rpc_request *rreq)
 {
-       struct composite_context *c;
-       struct composite_context *creq;
-       struct dreplsrv_op_notify_state *st;
+       struct tevent_req *req = talloc_get_type(rreq->async.private_data,
+                                                struct tevent_req);
+       struct dreplsrv_op_notify_state *state =
+               tevent_req_data(req,
+               struct dreplsrv_op_notify_state);
+       struct drsuapi_DsReplicaSync *r = talloc_get_type(state->ndr_struct_ptr,
+                                                         struct drsuapi_DsReplicaSync);
+       NTSTATUS status;
 
-       c = composite_create(op, op->service->task->event_ctx);
-       if (c == NULL) return NULL;
+       state->ndr_struct_ptr = NULL;
 
-       st = talloc_zero(c, struct dreplsrv_op_notify_state);
-       if (composite_nomem(st, c)) return c;
+       status = dcerpc_drsuapi_DsReplicaSync_recv(rreq);
+       if (tevent_req_nterror(req, status)) {
+               return;
+       }
 
-       st->creq        = c;
-       st->op          = op;
+       if (!W_ERROR_IS_OK(r->out.result)) {
+               status = werror_to_ntstatus(r->out.result);
+               tevent_req_nterror(req, status);
+               return;
+       }
 
-       creq = dreplsrv_out_drsuapi_send(op->source_dsa->conn);
-       composite_continue(c, creq, dreplsrv_op_notify_connect_recv, st);
+       tevent_req_done(req);
+}
 
-       return c;
+static NTSTATUS dreplsrv_op_notify_recv(struct tevent_req *req)
+{
+       return tevent_req_simple_recv_ntstatus(req);
 }
 
 static void dreplsrv_notify_del_repsTo(struct dreplsrv_notify_operation *op)
@@ -172,15 +199,20 @@ static void dreplsrv_notify_del_repsTo(struct dreplsrv_notify_operation *op)
 /*
   called when a notify operation has completed
  */
-static void dreplsrv_notify_op_callback(struct dreplsrv_notify_operation *op)
+static void dreplsrv_notify_op_callback(struct tevent_req *subreq)
 {
+       struct dreplsrv_notify_operation *op =
+               tevent_req_callback_data(subreq,
+               struct dreplsrv_notify_operation);
        NTSTATUS status;
        struct dreplsrv_service *s = op->service;
 
-       status = composite_wait(op->creq);
+       status = dreplsrv_op_notify_recv(subreq);
+       TALLOC_FREE(subreq);
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0,("dreplsrv_notify: Failed to send DsReplicaSync to %s - %s\n",
+               DEBUG(0,("dreplsrv_notify: Failed to send DsReplicaSync to %s for %s - %s\n",
                         op->source_dsa->repsFrom1->other_info->dns_name,
+                        ldb_dn_get_linearized(op->source_dsa->partition->dn),
                         nt_errstr(status)));
        } else {
                DEBUG(2,("dreplsrv_notify: DsReplicaSync OK for %s\n",
@@ -190,27 +222,19 @@ static void dreplsrv_notify_op_callback(struct dreplsrv_notify_operation *op)
                   partition, as we have successfully told him to sync */
                dreplsrv_notify_del_repsTo(op);
        }
-       talloc_free(op->creq);
 
        talloc_free(op);
        s->ops.n_current = NULL;
        dreplsrv_notify_run_ops(s);
 }
 
-
-static void dreplsrv_notify_op_callback_creq(struct composite_context *creq)
-{
-       struct dreplsrv_notify_operation *op = talloc_get_type(creq->async.private_data,
-                                                              struct dreplsrv_notify_operation);
-       dreplsrv_notify_op_callback(op);
-}
-
 /*
   run any pending replica sync calls
  */
 void dreplsrv_notify_run_ops(struct dreplsrv_service *s)
 {
        struct dreplsrv_notify_operation *op;
+       struct tevent_req *subreq;
 
        if (s->ops.n_current || s->ops.current) {
                /* if there's still one running, we're done */
@@ -226,14 +250,14 @@ void dreplsrv_notify_run_ops(struct dreplsrv_service *s)
        s->ops.n_current = op;
        DLIST_REMOVE(s->ops.notifies, op);
 
-       op->creq = dreplsrv_op_notify_send(op);
-       if (!op->creq) {
-               dreplsrv_notify_op_callback(op);
+       subreq = dreplsrv_op_notify_send(op, s->task->event_ctx, op);
+       if (!subreq) {
+               DEBUG(0,("dreplsrv_notify_run_ops: dreplsrv_op_notify_send[%s][%s] - no memory\n",
+                        op->source_dsa->repsFrom1->other_info->dns_name,
+                        ldb_dn_get_linearized(op->source_dsa->partition->dn)));
                return;
        }
-
-       op->creq->async.fn              = dreplsrv_notify_op_callback_creq;
-       op->creq->async.private_data    = op;
+       tevent_req_set_callback(subreq, dreplsrv_notify_op_callback, op);
 }
 
 
@@ -261,7 +285,8 @@ static WERROR dreplsrv_schedule_notify_sync(struct dreplsrv_service *service,
                                            struct dreplsrv_partition *p,
                                            struct repsFromToBlob *reps,
                                            TALLOC_CTX *mem_ctx,
-                                           uint64_t uSN)
+                                           uint64_t uSN,
+                                           bool is_urgent)
 {
        struct dreplsrv_notify_operation *op;
        struct dreplsrv_partition_source_dsa *s;
@@ -279,6 +304,7 @@ static WERROR dreplsrv_schedule_notify_sync(struct dreplsrv_service *service,
        op->service     = service;
        op->source_dsa  = s;
        op->uSN         = uSN;
+       op->is_urgent   = is_urgent;
 
        DLIST_ADD_END(service->ops.notifies, op, struct dreplsrv_notify_operation *);
        talloc_steal(service, op);
@@ -296,7 +322,8 @@ static WERROR dreplsrv_notify_check(struct dreplsrv_service *s,
        uint32_t count=0;
        struct repsFromToBlob *reps;
        WERROR werr;
-       uint64_t uSN;
+       uint64_t uSNHighest;
+       uint64_t uSNUrgent;
        int ret, i;
 
        werr = dsdb_loadreps(s->samdb, mem_ctx, p->dn, "repsTo", &reps, &count);
@@ -309,9 +336,9 @@ static WERROR dreplsrv_notify_check(struct dreplsrv_service *s,
                return werr;
        }
 
-       /* loads the partition uSNHighest */
-       ret = dsdb_load_partition_usn(s->samdb, p->dn, &uSN);
-       if (ret != LDB_SUCCESS || uSN == 0) {
+       /* loads the partition uSNHighest and uSNUrgent */
+       ret = dsdb_load_partition_usn(s->samdb, p->dn, &uSNHighest, &uSNUrgent);
+       if (ret != LDB_SUCCESS || uSNHighest == 0) {
                /* nothing to do */
                return WERR_OK;
        }
@@ -321,10 +348,19 @@ static WERROR dreplsrv_notify_check(struct dreplsrv_service *s,
                struct dreplsrv_partition_source_dsa *sdsa;
                sdsa = dreplsrv_find_source_dsa(p, &reps[i].ctr.ctr1.source_dsa_obj_guid);
                if (sdsa == NULL) continue;
-               if (sdsa->notify_uSN < uSN) {
+               if (sdsa->notify_uSN < uSNHighest) {
                        /* we need to tell this partner to replicate
                           with us */
-                       werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx, uSN);
+
+                       /* check if urgent replication is needed */
+                       if (sdsa->notify_uSN < uSNUrgent) {
+                               werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx,
+                                                                       uSNHighest, true);
+                       } else {
+                               werr = dreplsrv_schedule_notify_sync(s, p, &reps[i], mem_ctx,
+                                                                       uSNHighest, false);
+                       }
+
                        if (!W_ERROR_IS_OK(werr)) {
                                DEBUG(0,(__location__ ": Failed to setup notify to %s for %s\n",
                                         reps[i].ctr.ctr1.other_info->dns_name,
@@ -368,7 +404,7 @@ static void dreplsrv_notify_handler_te(struct tevent_context *ev, struct tevent_
 
        status = dreplsrv_notify_schedule(service, service->notify.interval);
        if (!W_ERROR_IS_OK(status)) {
-               task_server_terminate(service->task, win_errstr(status));
+               task_server_terminate(service->task, win_errstr(status), false);
                return;
        }
 }
@@ -403,7 +439,7 @@ WERROR dreplsrv_notify_schedule(struct dreplsrv_service *service, uint32_t next_
        W_ERROR_HAVE_NO_MEMORY(new_te);
 
        tmp_mem = talloc_new(service);
-       DEBUG(2,("dreplsrv_notify_schedule(%u) %sscheduled for: %s\n",
+       DEBUG(4,("dreplsrv_notify_schedule(%u) %sscheduled for: %s\n",
                next_interval,
                (service->notify.te?"re":""),
                nt_time_string(tmp_mem, timeval_to_nttime(&next_time))));