size_t packet_len;
};
-static void ctdb_pkt_send_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state);
-
-/**
- * Asynchronously send a ctdb packet given as iovec array
- *
- * Note: the passed iov array is not const here. Similar
- * functions in samba take a const array and create a copy
- * before calling iov_advance() on the array.
- *
- * This function will modify the iov array! But
- * this is a static function and our only caller
- * ctdb_parse_send/recv is prepared for this to
- * happen!
- **/
-static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- struct ctdbd_connection *conn,
- uint32_t reqid,
- struct iovec *iov,
- int iovcnt,
- enum dbwrap_req_state *req_state)
-{
- struct tevent_req *req = NULL;
- struct ctdb_pkt_send_state *state = NULL;
- ssize_t nwritten;
- bool ok;
-
- DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);
-
- req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
- if (req == NULL) {
- return NULL;
- }
-
- *state = (struct ctdb_pkt_send_state) {
- .ev = ev,
- .conn = conn,
- .req = req,
- .reqid = reqid,
- .iov = iov,
- .iovcnt = iovcnt,
- .packet_len = iov_buflen(iov, iovcnt),
- };
-
- tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);
-
- *req_state = DBWRAP_REQ_QUEUED;
-
- if (ctdbd_conn_has_async_sends(conn)) {
- /*
- * Can't attempt direct write with messages already queued and
- * possibly in progress
- */
- DLIST_ADD_END(conn->send_list, state);
- return req;
- }
-
- /*
- * Attempt a direct write. If this returns short, schedule the
- * remaining data as an async write, otherwise we're already done.
- */
-
- nwritten = writev(conn->fd, state->iov, state->iovcnt);
- if ((size_t)nwritten == state->packet_len) {
- DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);
-
- *req_state = DBWRAP_REQ_DISPATCHED;
- tevent_req_done(req);
- return tevent_req_post(req, ev);
- }
-
- if (nwritten == -1) {
- if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
- cluster_fatal("cluster write error\n");
- }
- nwritten = 0;
- }
-
- DBG_DEBUG("Posting async write of reqid [%" PRIu32"]"
- "after short write [%zd]\n", reqid, nwritten);
-
- ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
- if (!ok) {
- *req_state = DBWRAP_REQ_ERROR;
- tevent_req_error(req, EIO);
- return tevent_req_post(req, ev);
- }
-
- /*
- * As this is the first async write req we post, we must enable
- * fd-writable events.
- */
- TEVENT_FD_WRITEABLE(conn->fde);
- DLIST_ADD_END(conn->send_list, state);
- return req;
-}
-
-static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state *state)
-{
- struct ctdbd_connection *conn = state->conn;
-
- if (conn == NULL) {
- return 0;
- }
-
- if (state->req == NULL) {
- DBG_DEBUG("Removing cancelled reqid [%" PRIu32"]\n",
- state->reqid);
- state->conn = NULL;
- DLIST_REMOVE(conn->send_list, state);
- return 0;
- }
-
- DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32"]\n",
- state->reqid);
-
- talloc_reparent(state->req, conn, state);
- state->req = NULL;
- return -1;
-}
-
-static void ctdb_pkt_send_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state)
-{
- struct ctdb_pkt_send_state *state = tevent_req_data(
- req, struct ctdb_pkt_send_state);
- struct ctdbd_connection *conn = state->conn;
- size_t missing_len = 0;
-
- if (conn == NULL) {
- return;
- }
-
- missing_len = iov_buflen(state->iov, state->iovcnt);
- if (state->packet_len == missing_len) {
- /*
- * We haven't yet started sending this one, so we can just
- * remove it from the pending list
- */
- missing_len = 0;
- }
- if (missing_len != 0) {
- uint8_t *buf = NULL;
-
- if (req_state != TEVENT_REQ_RECEIVED) {
- /*
- * Wait til the req_state is TEVENT_REQ_RECEIVED, as
- * that will be the final state when the request state
- * is talloc_free'd from tallloc_req_received(). Which
- * ensures we only run the following code *ONCE*!
- */
- return;
- }
-
- DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32"]\n",
- state->reqid);
- /*
- * A request in progress of being sent. Reparent the iov buffer
- * so we can continue sending the request. See also the comment
- * in ctdbd_parse_send() when copying the key buffer.
- */
-
- buf = iov_concat(state, state->iov, state->iovcnt);
- if (buf == NULL) {
- cluster_fatal("iov_concat error\n");
- return;
- }
-
- state->iovcnt = 1;
- state->_iov.iov_base = buf;
- state->_iov.iov_len = missing_len;
- state->iov = &state->_iov;
-
- talloc_set_destructor(state, ctdb_pkt_send_state_destructor);
- return;
- }
-
- DBG_DEBUG("Removing pending reqid [%" PRIu32"]\n", state->reqid);
-
- state->conn = NULL;
- DLIST_REMOVE(conn->send_list, state);
-
- if (!ctdbd_conn_has_async_sends(conn)) {
- DBG_DEBUG("No more sends, disabling fd-writable events\n");
- TEVENT_FD_NOT_WRITEABLE(conn->fde);
- }
-}
-
static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
{
struct ctdb_pkt_send_state *state = NULL;
return 0;
}
-static int ctdb_pkt_send_recv(struct tevent_req *req)
-{
- int ret;
-
- if (tevent_req_is_unix_error(req, &ret)) {
- tevent_req_received(req);
- return ret;
- }
-
- tevent_req_received(req);
- return 0;
-}
-
struct ctdb_pkt_recv_state {
struct ctdb_pkt_recv_state *prev, *next;
struct tevent_context *ev;
struct ctdb_req_header *hdr;
};
-static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state);
-
-static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- struct ctdbd_connection *conn,
- uint32_t reqid)
-{
- struct tevent_req *req = NULL;
- struct ctdb_pkt_recv_state *state = NULL;
-
- req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_recv_state);
- if (req == NULL) {
- return NULL;
- }
-
- *state = (struct ctdb_pkt_recv_state) {
- .ev = ev,
- .conn = conn,
- .reqid = reqid,
- .req = req,
- };
-
- tevent_req_set_cleanup_fn(req, ctdb_pkt_recv_cleanup);
-
- /*
- * fd-readable event is always set for the fde, no need to deal with
- * that here.
- */
-
- DLIST_ADD_END(conn->recv_list, state);
- DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", state->reqid);
-
- return req;
-}
-
-static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state)
-{
- struct ctdb_pkt_recv_state *state = tevent_req_data(
- req, struct ctdb_pkt_recv_state);
- struct ctdbd_connection *conn = state->conn;
-
- if (conn == NULL) {
- return;
- }
- state->conn = NULL;
- DLIST_REMOVE(conn->recv_list, state);
-}
-
static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
{
struct ctdb_pkt_recv_state *state = NULL;
return 0;
}
-static int ctdb_pkt_recv_recv(struct tevent_req *req,
- TALLOC_CTX *mem_ctx,
- struct ctdb_req_header **_hdr)
-{
- struct ctdb_pkt_recv_state *state = tevent_req_data(
- req, struct ctdb_pkt_recv_state);
- int error;
-
- if (tevent_req_is_unix_error(req, &error)) {
- DBG_ERR("ctdb_read_req failed %s\n", strerror(error));
- tevent_req_received(req);
- return error;
- }
-
- *_hdr = talloc_move(mem_ctx, &state->hdr);
-
- tevent_req_received(req);
- return 0;
-}
-
static int ctdbd_connection_destructor(struct ctdbd_connection *c)
{
TALLOC_FREE(c->fde);
TDB_DATA data,
void *private_data);
void *private_data;
- enum dbwrap_req_state *req_state;
};
-static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
static void ctdbd_parse_done(struct tevent_req *subreq);
struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
return NULL;
}
+ *req_state = DBWRAP_REQ_DISPATCHED;
+
*state = (struct ctdbd_parse_state) {
.ev = ev,
.conn = conn,
.reqid = ctdbd_next_reqid(conn),
.parser = parser,
.private_data = private_data,
- .req_state = req_state,
};
flags = local_copy ? CTDB_WANT_READONLY : 0;
state->iov[1].iov_base = state->key.dptr;
state->iov[1].iov_len = state->key.dsize;
- /*
- * Note that ctdb_pkt_send_send()
- * will modify state->iov using
- * iov_advance() without making a copy.
- */
- subreq = ctdb_pkt_send_send(state,
- ev,
- conn,
- state->reqid,
- state->iov,
- ARRAY_SIZE(state->iov),
- req_state);
+ subreq = ctdbd_req_send(
+ state, ev, conn, state->iov, ARRAY_SIZE(state->iov));
if (tevent_req_nomem(subreq, req)) {
*req_state = DBWRAP_REQ_ERROR;
return tevent_req_post(req, ev);
}
- tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);
+ tevent_req_set_callback(subreq, ctdbd_parse_done, req);
return req;
}
-static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq)
-{
- struct tevent_req *req = tevent_req_callback_data(
- subreq, struct tevent_req);
- struct ctdbd_parse_state *state = tevent_req_data(
- req, struct ctdbd_parse_state);
- int ret;
-
- ret = ctdb_pkt_send_recv(subreq);
- TALLOC_FREE(subreq);
- if (tevent_req_error(req, ret)) {
- DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret));
- return;
- }
-
- subreq = ctdb_pkt_recv_send(state,
- state->ev,
- state->conn,
- state->reqid);
- if (tevent_req_nomem(subreq, req)) {
- return;
- }
-
- *state->req_state = DBWRAP_REQ_DISPATCHED;
- tevent_req_set_callback(subreq, ctdbd_parse_done, req);
- return;
-}
-
static void ctdbd_parse_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
struct ctdb_reply_call_old *reply = NULL;
int ret;
- DBG_DEBUG("async parse request finished\n");
-
- ret = ctdb_pkt_recv_recv(subreq, state, &hdr);
+ ret = ctdbd_req_recv(subreq, state, &hdr);
TALLOC_FREE(subreq);
if (tevent_req_error(req, ret)) {
- DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret));
+ DBG_DEBUG("ctdb_req_recv failed %s\n", strerror(ret));
return;
}
SMB_ASSERT(hdr != NULL);
int ctdbd_parse_recv(struct tevent_req *req)
{
- int error;
-
- if (tevent_req_is_unix_error(req, &error)) {
- DBG_DEBUG("async parse returned %s\n", strerror(error));
- tevent_req_received(req);
- return error;
- }
-
- tevent_req_received(req);
- return 0;
+ return tevent_req_simple_recv_unix(req);
}