s4:dsdb/repl: convert dreplsrv_op_pull_source_send/recv to tevent_req
authorStefan Metzmacher <metze@samba.org>
Wed, 30 Dec 2009 16:11:51 +0000 (17:11 +0100)
committerAndrew Tridgell <tridge@samba.org>
Fri, 8 Jan 2010 02:03:05 +0000 (13:03 +1100)
metze

Signed-off-by: Andrew Tridgell <tridge@samba.org>
source4/dsdb/repl/drepl_out_helpers.c
source4/dsdb/repl/drepl_out_pull.c
source4/dsdb/repl/drepl_service.h

index 4aa0e86b480a6a34c957ac01109a19a7af5d54f6..5666a660ada4bf3880a6e63ed088510ca88f6d98 100644 (file)
@@ -202,81 +202,83 @@ NTSTATUS dreplsrv_out_drsuapi_recv(struct tevent_req *req)
 }
 
 struct dreplsrv_op_pull_source_state {
-       struct composite_context *creq;
-
        struct dreplsrv_out_operation *op;
-
-       struct dreplsrv_drsuapi_connection *drsuapi;
-
-       bool have_all;
-
-       uint32_t ctr_level;
-       struct drsuapi_DsGetNCChangesCtr1 *ctr1;
-       struct drsuapi_DsGetNCChangesCtr6 *ctr6;
 };
 
 static void dreplsrv_op_pull_source_connect_done(struct tevent_req *subreq);
 
-struct composite_context *dreplsrv_op_pull_source_send(struct dreplsrv_out_operation *op)
+struct tevent_req *dreplsrv_op_pull_source_send(TALLOC_CTX *mem_ctx,
+                                               struct tevent_context *ev,
+                                               struct dreplsrv_out_operation *op)
 {
-       struct composite_context *c;
-       struct dreplsrv_op_pull_source_state *st;
+       struct tevent_req *req;
+       struct dreplsrv_op_pull_source_state *state;
        struct tevent_req *subreq;
 
-       c = composite_create(op, op->service->task->event_ctx);
-       if (c == NULL) return NULL;
-
-       st = talloc_zero(c, struct dreplsrv_op_pull_source_state);
-       if (composite_nomem(st, c)) return c;
+       req = tevent_req_create(mem_ctx, &state,
+                               struct dreplsrv_op_pull_source_state);
+       if (req == NULL) {
+               return NULL;
+       }
 
-       st->creq        = c;
-       st->op          = op;
+       state->op = op;
 
-       subreq = dreplsrv_out_drsuapi_send(st,
-                                          op->service->task->event_ctx,
-                                          op->source_dsa->conn);
-       if (composite_nomem(subreq, c)) return c;
-       tevent_req_set_callback(subreq, dreplsrv_op_pull_source_connect_done, st);
+       subreq = dreplsrv_out_drsuapi_send(state, ev, op->source_dsa->conn);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, dreplsrv_op_pull_source_connect_done, req);
 
-       return c;
+       return req;
 }
 
-static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st);
+static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req);
 
 static void dreplsrv_op_pull_source_connect_done(struct tevent_req *subreq)
 {
-       struct dreplsrv_op_pull_source_state *st = tevent_req_callback_data(subreq,
-                                                  struct dreplsrv_op_pull_source_state);
-       struct composite_context *c = st->creq;
+       struct tevent_req *req = tevent_req_callback_data(subreq,
+                                struct tevent_req);
+       NTSTATUS status;
 
-       c->status = dreplsrv_out_drsuapi_recv(subreq);
+       status = dreplsrv_out_drsuapi_recv(subreq);
        TALLOC_FREE(subreq);
-       if (!composite_is_ok(c)) return;
+       if (tevent_req_nterror(req, status)) {
+               return;
+       }
 
-       dreplsrv_op_pull_source_get_changes_send(st);
+       dreplsrv_op_pull_source_get_changes_trigger(req);
 }
 
-static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req);
+static void dreplsrv_op_pull_source_get_changes_done(struct rpc_request *rreq);
 
