lib: Use ctdbd_req_send/recv in ctdb_parse_send/recv
authorVolker Lendecke <vl@samba.org>
Wed, 11 Mar 2020 10:03:06 +0000 (11:03 +0100)
committerRalph Boehme <slow@samba.org>
Tue, 28 Apr 2020 09:08:40 +0000 (09:08 +0000)
Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
source3/lib/ctdbd_conn.c
source3/lib/dbwrap/dbwrap_ctdb.c

index c6555914ff6e47ff80c1d7aca9942b8da567f64c..27ce1f3b3a77225354a8e163d61afcdc9ac5db56 100644 (file)
@@ -1360,195 +1360,6 @@ struct ctdb_pkt_send_state {
        size_t packet_len;
 };
 
-static void ctdb_pkt_send_cleanup(struct tevent_req *req,
-                                 enum tevent_req_state req_state);
-
-/**
- * Asynchronously send a ctdb packet given as iovec array
- *
- * Note: the passed iov array is not const here. Similar
- * functions in samba take a const array and create a copy
- * before calling iov_advance() on the array.
- *
- * This function will modify the iov array! But
- * this is a static function and our only caller
- * ctdb_parse_send/recv is prepared for this to
- * happen!
- **/
-static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
-                                            struct tevent_context *ev,
-                                            struct ctdbd_connection *conn,
-                                            uint32_t reqid,
-                                            struct iovec *iov,
-                                            int iovcnt,
-                                            enum dbwrap_req_state *req_state)
-{
-       struct tevent_req *req = NULL;
-       struct ctdb_pkt_send_state *state = NULL;
-       ssize_t nwritten;
-       bool ok;
-
-       DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);
-
-       req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
-       if (req == NULL) {
-               return NULL;
-       }
-
-       *state = (struct ctdb_pkt_send_state) {
-               .ev = ev,
-               .conn = conn,
-               .req = req,
-               .reqid = reqid,
-               .iov = iov,
-               .iovcnt = iovcnt,
-               .packet_len = iov_buflen(iov, iovcnt),
-       };
-
-       tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);
-
-       *req_state = DBWRAP_REQ_QUEUED;
-
-       if (ctdbd_conn_has_async_sends(conn)) {
-               /*
-                * Can't attempt direct write with messages already queued and
-                * possibly in progress
-                */
-               DLIST_ADD_END(conn->send_list, state);
-               return req;
-       }
-
-       /*
-        * Attempt a direct write. If this returns short, schedule the
-        * remaining data as an async write, otherwise we're already done.
-        */
-
-       nwritten = writev(conn->fd, state->iov, state->iovcnt);
-       if ((size_t)nwritten == state->packet_len) {
-               DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);
-
-               *req_state = DBWRAP_REQ_DISPATCHED;
-               tevent_req_done(req);
-               return tevent_req_post(req, ev);
-       }
-
-       if (nwritten == -1) {
-               if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
-                       cluster_fatal("cluster write error\n");
-               }
-               nwritten = 0;
-       }
-
-       DBG_DEBUG("Posting async write of reqid [%" PRIu32"]"
-                 "after short write [%zd]\n", reqid, nwritten);
-
-       ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
-       if (!ok) {
-               *req_state = DBWRAP_REQ_ERROR;
-               tevent_req_error(req, EIO);
-               return tevent_req_post(req, ev);
-       }
-
-       /*
-        * As this is the first async write req we post, we must enable
-        * fd-writable events.
-        */
-       TEVENT_FD_WRITEABLE(conn->fde);
-       DLIST_ADD_END(conn->send_list, state);
-       return req;
-}
-
-static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state *state)
-{
-       struct ctdbd_connection *conn = state->conn;
-
-       if (conn == NULL) {
-               return 0;
-       }
-
-       if (state->req == NULL) {
-               DBG_DEBUG("Removing cancelled reqid [%" PRIu32"]\n",
-                         state->reqid);
-               state->conn = NULL;
-               DLIST_REMOVE(conn->send_list, state);
-               return 0;
-       }
-
-       DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32"]\n",
-                 state->reqid);
-
-       talloc_reparent(state->req, conn, state);
-       state->req = NULL;
-       return -1;
-}
-
-static void ctdb_pkt_send_cleanup(struct tevent_req *req,
-                                 enum tevent_req_state req_state)
-{
-       struct ctdb_pkt_send_state *state = tevent_req_data(
-               req, struct ctdb_pkt_send_state);
-       struct ctdbd_connection *conn = state->conn;
-       size_t missing_len = 0;
-
-       if (conn == NULL) {
-               return;
-       }
-
-       missing_len = iov_buflen(state->iov, state->iovcnt);
-       if (state->packet_len == missing_len) {
-               /*
-                * We haven't yet started sending this one, so we can just
-                * remove it from the pending list
-                */
-               missing_len = 0;
-       }
-       if (missing_len != 0) {
-               uint8_t *buf = NULL;
-
-               if (req_state != TEVENT_REQ_RECEIVED) {
-                       /*
-                        * Wait til the req_state is TEVENT_REQ_RECEIVED, as
-                        * that will be the final state when the request state
-                        * is talloc_free'd from tallloc_req_received(). Which
-                        * ensures we only run the following code *ONCE*!
-                        */
-                       return;
-               }
-
-               DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32"]\n",
-                         state->reqid);
-               /*
-                * A request in progress of being sent. Reparent the iov buffer
-                * so we can continue sending the request. See also the comment
-                * in ctdbd_parse_send() when copying the key buffer.
-                */
-
-               buf = iov_concat(state, state->iov, state->iovcnt);
-               if (buf == NULL) {
-                       cluster_fatal("iov_concat error\n");
-                       return;
-               }
-
-               state->iovcnt = 1;
-               state->_iov.iov_base = buf;
-               state->_iov.iov_len = missing_len;
-               state->iov = &state->_iov;
-
-               talloc_set_destructor(state, ctdb_pkt_send_state_destructor);
-               return;
-       }
-
-       DBG_DEBUG("Removing pending reqid [%" PRIu32"]\n", state->reqid);
-
-       state->conn = NULL;
-       DLIST_REMOVE(conn->send_list, state);
-
-       if (!ctdbd_conn_has_async_sends(conn)) {
-               DBG_DEBUG("No more sends, disabling fd-writable events\n");
-               TEVENT_FD_NOT_WRITEABLE(conn->fde);
-       }
-}
-
 static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
 {
        struct ctdb_pkt_send_state *state = NULL;
@@ -1606,19 +1417,6 @@ static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
        return 0;
 }
 
