s4:libcli/wrepl: implement wrepl_request_send as a tevent_req based wrapper
authorStefan Metzmacher <metze@samba.org>
Fri, 5 Mar 2010 14:22:10 +0000 (15:22 +0100)
committerStefan Metzmacher <metze@samba.org>
Sun, 7 Mar 2010 15:52:36 +0000 (16:52 +0100)
metze

source4/libcli/wrepl/winsrepl.c
source4/torture/nbt/winsreplication.c
source4/wrepl_server/wrepl_out_helpers.c

index 2b14de30f4bf74b90d69a2d2f09945db5647eacd..39d801d606ecb0c26f964b8919c48606502b184b 100644 (file)
@@ -500,8 +500,8 @@ static int wrepl_send_ctrl_destructor(struct wrepl_send_ctrl_state *s)
   send a generic wins replication request
 */
 static struct wrepl_request *wrepl_request_internal_send(struct wrepl_socket *wrepl_socket,
-                                                         struct wrepl_packet *packet,
-                                                         struct wrepl_send_ctrl *ctrl)
+                                                        const struct wrepl_packet *packet,
+                                                        const struct wrepl_send_ctrl *ctrl)
 {
        struct wrepl_request *req;
        struct wrepl_wrap wrap;
@@ -576,18 +576,80 @@ static NTSTATUS wrepl_request_internal_recv(struct wrepl_request *req,
        return status;
 }
 
-struct wrepl_request *wrepl_request_send(struct wrepl_socket *wrepl_socket,
-                                        struct wrepl_packet *packet,
-                                        struct wrepl_send_ctrl *ctrl)
+struct wrepl_request_state {
+       struct wrepl_packet *packet;
+};
+
+static void wrepl_request_done(struct wrepl_request *subreq);
+
+struct tevent_req *wrepl_request_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct wrepl_socket *wrepl_socket,
+                                     const struct wrepl_packet *packet,
+                                     const struct wrepl_send_ctrl *ctrl)
 {
-       return wrepl_request_internal_send(wrepl_socket, packet, ctrl);
+       struct tevent_req *req;
+       struct wrepl_request_state *state;
+       struct wrepl_request *subreq;
+
+       if (wrepl_socket->event.ctx != ev) {
+               /* TODO: remove wrepl_socket->event.ctx !!! */
+               smb_panic("wrepl_associate_stop_send event context mismatch!");
+               return NULL;
+       }
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct wrepl_request_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       subreq = wrepl_request_internal_send(wrepl_socket, packet, ctrl);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       subreq->async.fn = wrepl_request_done;
+       subreq->async.private_data = req;
+
+       return req;
 }
 