-static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_source_state *st)
+static void dreplsrv_op_pull_source_get_changes_trigger(struct tevent_req *req)
 {
-       struct composite_context *c = st->creq;
-       struct repsFromTo1 *rf1 = st->op->source_dsa->repsFrom1;
-       struct dreplsrv_service *service = st->op->service;
-       struct dreplsrv_partition *partition = st->op->source_dsa->partition;
-       struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
-       struct rpc_request *req;
+       struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
+                                                     struct dreplsrv_op_pull_source_state);
+       struct repsFromTo1 *rf1 = state->op->source_dsa->repsFrom1;
+       struct dreplsrv_service *service = state->op->service;
+       struct dreplsrv_partition *partition = state->op->source_dsa->partition;
+       struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
+       struct rpc_request *rreq;
        struct drsuapi_DsGetNCChanges *r;
 
-       r = talloc(st, struct drsuapi_DsGetNCChanges);
-       if (composite_nomem(r, c)) return;
+       r = talloc(state, struct drsuapi_DsGetNCChanges);
+       if (tevent_req_nomem(r, req)) {
+               return;
+       }
 
        r->out.level_out = talloc(r, int32_t);
-       if (composite_nomem(r->out.level_out, c)) return;
+       if (tevent_req_nomem(r->out.level_out, req)) {
+               return;
+       }
        r->in.req = talloc(r, union drsuapi_DsGetNCChangesRequest);
-       if (composite_nomem(r->in.req, c)) return;
+       if (tevent_req_nomem(r->in.req, req)) {
+               return;
+       }
        r->out.ctr = talloc(r, union drsuapi_DsGetNCChangesCtr);
-       if (composite_nomem(r->out.ctr, c)) return;
+       if (tevent_req_nomem(r->out.ctr, req)) {
+               return;
+       }
 
        r->in.bind_handle       = &drsuapi->bind_handle;
        if (drsuapi->remote_info28.supported_extensions & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8) {
@@ -289,8 +291,8 @@ static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_sou
                r->in.req->req8.replica_flags           = rf1->replica_flags;
                r->in.req->req8.max_object_count        = 133;
                r->in.req->req8.max_ndr_size            = 1336811;
-               r->in.req->req8.extended_op             = st->op->extended_op;
-               r->in.req->req8.fsmo_info               = st->op->fsmo_info;
+               r->in.req->req8.extended_op             = state->op->extended_op;
+               r->in.req->req8.fsmo_info               = state->op->fsmo_info;
                r->in.req->req8.partial_attribute_set   = NULL;
                r->in.req->req8.partial_attribute_set_ex= NULL;
                r->in.req->req8.mapping_ctr.num_mappings= 0;
@@ -305,36 +307,42 @@ static void dreplsrv_op_pull_source_get_changes_send(struct dreplsrv_op_pull_sou
                r->in.req->req5.replica_flags           = rf1->replica_flags;
                r->in.req->req5.max_object_count        = 133;
                r->in.req->req5.max_ndr_size            = 1336770;
-               r->in.req->req5.extended_op             = st->op->extended_op;
-               r->in.req->req5.fsmo_info               = st->op->fsmo_info;
+               r->in.req->req5.extended_op             = state->op->extended_op;
+               r->in.req->req5.fsmo_info               = state->op->fsmo_info;
        }
 
-       req = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
-       composite_continue_rpc(c, req, dreplsrv_op_pull_source_get_changes_recv, st);
+       rreq = dcerpc_drsuapi_DsGetNCChanges_send(drsuapi->pipe, r, r);
+       if (tevent_req_nomem(rreq, req)) {
+               return;
+       }
+       composite_continue_rpc(NULL, rreq, dreplsrv_op_pull_source_get_changes_done, req);
 }
 
-static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
-                                                      struct drsuapi_DsGetNCChanges *r,
-                                                      uint32_t ctr_level,
-                                                      struct drsuapi_DsGetNCChangesCtr1 *ctr1,
-                                                      struct drsuapi_DsGetNCChangesCtr6 *ctr6);
+static void dreplsrv_op_pull_source_apply_changes_trigger(struct tevent_req *req,
+                                                         struct drsuapi_DsGetNCChanges *r,
+                                                         uint32_t ctr_level,
+                                                         struct drsuapi_DsGetNCChangesCtr1 *ctr1,
+                                                         struct drsuapi_DsGetNCChangesCtr6 *ctr6);
 