-static int ctdb_pkt_send_recv(struct tevent_req *req)
-{
-       int ret;
-
-       if (tevent_req_is_unix_error(req, &ret)) {
-               tevent_req_received(req);
-               return ret;
-       }
-
-       tevent_req_received(req);
-       return 0;
-}
-
 struct ctdb_pkt_recv_state {
        struct ctdb_pkt_recv_state *prev, *next;
        struct tevent_context *ev;
@@ -1634,56 +1432,6 @@ struct ctdb_pkt_recv_state {
        struct ctdb_req_header *hdr;
 };
 
-static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
-                                 enum tevent_req_state req_state);
-
-static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
-                                            struct tevent_context *ev,
-                                            struct ctdbd_connection *conn,
-                                            uint32_t reqid)
-{
-       struct tevent_req *req = NULL;
-       struct ctdb_pkt_recv_state *state = NULL;
-
-       req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_recv_state);
-       if (req == NULL) {
-               return NULL;
-       }
-
-       *state = (struct ctdb_pkt_recv_state) {
-               .ev = ev,
-               .conn = conn,
-               .reqid = reqid,
-               .req = req,
-       };
-
-       tevent_req_set_cleanup_fn(req, ctdb_pkt_recv_cleanup);
-
-       /*
-        * fd-readable event is always set for the fde, no need to deal with
-        * that here.
-        */
-
-       DLIST_ADD_END(conn->recv_list, state);
-       DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", state->reqid);
-
-       return req;
-}
-
-static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
-                                 enum tevent_req_state req_state)
-{
-       struct ctdb_pkt_recv_state *state = tevent_req_data(
-               req, struct ctdb_pkt_recv_state);
-       struct ctdbd_connection *conn = state->conn;
-
-       if (conn == NULL) {
-               return;
-       }
-       state->conn = NULL;
-       DLIST_REMOVE(conn->recv_list, state);
-}
-
 static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
 {
        struct ctdb_pkt_recv_state *state = NULL;
@@ -1793,26 +1541,6 @@ static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
        return 0;
 }
 
-static int ctdb_pkt_recv_recv(struct tevent_req *req,
-                             TALLOC_CTX *mem_ctx,
-                             struct ctdb_req_header **_hdr)
-{
-       struct ctdb_pkt_recv_state *state = tevent_req_data(
-               req, struct ctdb_pkt_recv_state);
-       int error;
-
-       if (tevent_req_is_unix_error(req, &error)) {
-               DBG_ERR("ctdb_read_req failed %s\n", strerror(error));
-               tevent_req_received(req);
-               return error;
-       }
-
-       *_hdr = talloc_move(mem_ctx, &state->hdr);
-
-       tevent_req_received(req);
-       return 0;
-}
-
 static int ctdbd_connection_destructor(struct ctdbd_connection *c)
 {
        TALLOC_FREE(c->fde);
@@ -2188,10 +1916,8 @@ struct ctdbd_parse_state {
                       TDB_DATA data,
                       void *private_data);
        void *private_data;
-       enum dbwrap_req_state *req_state;
 };
 
