s4:drepl_out_helpers: don't look at the internals of 'struct rpc_request'
[ira/wip.git] / source4 / dsdb / repl / drepl_out_pull.c
index 2793eec8b4ec3e6e633c4b9290cb9ed260df157a..101214609af2b9caa96c3250857702a0403bf487 100644 (file)
 #include "librpc/gen_ndr/ndr_drsblobs.h"
 #include "libcli/composite/composite.h"
 
-static WERROR dreplsrv_schedule_partition_pull_source(struct dreplsrv_service *s,
-                                                     struct dreplsrv_partition *p,
-                                                     struct dreplsrv_partition_source_dsa *source,
-                                                     TALLOC_CTX *mem_ctx)
+WERROR dreplsrv_schedule_partition_pull_source(struct dreplsrv_service *s,
+                                              struct dreplsrv_partition_source_dsa *source,
+                                              enum drsuapi_DsExtendedOperation extended_op,
+                                              uint64_t fsmo_info,
+                                              dreplsrv_fsmo_callback_t callback)
 {
        struct dreplsrv_out_operation *op;
 
-       op = talloc_zero(mem_ctx, struct dreplsrv_out_operation);
+       op = talloc_zero(s, struct dreplsrv_out_operation);
        W_ERROR_HAVE_NO_MEMORY(op);
 
        op->service     = s;
        op->source_dsa  = source;
+       op->extended_op = extended_op;
+       op->fsmo_info   = fsmo_info;
+       op->callback    = callback;
 
        DLIST_ADD_END(s->ops.pending, op, struct dreplsrv_out_operation *);
-       talloc_steal(s, op);
+
        return WERR_OK;
 }
 
@@ -59,7 +63,7 @@ static WERROR dreplsrv_schedule_partition_pull(struct dreplsrv_service *s,
        struct dreplsrv_partition_source_dsa *cur;
 
        for (cur = p->sources; cur; cur = cur->next) {
-               status = dreplsrv_schedule_partition_pull_source(s, p, cur, mem_ctx);
+               status = dreplsrv_schedule_partition_pull_source(s, cur, DRSUAPI_EXOP_NONE, 0, NULL);
                W_ERROR_NOT_OK_RETURN(status);
        }
 
@@ -95,8 +99,10 @@ WERROR dreplsrv_schedule_partition_pull_by_guid(struct dreplsrv_service *s, TALL
        return WERR_NOT_FOUND;
 }
 
-static void dreplsrv_pending_op_callback(struct dreplsrv_out_operation *op)
+static void dreplsrv_pending_op_callback(struct tevent_req *subreq)
 {
+       struct dreplsrv_out_operation *op = tevent_req_callback_data(subreq,
+                                           struct dreplsrv_out_operation);
        struct repsFromTo1 *rf = op->source_dsa->repsFrom1;
        struct dreplsrv_service *s = op->service;
        time_t t;
@@ -105,7 +111,8 @@ static void dreplsrv_pending_op_callback(struct dreplsrv_out_operation *op)
        t = time(NULL);
        unix_to_nt_time(&now, t);
 
-       rf->result_last_attempt = dreplsrv_op_pull_source_recv(op->creq);
+       rf->result_last_attempt = dreplsrv_op_pull_source_recv(subreq);
+       TALLOC_FREE(subreq);
        if (W_ERROR_IS_OK(rf->result_last_attempt)) {
                rf->consecutive_sync_failures   = 0;
                rf->last_success                = now;
@@ -116,30 +123,28 @@ static void dreplsrv_pending_op_callback(struct dreplsrv_out_operation *op)
 
        rf->consecutive_sync_failures++;
 
-       DEBUG(1,("dreplsrv_op_pull_source(%s/%s) failures[%u]\n",
-               win_errstr(rf->result_last_attempt),
-               nt_errstr(werror_to_ntstatus(rf->result_last_attempt)),
-               rf->consecutive_sync_failures));
+       DEBUG(1,("dreplsrv_op_pull_source(%s/%s) for %s failures[%u]\n",
+                win_errstr(rf->result_last_attempt),
+                win_errstr(rf->result_last_attempt),
+                ldb_dn_get_linearized(op->source_dsa->partition->dn),
+                rf->consecutive_sync_failures));
 
 done:
+       if (op->callback) {
+               op->callback(s, rf->result_last_attempt);
+       }
        talloc_free(op);
        s->ops.current = NULL;
        dreplsrv_run_pending_ops(s);
        dreplsrv_notify_run_ops(s);
 }
 
-static void dreplsrv_pending_op_callback_creq(struct composite_context *creq)
-{
-       struct dreplsrv_out_operation *op = talloc_get_type(creq->async.private_data,
-                                                          struct dreplsrv_out_operation);
-       dreplsrv_pending_op_callback(op);
-}
-
 void dreplsrv_run_pending_ops(struct dreplsrv_service *s)
 {
        struct dreplsrv_out_operation *op;
        time_t t;
        NTTIME now;
+       struct tevent_req *subreq;
 
        if (s->ops.current || s->ops.n_current) {
                /* if there's still one running, we're done */
@@ -160,12 +165,18 @@ void dreplsrv_run_pending_ops(struct dreplsrv_service *s)
 
        op->source_dsa->repsFrom1->last_attempt = now;
 
-       op->creq = dreplsrv_op_pull_source_send(op);
-       if (!op->creq) {
-               dreplsrv_pending_op_callback(op);
+       subreq = dreplsrv_op_pull_source_send(op, s->task->event_ctx, op);
+       if (!subreq) {
+               struct repsFromTo1 *rf = op->source_dsa->repsFrom1;
+
+               rf->result_last_attempt = WERR_NOMEM;
+               rf->consecutive_sync_failures++;
+
+               DEBUG(1,("dreplsrv_op_pull_source(%s/%s) failures[%u]\n",
+                       win_errstr(rf->result_last_attempt),
+                       nt_errstr(werror_to_ntstatus(rf->result_last_attempt)),
+                       rf->consecutive_sync_failures));
                return;
        }
-
-       op->creq->async.fn              = dreplsrv_pending_op_callback_creq;
-       op->creq->async.private_data    = op;
+       tevent_req_set_callback(subreq, dreplsrv_pending_op_callback, op);
 }