-static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req)
+static void dreplsrv_op_pull_source_get_changes_done(struct rpc_request *rreq)
 {
-       struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private_data,
-                                                  struct dreplsrv_op_pull_source_state);
-       struct composite_context *c = st->creq;
-       struct drsuapi_DsGetNCChanges *r = talloc_get_type(req->ndr.struct_ptr,
+       struct tevent_req *req = talloc_get_type(rreq->async.private_data,
+                                                struct tevent_req);
+       NTSTATUS status;
+       struct drsuapi_DsGetNCChanges *r = talloc_get_type(rreq->ndr.struct_ptr,
                                           struct drsuapi_DsGetNCChanges);
        uint32_t ctr_level = 0;
        struct drsuapi_DsGetNCChangesCtr1 *ctr1 = NULL;
        struct drsuapi_DsGetNCChangesCtr6 *ctr6 = NULL;
 
-       c->status = dcerpc_ndr_request_recv(req);
-       if (!composite_is_ok(c)) return;
+       status = dcerpc_ndr_request_recv(rreq);
+       if (tevent_req_nterror(req, status)) {
+               return;
+       }
 
        if (!W_ERROR_IS_OK(r->out.result)) {
-               composite_error(c, werror_to_ntstatus(r->out.result));
+               status = werror_to_ntstatus(r->out.result);
+               tevent_req_nterror(req, status);
                return;
        }
 
@@ -361,38 +369,42 @@ static void dreplsrv_op_pull_source_get_changes_recv(struct rpc_request *req)
                ctr_level = 6;
                ctr6 = &r->out.ctr->ctr7.ctr.xpress6.ts->ctr6;
        } else {
-               composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
+               status = werror_to_ntstatus(WERR_BAD_NET_RESP);
+               tevent_req_nterror(req, status);
                return;
        }
 
        if (!ctr1 && !ctr6) {
-               composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
+               status = werror_to_ntstatus(WERR_BAD_NET_RESP);
+               tevent_req_nterror(req, status);
                return;
        }
 
        if (ctr_level == 6) {
                if (!W_ERROR_IS_OK(ctr6->drs_error)) {
-                       composite_error(c, werror_to_ntstatus(ctr6->drs_error));
+                       status = werror_to_ntstatus(ctr6->drs_error);
+                       tevent_req_nterror(req, status);
                        return;
                }
        }
 
-       dreplsrv_op_pull_source_apply_changes_send(st, r, ctr_level, ctr1, ctr6);
+       dreplsrv_op_pull_source_apply_changes_trigger(req, r, ctr_level, ctr1, ctr6);
 }
 
-static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st);
+static void dreplsrv_update_refs_trigger(struct tevent_req *req);
 
-static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_source_state *st,
-                                                      struct drsuapi_DsGetNCChanges *r,
-                                                      uint32_t ctr_level,
-                                                      struct drsuapi_DsGetNCChangesCtr1 *ctr1,
-                                                      struct drsuapi_DsGetNCChangesCtr6 *ctr6)
+static void dreplsrv_op_pull_source_apply_changes_trigger(struct tevent_req *req,
+                                                         struct drsuapi_DsGetNCChanges *r,
+                                                         uint32_t ctr_level,
+                                                         struct drsuapi_DsGetNCChangesCtr1 *ctr1,
+                                                          struct drsuapi_DsGetNCChangesCtr6 *ctr6)
 {
-       struct composite_context *c = st->creq;
-       struct repsFromTo1 rf1 = *st->op->source_dsa->repsFrom1;
-       struct dreplsrv_service *service = st->op->service;
-       struct dreplsrv_partition *partition = st->op->source_dsa->partition;
-       struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
+       struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
+                                                     struct dreplsrv_op_pull_source_state);
+       struct repsFromTo1 rf1 = *state->op->source_dsa->repsFrom1;
+       struct dreplsrv_service *service = state->op->service;
+       struct dreplsrv_partition *partition = state->op->source_dsa->partition;
+       struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
        const struct drsuapi_DsReplicaOIDMapping_Ctr *mapping_ctr;
        uint32_t object_count;
        struct drsuapi_DsReplicaObjectListItemEx *first_object;
@@ -402,6 +414,7 @@ static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_s
        struct dsdb_extended_replicated_objects *objects;
        bool more_data = false;
        WERROR status;
+       NTSTATUS nt_status;
 
        switch (ctr_level) {
        case 1:
@@ -425,7 +438,8 @@ static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_s
                more_data                       = ctr6->more_data;
                break;
        default:
-               composite_error(c, werror_to_ntstatus(WERR_BAD_NET_RESP));
+               nt_status = werror_to_ntstatus(WERR_BAD_NET_RESP);
+               tevent_req_nterror(req, nt_status);
                return;
        }
 
@@ -439,32 +453,39 @@ static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_s
                                                          &rf1,
                                                          uptodateness_vector,
                                                          &drsuapi->gensec_skey,