-static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
 static void ctdbd_parse_done(struct tevent_req *subreq);
 
 struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
@@ -2218,13 +1944,14 @@ struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
+       *req_state = DBWRAP_REQ_DISPATCHED;
+
        *state = (struct ctdbd_parse_state) {
                .ev = ev,
                .conn = conn,
                .reqid = ctdbd_next_reqid(conn),
                .parser = parser,
                .private_data = private_data,
-               .req_state = req_state,
        };
 
        flags = local_copy ? CTDB_WANT_READONLY : 0;
@@ -2264,55 +1991,17 @@ struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
        state->iov[1].iov_base = state->key.dptr;
        state->iov[1].iov_len = state->key.dsize;
 
-       /*
-        * Note that ctdb_pkt_send_send()
-        * will modify state->iov using
-        * iov_advance() without making a copy.
-        */
-       subreq = ctdb_pkt_send_send(state,
-                                   ev,
-                                   conn,
-                                   state->reqid,
-                                   state->iov,
-                                   ARRAY_SIZE(state->iov),
-                                   req_state);
+       subreq = ctdbd_req_send(
+               state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
        if (tevent_req_nomem(subreq, req)) {
                *req_state = DBWRAP_REQ_ERROR;
                return tevent_req_post(req, ev);
        }
-       tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);
+       tevent_req_set_callback(subreq, ctdbd_parse_done, req);
 
        return req;
 }
 
-static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq)
-{
-       struct tevent_req *req = tevent_req_callback_data(
-               subreq, struct tevent_req);
-       struct ctdbd_parse_state *state = tevent_req_data(
-               req, struct ctdbd_parse_state);
-       int ret;
-
-       ret = ctdb_pkt_send_recv(subreq);
-       TALLOC_FREE(subreq);
-       if (tevent_req_error(req, ret)) {
-               DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret));
-               return;
-       }
-
-       subreq = ctdb_pkt_recv_send(state,
-                                   state->ev,
-                                   state->conn,
-                                   state->reqid);
-       if (tevent_req_nomem(subreq, req)) {
-               return;
-       }
-
-       *state->req_state = DBWRAP_REQ_DISPATCHED;
-       tevent_req_set_callback(subreq, ctdbd_parse_done, req);
-       return;
-}
-
 static void ctdbd_parse_done(struct tevent_req *subreq)
 {
        struct tevent_req *req = tevent_req_callback_data(
@@ -2323,12 +2012,10 @@ static void ctdbd_parse_done(struct tevent_req *subreq)
        struct ctdb_reply_call_old *reply = NULL;
        int ret;
 
-       DBG_DEBUG("async parse request finished\n");
-
-       ret = ctdb_pkt_recv_recv(subreq, state, &hdr);
+       ret = ctdbd_req_recv(subreq, state, &hdr);
        TALLOC_FREE(subreq);
        if (tevent_req_error(req, ret)) {
-               DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret));
+               DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
                return;
        }
        SMB_ASSERT(hdr != NULL);
@@ -2360,14 +2047,5 @@ static void ctdbd_parse_done(struct tevent_req *subreq)
 
 int ctdbd_parse_recv(struct tevent_req *req)
 {
-       int error;
-
-       if (tevent_req_is_unix_error(req, &error)) {
-               DBG_DEBUG("async parse returned %s\n", strerror(error));
-               tevent_req_received(req);
-               return error;
-       }
-
-       tevent_req_received(req);
-       return 0;
+       return tevent_req_simple_recv_unix(req);
 }
index 4441ffa02855c1cdc22d5e49eb6e7978ffe6d6f2..9c9e05b9d7778f5db7af053db8d7491dfaa5e08a 100644 (file)
@@ -92,10 +92,11 @@ static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
        }
 
        become_root();
-       ret = ctdbd_init_connection(mem_ctx,
-                                   lp_ctdbd_socket(),
-                                   lp_ctdb_timeout(),
-                                   &ctdb_async_ctx.async_conn);
+       ret = ctdbd_init_async_connection(
+               mem_ctx,
+               lp_ctdbd_socket(),
+               lp_ctdb_timeout(),
+               &ctdb_async_ctx.async_conn);
        unbecome_root();
 
        if (ret != 0 || ctdb_async_ctx.async_conn == NULL) {
@@ -103,13 +104,6 @@ static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
                return EIO;
        }
 
-       ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
-       if (ret != 0) {
-               DBG_ERR("ctdbd_setup_fde failed\n");
-               TALLOC_FREE(ctdb_async_ctx.async_conn);
-               return ret;
-       }
-
        ctdb_async_ctx.initialized = true;
        return 0;
 }