-NTSTATUS wrepl_request_recv(struct wrepl_request *req,
+static void wrepl_request_done(struct wrepl_request *subreq)
+{
+       struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+                                struct tevent_req);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       NTSTATUS status;
+
+       status = wrepl_request_internal_recv(subreq, state, &state->packet);
+       if (!NT_STATUS_IS_OK(status)) {
+               tevent_req_nterror(req, status);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+NTSTATUS wrepl_request_recv(struct tevent_req *req,
                            TALLOC_CTX *mem_ctx,
                            struct wrepl_packet **packet)
 {
-       return wrepl_request_internal_recv(req, mem_ctx, packet);
+       struct wrepl_request_state *state = tevent_req_data(req,
+                                           struct wrepl_request_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
+
+       if (packet) {
+               *packet = talloc_move(mem_ctx, &state->packet);
+       }
+
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /*
@@ -595,11 +657,28 @@ NTSTATUS wrepl_request_recv(struct wrepl_request *req,
 */
 NTSTATUS wrepl_request(struct wrepl_socket *wrepl_socket,
                       TALLOC_CTX *mem_ctx,
-                      struct wrepl_packet *req_packet,
+                      const struct wrepl_packet *req_packet,
                       struct wrepl_packet **reply_packet)
 {
-       struct wrepl_request *req = wrepl_request_send(wrepl_socket, req_packet, NULL);
-       return wrepl_request_recv(req, mem_ctx, reply_packet);
+       struct tevent_req *subreq;
+       bool ok;
+       NTSTATUS status;
+
+       subreq = wrepl_request_send(mem_ctx, wrepl_socket->event.ctx,
+                                   wrepl_socket, req_packet, NULL);
+       NT_STATUS_HAVE_NO_MEMORY(subreq);
+
+       ok = tevent_req_poll(subreq, wrepl_socket->event.ctx);
+       if (!ok) {
+               TALLOC_FREE(subreq);
+               return NT_STATUS_INTERNAL_ERROR;
+       }
+
+       status = wrepl_request_recv(subreq, mem_ctx, reply_packet);
+       TALLOC_FREE(subreq);
+       NT_STATUS_NOT_OK_RETURN(status);
+
+       return NT_STATUS_OK;
 }
 
 
@@ -609,7 +688,7 @@ struct wrepl_associate_state {
        uint16_t major_version;
 };
 
-static void wrepl_associate_done(struct wrepl_request *subreq);
+static void wrepl_associate_done(struct tevent_req *subreq);
 
 struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
                                        struct tevent_context *ev,
@@ -618,7 +697,7 @@ struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
 {
        struct tevent_req *req;
        struct wrepl_associate_state *state;
-       struct wrepl_request *subreq;
+       struct tevent_req *subreq;
 
        if (wrepl_socket->event.ctx != ev) {
                /* TODO: remove wrepl_socket->event.ctx !!! */
@@ -651,19 +730,18 @@ struct tevent_req *wrepl_associate_send(TALLOC_CTX *mem_ctx,
        }
        memset(state->packet.padding.data, 0, state->packet.padding.length);
 
-       subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL);
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
-       subreq->async.fn = wrepl_associate_done;
-       subreq->async.private_data = req;
+       tevent_req_set_callback(subreq, wrepl_associate_done, req);
 
        return req;
 }
 
-static void wrepl_associate_done(struct wrepl_request *subreq)
+static void wrepl_associate_done(struct tevent_req *subreq)
 {
-       struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+       struct tevent_req *req = tevent_req_callback_data(subreq,
                                 struct tevent_req);
        struct wrepl_associate_state *state = tevent_req_data(req,
                                              struct wrepl_associate_state);
@@ -671,6 +749,7 @@ static void wrepl_associate_done(struct wrepl_request *subreq)
        struct wrepl_packet *packet;
 
        status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
        if (!NT_STATUS_IS_OK(status)) {
                tevent_req_nterror(req, status);
                return;
@@ -741,7 +820,7 @@ struct wrepl_associate_stop_state {
        struct wrepl_send_ctrl ctrl;
 };
 
-static void wrepl_associate_stop_done(struct wrepl_request *subreq);
+static void wrepl_associate_stop_done(struct tevent_req *subreq);
 
 struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
                                             struct tevent_context *ev,
@@ -750,7 +829,7 @@ struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
 {
        struct tevent_req *req;
        struct wrepl_associate_stop_state *state;
-       struct wrepl_request *subreq;
+       struct tevent_req *subreq;
 
        if (wrepl_socket->event.ctx != ev) {
                /* TODO: remove wrepl_socket->event.ctx !!! */
@@ -774,19 +853,18 @@ struct tevent_req *wrepl_associate_stop_send(TALLOC_CTX *mem_ctx,
                state->ctrl.disconnect_after_send       = true;
        }
 
-       subreq = wrepl_request_send(wrepl_socket, &state->packet, &state->ctrl);
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, &state->ctrl);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
-       subreq->async.fn = wrepl_associate_stop_done;
-       subreq->async.private_data = req;
+       tevent_req_set_callback(subreq, wrepl_associate_stop_done, req);
 
        return req;
 }
 
-static void wrepl_associate_stop_done(struct wrepl_request *subreq)
+static void wrepl_associate_stop_done(struct tevent_req *subreq)
 {
-       struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+       struct tevent_req *req = tevent_req_callback_data(subreq,
                                 struct tevent_req);
        struct wrepl_associate_stop_state *state = tevent_req_data(req,
                                                   struct wrepl_associate_stop_state);
@@ -794,6 +872,7 @@ static void wrepl_associate_stop_done(struct wrepl_request *subreq)
 
        /* currently we don't care about a possible response */
        status = wrepl_request_recv(subreq, state, NULL);
+       TALLOC_FREE(subreq);
        if (!NT_STATUS_IS_OK(status)) {
                tevent_req_nterror(req, status);
                return;
@@ -852,7 +931,7 @@ struct wrepl_pull_table_state {
        struct wrepl_wins_owner *partners;
 };
 
-static void wrepl_pull_table_done(struct wrepl_request *subreq);
+static void wrepl_pull_table_done(struct tevent_req *subreq);
 
 struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
                                         struct tevent_context *ev,
@@ -861,7 +940,7 @@ struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
 {
        struct tevent_req *req;
        struct wrepl_pull_table_state *state;
-       struct wrepl_request *subreq;
+       struct tevent_req *subreq;
 
        if (wrepl_socket->event.ctx != ev) {
                /* TODO: remove wrepl_socket->event.ctx !!! */
@@ -880,19 +959,18 @@ struct tevent_req *wrepl_pull_table_send(TALLOC_CTX *mem_ctx,
        state->packet.mess_type                         = WREPL_REPLICATION;
        state->packet.message.replication.command       = WREPL_REPL_TABLE_QUERY;
 
-       subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL);
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
-       subreq->async.fn = wrepl_pull_table_done;
-       subreq->async.private_data = req;
+       tevent_req_set_callback(subreq, wrepl_pull_table_done, req);
 
        return req;
 }
 
-static void wrepl_pull_table_done(struct wrepl_request *subreq)
+static void wrepl_pull_table_done(struct tevent_req *subreq)
 {
-       struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+       struct tevent_req *req = tevent_req_callback_data(subreq,
                                 struct tevent_req);
        struct wrepl_pull_table_state *state = tevent_req_data(req,
                                               struct wrepl_pull_table_state);
@@ -901,6 +979,7 @@ static void wrepl_pull_table_done(struct wrepl_request *subreq)
        struct wrepl_table *table;
 
        status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
        if (!NT_STATUS_IS_OK(status)) {
                tevent_req_nterror(req, status);
                return;
@@ -985,7 +1064,7 @@ struct wrepl_pull_names_state {
        struct wrepl_name *names;
 };
 
-static void wrepl_pull_names_done(struct wrepl_request *subreq);
+static void wrepl_pull_names_done(struct tevent_req *subreq);
 
 struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
                                         struct tevent_context *ev,
@@ -994,7 +1073,7 @@ struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
 {
        struct tevent_req *req;
        struct wrepl_pull_names_state *state;
-       struct wrepl_request *subreq;
+       struct tevent_req *subreq;
 
        if (wrepl_socket->event.ctx != ev) {
                /* TODO: remove wrepl_socket->event.ctx !!! */
@@ -1015,19 +1094,18 @@ struct tevent_req *wrepl_pull_names_send(TALLOC_CTX *mem_ctx,
        state->packet.message.replication.command       = WREPL_REPL_SEND_REQUEST;
        state->packet.message.replication.info.owner    = io->in.partner;
 
-       subreq = wrepl_request_send(wrepl_socket, &state->packet, NULL);
+       subreq = wrepl_request_send(state, ev, wrepl_socket, &state->packet, NULL);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
-       subreq->async.fn = wrepl_pull_names_done;
-       subreq->async.private_data = req;
+       tevent_req_set_callback(subreq, wrepl_pull_names_done, req);
 
        return req;
 }
 
-static void wrepl_pull_names_done(struct wrepl_request *subreq)
+static void wrepl_pull_names_done(struct tevent_req *subreq)
 {
-       struct tevent_req *req = talloc_get_type_abort(subreq->async.private_data,
+       struct tevent_req *req = tevent_req_callback_data(subreq,
                                 struct tevent_req);
        struct wrepl_pull_names_state *state = tevent_req_data(req,
                                               struct wrepl_pull_names_state);
@@ -1036,6 +1114,7 @@ static void wrepl_pull_names_done(struct wrepl_request *subreq)
        uint32_t i;
 
        status = wrepl_request_recv(subreq, state, &packet);
+       TALLOC_FREE(subreq);
        if (!NT_STATUS_IS_OK(status)) {
                tevent_req_nterror(req, status);
                return;
index 1655135a3341fee67e4bd0513ee65994727808a6..9a7be03199525ede32a9cf2bcaccd2127cbf61ef 100644 (file)
@@ -83,7 +83,7 @@ static const char *wrepl_name_state_string(enum wrepl_name_state state)
 static bool test_assoc_ctx1(struct torture_context *tctx)
 {
        bool ret = true;
-       struct wrepl_request *req;
+       struct tevent_req *subreq;
        struct wrepl_socket *wrepl_socket1;
        struct wrepl_associate associate1;
        struct wrepl_socket *wrepl_socket2;
@@ -95,6 +95,7 @@ static bool test_assoc_ctx1(struct torture_context *tctx)
        NTSTATUS status;
        struct nbt_name name;
        const char *address;
+       bool ok;
 
        if (!torture_nbt_get_name(tctx, &name, &address))
                return false;
@@ -131,8 +132,13 @@ static bool test_assoc_ctx1(struct torture_context *tctx)
        packet.message.replication.command = WREPL_REPL_TABLE_QUERY;
        ZERO_STRUCT(ctrl);
        ctrl.send_only = true;
-       req = wrepl_request_send(wrepl_socket2, &packet, &ctrl);
-       status = wrepl_request_recv(req, tctx, &rep_packet);
+       subreq = wrepl_request_send(tctx, tctx->ev, wrepl_socket2, &packet, &ctrl);
+       ok = tevent_req_poll(subreq, tctx->ev);
+       if (!ok) {
+               CHECK_STATUS(tctx, NT_STATUS_INTERNAL_ERROR, NT_STATUS_OK);
+       }
+       status = wrepl_request_recv(subreq, tctx, &rep_packet);
+       TALLOC_FREE(subreq);
        CHECK_STATUS(tctx, status, NT_STATUS_OK);
 
        torture_comment(tctx, "Send a association request (conn2), to make sure the last request was ignored\n");
index de3fb72318d1d931da437fed38be0adcc5710332..19cabd1e12b87b5eabb66307663c63458b0bb5fc 100644 (file)
@@ -848,15 +848,15 @@ struct wreplsrv_push_notify_state {
        enum wrepl_replication_cmd command;
        bool full_table;
        struct wrepl_send_ctrl ctrl;
-       struct wrepl_request *req;
        struct wrepl_packet req_packet;
        struct wrepl_packet *rep_packet;
        struct composite_context *creq;
        struct wreplsrv_out_connection *wreplconn;
+       struct tevent_req *subreq;
 };
 
 static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
-static void wreplsrv_push_notify_handler_req(struct wrepl_request *req);
+static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq);
 
 static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
 {
@@ -880,8 +880,10 @@ static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *s
        NT_STATUS_NOT_OK_RETURN(status);
 
        /* queue the request */
-       state->req = wrepl_request_send(state->wreplconn->sock, req, NULL);
-       NT_STATUS_HAVE_NO_MEMORY(state->req);
+       state->subreq = wrepl_request_send(state,
+                                          state->wreplconn->service->task->event_ctx,
+                                          state->wreplconn->sock, req, NULL);
+       NT_STATUS_HAVE_NO_MEMORY(state->subreq);
 
        /*
         * now we need to convert the wrepl_socket (client connection)
@@ -951,11 +953,14 @@ static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *s
        /* we won't get a reply to a inform message */
        state->ctrl.send_only           = true;
 
-       state->req = wrepl_request_send(state->wreplconn->sock, req, &state->ctrl);
-       NT_STATUS_HAVE_NO_MEMORY(state->req);
+       state->subreq = wrepl_request_send(state,
+                                          state->wreplconn->service->task->event_ctx,
+                                          state->wreplconn->sock, req, &state->ctrl);
+       NT_STATUS_HAVE_NO_MEMORY(state->subreq);
 
-       state->req->async.fn            = wreplsrv_push_notify_handler_req;
-       state->req->async.private_data  = state;
+       tevent_req_set_callback(state->subreq,
+                               wreplsrv_push_notify_handler_treq,
+                               state);
 
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
 
@@ -1009,7 +1014,8 @@ static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_sta
 {
        NTSTATUS status;
 
-       status =  wrepl_request_recv(state->req, state, NULL);
+       status = wrepl_request_recv(state->subreq, state, NULL);
+       TALLOC_FREE(state->subreq);
        NT_STATUS_NOT_OK_RETURN(status);
 
        state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
@@ -1052,9 +1058,9 @@ static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
        return;
 }
 
-static void wreplsrv_push_notify_handler_req(struct wrepl_request *req)
+static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq)
 {
-       struct wreplsrv_push_notify_state *state = talloc_get_type(req->async.private_data,
+       struct wreplsrv_push_notify_state *state = tevent_req_callback_data(subreq,
                                                   struct wreplsrv_push_notify_state);
        wreplsrv_push_notify_handler(state);
        return;