-                                                         st, &objects);
+                                                         state, &objects);
        if (!W_ERROR_IS_OK(status)) {
-               DEBUG(0,("Failed to convert objects: %s\n", win_errstr(status)));
-               composite_error(c, werror_to_ntstatus(status));
+               nt_status = werror_to_ntstatus(WERR_BAD_NET_RESP);
+               DEBUG(0,("Failed to convert objects: %s/%s\n",
+                         win_errstr(status), nt_errstr(nt_status)));
+               tevent_req_nterror(req, nt_status);
                return;
        }
 
        status = dsdb_extended_replicated_objects_commit(service->samdb,
                                                         objects, 
-                                                        &st->op->source_dsa->notify_uSN);
+                                                        &state->op->source_dsa->notify_uSN);
        talloc_free(objects);
        if (!W_ERROR_IS_OK(status)) {
-               DEBUG(0,("Failed to commit objects: %s\n", win_errstr(status)));
-               composite_error(c, werror_to_ntstatus(status));
+               nt_status = werror_to_ntstatus(WERR_BAD_NET_RESP);
+               DEBUG(0,("Failed to commit objects: %s/%s\n",
+                         win_errstr(status), nt_errstr(nt_status)));
+               tevent_req_nterror(req, nt_status);
                return;
        }
 
        /* if it applied fine, we need to update the highwatermark */
-       *st->op->source_dsa->repsFrom1 = rf1;
+       *state->op->source_dsa->repsFrom1 = rf1;
 
        /*
         * TODO: update our uptodatevector!
         */
 
+       /* we don't need this maybe very large structure anymore */
+       TALLOC_FREE(r);
+
        if (more_data) {
-               dreplsrv_op_pull_source_get_changes_send(st);
+               dreplsrv_op_pull_source_get_changes_trigger(req);
                return;
        }
 
@@ -473,43 +494,89 @@ static void dreplsrv_op_pull_source_apply_changes_send(struct dreplsrv_op_pull_s
           we join the domain, but they quickly expire.  We do it here
           so we can use the already established DRSUAPI pipe
        */
-       dreplsrv_update_refs_send(st);
+       dreplsrv_update_refs_trigger(req);
 }
 
-WERROR dreplsrv_op_pull_source_recv(struct composite_context *c)
+static void dreplsrv_update_refs_done(struct rpc_request *rreq);
+
+/*
+  send a UpdateRefs request to refresh our repsTo record on the server
+ */
+static void dreplsrv_update_refs_trigger(struct tevent_req *req)
 {
-       NTSTATUS status;
+       struct dreplsrv_op_pull_source_state *state = tevent_req_data(req,
+                                                     struct dreplsrv_op_pull_source_state);
+       struct dreplsrv_service *service = state->op->service;
+       struct dreplsrv_partition *partition = state->op->source_dsa->partition;
+       struct dreplsrv_drsuapi_connection *drsuapi = state->op->source_dsa->conn->drsuapi;
+       struct rpc_request *rreq;
+       struct drsuapi_DsReplicaUpdateRefs *r;
+       char *ntds_guid_str;
+       char *ntds_dns_name;
+
+       r = talloc(state, struct drsuapi_DsReplicaUpdateRefs);
+       if (tevent_req_nomem(r, req)) {
+               return;
+       }
+
+       ntds_guid_str = GUID_string(r, &service->ntds_guid);
+       if (tevent_req_nomem(ntds_guid_str, req)) {
+               return;
+       }
+
+       ntds_dns_name = talloc_asprintf(r, "%s._msdcs.%s",
+                                       ntds_guid_str,
+                                       lp_dnsdomain(service->task->lp_ctx));
+       if (tevent_req_nomem(ntds_dns_name, req)) {
+               return;
+       }
 
-       status = composite_wait(c);
+       r->in.bind_handle       = &drsuapi->bind_handle;
+       r->in.level             = 1;
+       r->in.req.req1.naming_context     = &partition->nc;
+       r->in.req.req1.dest_dsa_dns_name  = ntds_dns_name;
+       r->in.req.req1.dest_dsa_guid      = service->ntds_guid;
+       r->in.req.req1.options            =
+               DRSUAPI_DS_REPLICA_UPDATE_ADD_REFERENCE |
+               DRSUAPI_DS_REPLICA_UPDATE_DELETE_REFERENCE;
+       if (!samdb_rodc(service->task->lp_ctx)) {
+               r->in.req.req1.options |= DRSUAPI_DS_REPLICA_UPDATE_WRITEABLE;
+       }
 
-       talloc_free(c);
-       return ntstatus_to_werror(status);
+       rreq = dcerpc_drsuapi_DsReplicaUpdateRefs_send(drsuapi->pipe, r, r);
+       if (tevent_req_nomem(rreq, req)) {
+               return;
+       }
+       composite_continue_rpc(NULL, rreq, dreplsrv_update_refs_done, req);
 }
 
 /*
   receive a UpdateRefs reply
  */
-static void dreplsrv_update_refs_recv(struct rpc_request *req)
+static void dreplsrv_update_refs_done(struct rpc_request *rreq)
 {
-       struct dreplsrv_op_pull_source_state *st = talloc_get_type(req->async.private_data,
-                                                  struct dreplsrv_op_pull_source_state);
-       struct composite_context *c = st->creq;
-       struct drsuapi_DsReplicaUpdateRefs *r = talloc_get_type(req->ndr.struct_ptr,
+       struct tevent_req *req = talloc_get_type(rreq->async.private_data,
+                                struct tevent_req);
+       struct drsuapi_DsReplicaUpdateRefs *r = talloc_get_type(rreq->ndr.struct_ptr,
                                                                struct drsuapi_DsReplicaUpdateRefs);
+       NTSTATUS status;
 
-       c->status = dcerpc_ndr_request_recv(req);
-       if (!composite_is_ok(c)) {
+       status = dcerpc_ndr_request_recv(rreq);
+       if (!NT_STATUS_IS_OK(status)) {
                DEBUG(0,("UpdateRefs failed with %s\n", 
-                        nt_errstr(c->status)));
+                        nt_errstr(status)));
+               tevent_req_nterror(req, status);
                return;
        }
 
        if (!W_ERROR_IS_OK(r->out.result)) {
-               DEBUG(0,("UpdateRefs failed with %s for %s %s\n", 
+               status = werror_to_ntstatus(r->out.result);
+               DEBUG(0,("UpdateRefs failed with %s/%s for %s %s\n",
                         win_errstr(r->out.result),
+                        nt_errstr(status),
                         r->in.req.req1.dest_dsa_dns_name,
                         r->in.req.req1.naming_context->dn));
-               composite_error(c, werror_to_ntstatus(r->out.result));
+               tevent_req_nterror(req, status);
                return;
        }
 
@@ -517,46 +584,19 @@ static void dreplsrv_update_refs_recv(struct rpc_request *req)
                 r->in.req.req1.dest_dsa_dns_name,
                 r->in.req.req1.naming_context->dn));
 
-       composite_done(c);
+       tevent_req_done(req);
 }
 
-/*
-  send a UpdateRefs request to refresh our repsTo record on the server
- */
-static void dreplsrv_update_refs_send(struct dreplsrv_op_pull_source_state *st)
+WERROR dreplsrv_op_pull_source_recv(struct tevent_req *req)
 {
-       struct composite_context *c = st->creq;
-       struct dreplsrv_service *service = st->op->service;
-       struct dreplsrv_partition *partition = st->op->source_dsa->partition;
-       struct dreplsrv_drsuapi_connection *drsuapi = st->op->source_dsa->conn->drsuapi;
-       struct rpc_request *req;
-       struct drsuapi_DsReplicaUpdateRefs *r;
-       char *ntds_guid_str;
-       char *ntds_dns_name;
-
-       r = talloc(st, struct drsuapi_DsReplicaUpdateRefs);
-       if (composite_nomem(r, c)) return;
-
-       ntds_guid_str = GUID_string(r, &service->ntds_guid);
-       if (composite_nomem(ntds_guid_str, c)) return;
-
-       ntds_dns_name = talloc_asprintf(r, "%s._msdcs.%s",
-                                       ntds_guid_str,
-                                       lp_dnsdomain(service->task->lp_ctx));
-       if (composite_nomem(ntds_dns_name, c)) return;
+       NTSTATUS status;
 
-       r->in.bind_handle       = &drsuapi->bind_handle;
-       r->in.level             = 1;
-       r->in.req.req1.naming_context     = &partition->nc;
-       r->in.req.req1.dest_dsa_dns_name  = ntds_dns_name;
-       r->in.req.req1.dest_dsa_guid      = service->ntds_guid;
-       r->in.req.req1.options            = 
-               DRSUAPI_DS_REPLICA_UPDATE_ADD_REFERENCE |
-               DRSUAPI_DS_REPLICA_UPDATE_DELETE_REFERENCE;
-       if (!samdb_rodc(service->task->lp_ctx)) {
-               r->in.req.req1.options |= DRSUAPI_DS_REPLICA_UPDATE_WRITEABLE;
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return ntstatus_to_werror(status);
        }
 
-       req = dcerpc_drsuapi_DsReplicaUpdateRefs_send(drsuapi->pipe, r, r);
-       composite_continue_rpc(c, req, dreplsrv_update_refs_recv, st);
+       tevent_req_received(req);
+       return WERR_OK;
 }
+
index 8a33006d063f7c463d814a8a97b06b4e0a44357a..c2ea7e69747a3ad4a602fff28424124c3c851f1e 100644 (file)
@@ -99,8 +99,10 @@ WERROR dreplsrv_schedule_partition_pull_by_guid(struct dreplsrv_service *s, TALL
        return WERR_NOT_FOUND;
 }
 
-static void dreplsrv_pending_op_callback(struct dreplsrv_out_operation *op)
+static void dreplsrv_pending_op_callback(struct tevent_req *subreq)
 {
+       struct dreplsrv_out_operation *op = tevent_req_callback_data(subreq,
+                                           struct dreplsrv_out_operation);
        struct repsFromTo1 *rf = op->source_dsa->repsFrom1;
        struct dreplsrv_service *s = op->service;
        time_t t;
@@ -109,7 +111,8 @@ static void dreplsrv_pending_op_callback(struct dreplsrv_out_operation *op)
        t = time(NULL);
        unix_to_nt_time(&now, t);
 
-       rf->result_last_attempt = dreplsrv_op_pull_source_recv(op->creq);
+       rf->result_last_attempt = dreplsrv_op_pull_source_recv(subreq);
+       TALLOC_FREE(subreq);
        if (W_ERROR_IS_OK(rf->result_last_attempt)) {
                rf->consecutive_sync_failures   = 0;
                rf->last_success                = now;
@@ -135,18 +138,12 @@ done:
        dreplsrv_notify_run_ops(s);
 }
 
-static void dreplsrv_pending_op_callback_creq(struct composite_context *creq)
-{
-       struct dreplsrv_out_operation *op = talloc_get_type(creq->async.private_data,
-                                                          struct dreplsrv_out_operation);
-       dreplsrv_pending_op_callback(op);
-}
-
 void dreplsrv_run_pending_ops(struct dreplsrv_service *s)
 {
        struct dreplsrv_out_operation *op;
        time_t t;
        NTTIME now;
+       struct tevent_req *subreq;
 
        if (s->ops.current || s->ops.n_current) {
                /* if there's still one running, we're done */
@@ -167,12 +164,18 @@ void dreplsrv_run_pending_ops(struct dreplsrv_service *s)
 
        op->source_dsa->repsFrom1->last_attempt = now;
 
-       op->creq = dreplsrv_op_pull_source_send(op);
-       if (!op->creq) {
-               dreplsrv_pending_op_callback(op);
+       subreq = dreplsrv_op_pull_source_send(op, s->task->event_ctx, op);
+       if (!subreq) {
+               struct repsFromTo1 *rf = op->source_dsa->repsFrom1;
+
+               rf->result_last_attempt = WERR_NOMEM;
+               rf->consecutive_sync_failures++;
+
+               DEBUG(1,("dreplsrv_op_pull_source(%s/%s) failures[%u]\n",
+                       win_errstr(rf->result_last_attempt),
+                       nt_errstr(werror_to_ntstatus(rf->result_last_attempt)),
+                       rf->consecutive_sync_failures));
                return;
        }
-
-       op->creq->async.fn              = dreplsrv_pending_op_callback_creq;
-       op->creq->async.private_data    = op;
+       tevent_req_set_callback(subreq, dreplsrv_pending_op_callback, op);
 }
index b9e8640ae9fccb354144127a4986839732bc2a26..0a0d721d5c2805a6b1fd9a60c3abb3e4d5fbf3c8 100644 (file)
@@ -109,8 +109,6 @@ struct dreplsrv_out_operation {
 
        struct dreplsrv_partition_source_dsa *source_dsa;
 
-       struct composite_context *creq;
-
        enum drsuapi_DsExtendedOperation extended_op;
        uint64_t fsmo_info;
        dreplsrv_fsmo_callback_t callback;