*/
#include "replace.h"
#include <tevent.h>
#include "util_tdb.h"
#include "serverid.h"
#include "ctdbd_conn.h"
#include "lib/util/talloc_stack.h"
#include "lib/util/genrand.h"
#include "lib/util/fault.h"
#include "lib/util/dlinklist.h"
/*
 * Bug fix: this previously included lib/util/tevent_unix.c (the
 * implementation file), which compiles the helpers into this translation
 * unit as well and causes duplicate-symbol link failures. Include the
 * header instead.
 */
#include "lib/util/tevent_unix.h"
#include "lib/util/sys_rw.h"
#include "lib/util/blocking.h"
/* paths to these include files come from --with-ctdb= in configure */
void *private_data;
};
/* Per-packet async send/recv state, defined further below */
struct ctdb_pkt_send_state;
struct ctdb_pkt_recv_state;

/* Connection to the local ctdbd daemon */
struct ctdbd_connection {
	/* most recently allocated ctdb request id, see ctdbd_next_reqid() */
	uint32_t reqid;
	/* vnn (virtual node number) of the local ctdb node */
	uint32_t our_vnn;
	/* registered per-srvid message callbacks */
	struct ctdbd_srvid_cb *callbacks;
	/* socket to ctdbd */
	int fd;
	/* timeout for synchronous requests -- TODO(review): confirm unit */
	int timeout;

	/* For async connections, enabled via ctdbd_setup_fde() */
	struct tevent_fd *fde;

	/* State to track in-progress read */
	struct ctdb_read_state {
		/* Receive buffer for the initial packet length */
		uint32_t msglen;

		/* iovec state for current read */
		struct iovec iov;
		struct iovec *iovs;
		int iovcnt;

		/* allocated receive buffer based on packet length */
		struct ctdb_req_header *hdr;
	} read_state;

	/* Lists of pending async reads and writes */
	struct ctdb_pkt_recv_state *recv_list;
	struct ctdb_pkt_send_state *send_list;
};
+static void ctdbd_async_socket_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data);
+
+static bool ctdbd_conn_has_async_sends(struct ctdbd_connection *conn)
+{
+ return (conn->send_list != NULL);
+}
+
+static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
+{
+ return (conn->fde != NULL);
+}
+
/**
 * Allocate the next ctdb request id on this connection.
 *
 * Bug fix: the previous version incremented the counter but returned the
 * constant 0, so every request went on the wire with reqid 0 and replies
 * could never be matched to their requests. Also skip 0 on wrap-around so
 * a valid request never uses reqid 0.
 */
static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
{
	conn->reqid += 1;
	if (conn->reqid == 0) {
		conn->reqid += 1;
	}
	return conn->reqid;
}
-static int ctdbd_connection_destructor(struct ctdbd_connection *c)
+/**
+ * This prepares conn for handling async requests
+ **/
+int ctdbd_setup_fde(struct ctdbd_connection *conn, struct tevent_context *ev)
{
- if (c->fd != -1) {
- close(c->fd);
- c->fd = -1;
+ set_blocking(conn->fd, false);
+
+ conn->fde = tevent_add_fd(ev,
+ conn,
+ conn->fd,
+ TEVENT_FD_READ,
+ ctdbd_async_socket_handler,
+ conn);
+ if (conn->fde == NULL) {
+ return ENOMEM;
}
+
return 0;
}
+
+static int ctdbd_connection_destructor(struct ctdbd_connection *c);
+
/*
* Get us a ctdbd connection
*/
}
}
+static int ctdb_pkt_send_handler(struct ctdbd_connection *conn);
+static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn);
+
+/* Used for async connection and async ctcb requests */
+static void ctdbd_async_socket_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data)
+{
+ struct ctdbd_connection *conn = talloc_get_type_abort(
+ private_data, struct ctdbd_connection);
+ int ret;
+
+ if ((flags & TEVENT_FD_READ) != 0) {
+ ret = ctdb_pkt_recv_handler(conn);
+ if (ret != 0) {
+ DBG_DEBUG("ctdb_read_iov_handler returned %s\n",
+ strerror(ret));
+ }
+ return;
+ }
+
+ if ((flags & TEVENT_FD_WRITE) != 0) {
+ ret = ctdb_pkt_send_handler(conn);
+ if (ret != 0) {
+ DBG_DEBUG("ctdb_write_iov_handler returned %s\n",
+ strerror(ret));
+ return;
+ }
+ return;
+ }
+
+ return;
+}
+
int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
uint32_t dst_vnn, uint64_t dst_srvid,
const struct iovec *iov, int iovlen)
ssize_t nwritten;
int ret;
+ if (ctdbd_conn_has_async_reqs(conn)) {
+ /*
+ * Can't use sync call while an async call is in flight. Adding
+ * this check as a safety net. We'll be using different
+ * connections for sync and async requests, so this shouldn't
+ * happen, but who knows...
+ */
+ DBG_ERR("Async ctdb req on sync connection\n");
+ return EINVAL;
+ }
+
ZERO_STRUCT(req);
req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
req.hdr.ctdb_magic = CTDB_MAGIC;
ssize_t nwritten;
int ret;
+ if (ctdbd_conn_has_async_reqs(conn)) {
+ /*
+ * Can't use sync call while an async call is in flight. Adding
+ * this check as a safety net. We'll be using different
+ * connections for sync and async requests, so this shouldn't
+ * happen, but who knows...
+ */
+ DBG_ERR("Async ctdb req on sync connection\n");
+ return EINVAL;
+ }
+
ZERO_STRUCT(req);
req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
uint32_t flags;
int ret;
+ if (ctdbd_conn_has_async_reqs(conn)) {
+ /*
+ * Can't use sync call while an async call is in flight. Adding
+ * this check as a safety net. We'll be using different
+ * connections for sync and async requests, so this shouldn't
+ * happen, but who knows...
+ */
+ DBG_ERR("Async ctdb req on sync connection\n");
+ return EINVAL;
+ }
+
flags = local_copy ? CTDB_WANT_READONLY : 0;
ZERO_STRUCT(req);
struct ctdb_traverse_start t;
int32_t cstatus;
+ if (ctdbd_conn_has_async_reqs(conn)) {
+ /*
+ * Can't use sync call while an async call is in flight. Adding
+ * this check as a safety net. We'll be using different
+ * connections for sync and async requests, so this shouldn't
+ * happen, but who knows...
+ */
+ DBG_ERR("Async ctdb req on sync connection\n");
+ return EINVAL;
+ }
+
t.db_id = db_id;
t.srvid = conn->rand_srvid;
t.reqid = ctdbd_next_reqid(conn);
return ret;
}
+
/* State for one in-flight async packet send */
struct ctdb_pkt_send_state {
	/* list linkage in ctdbd_connection->send_list */
	struct ctdb_pkt_send_state *prev, *next;
	struct tevent_context *ev;
	struct ctdbd_connection *conn;

	/* ctdb request id */
	uint32_t reqid;

	/* the associated tevent request; NULL once the request was cancelled */
	struct tevent_req *req;

	/* iovec array with data to send; _iov backs iov after cancellation */
	struct iovec _iov;
	struct iovec *iov;
	int iovcnt;

	/* Initial packet length */
	size_t packet_len;
};
+
+static void ctdb_pkt_send_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+
+/**
+ * Asynchronously send a ctdb packet given as iovec array
+ *
+ * Note: the passed iov array is not const here. Similar
+ * functions in samba take a const array and create a copy
+ * before calling iov_advance() on the array.
+ *
+ * This function will modify the iov array! But
+ * this is a static function and our only caller
+ * ctdb_parse_send/recv is preparared for this to
+ * happen!
+ **/
+static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdbd_connection *conn,
+ uint32_t reqid,
+ struct iovec *iov,
+ int iovcnt,
+ enum dbwrap_req_state *req_state)
+{
+ struct tevent_req *req = NULL;
+ struct ctdb_pkt_send_state *state = NULL;
+ ssize_t nwritten;
+ bool ok;
+
+ DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);
+
+ req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ *state = (struct ctdb_pkt_send_state) {
+ .ev = ev,
+ .conn = conn,
+ .req = req,
+ .reqid = reqid,
+ .iov = iov,
+ .iovcnt = iovcnt,
+ .packet_len = iov_buflen(iov, iovcnt),
+ };
+
+ tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);
+
+ *req_state = DBWRAP_REQ_QUEUED;
+
+ if (ctdbd_conn_has_async_sends(conn)) {
+ /*
+ * Can't attempt direct write with messages already queued and
+ * possibly in progress
+ */
+ DLIST_ADD_END(conn->send_list, state);
+ return req;
+ }
+
+ /*
+ * Attempt a direct write. If this returns short, shedule the
+ * remaining data as an async write, otherwise we're already done.
+ */
+
+ nwritten = writev(conn->fd, state->iov, state->iovcnt);
+ if (nwritten == state->packet_len) {
+ DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);
+
+ *req_state = DBWRAP_REQ_DISPATCHED;
+ tevent_req_done(req);
+ return tevent_req_post(req, ev);
+ }
+
+ if (nwritten == -1) {
+ if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+ cluster_fatal("cluster write error\n");
+ }
+ nwritten = 0;
+ }
+
+ DBG_DEBUG("Posting async write of reqid [%" PRIu32"]"
+ "after short write [%zd]\n", reqid, nwritten);
+
+ ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
+ if (!ok) {
+ *req_state = DBWRAP_REQ_ERROR;
+ tevent_req_error(req, EIO);
+ return tevent_req_post(req, ev);
+ }
+
+ /*
+ * As this is the first async write req we post, we must enable
+ * fd-writable events.
+ */
+ TEVENT_FD_WRITEABLE(conn->fde);
+ DLIST_ADD_END(conn->send_list, state);
+ return req;
+}
+
/**
 * talloc destructor for a send state whose tevent_req is being freed.
 *
 * If the request was already cancelled (req == NULL) the state is simply
 * unlinked from the connection. Otherwise the state is reparented to the
 * connection and -1 is returned, which makes talloc *refuse* the free so
 * the partially sent packet can still be written out completely;
 * ctdb_pkt_send_handler() frees the state explicitly once done.
 **/
static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state *state)
{
	struct ctdbd_connection *conn = state->conn;

	/* Already detached from the connection: nothing to do */
	if (conn == NULL) {
		return 0;
	}

	if (state->req == NULL) {
		DBG_DEBUG("Removing cancelled reqid [%" PRIu32"]\n",
			  state->reqid);
		state->conn = NULL;
		DLIST_REMOVE(conn->send_list, state);
		return 0;
	}

	DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32"]\n",
		  state->reqid);

	talloc_reparent(state->req, conn, state);
	state->req = NULL;
	/* Refuse the free: the state now lives on under the connection */
	return -1;
}
+
/**
 * tevent_req cleanup hook for async packet sends.
 *
 * Unlinks the send state from the connection. If the packet is partially
 * on the wire when the request goes away, the remaining data is copied
 * into state->_iov and the state kept alive (via the destructor set
 * below) so the packet can still be sent completely and the ctdb stream
 * stays consistent.
 **/
static void ctdb_pkt_send_cleanup(struct tevent_req *req,
				  enum tevent_req_state req_state)
{
	struct ctdb_pkt_send_state *state = tevent_req_data(
		req, struct ctdb_pkt_send_state);
	struct ctdbd_connection *conn = state->conn;
	size_t missing_len = 0;

	if (conn == NULL) {
		return;
	}

	missing_len = iov_buflen(state->iov, state->iovcnt);
	if (state->packet_len == missing_len) {
		/*
		 * We haven't yet started sending this one, so we can just
		 * remove it from the pending list
		 */
		missing_len = 0;
	}
	if (missing_len != 0) {
		uint8_t *buf = NULL;

		if (req_state != TEVENT_REQ_RECEIVED) {
			/*
			 * Wait til the req_state is TEVENT_REQ_RECEIVED, as
			 * that will be the final state when the request state
			 * is talloc_free'd from tevent_req_received(). Which
			 * ensures we only run the following code *ONCE*!
			 */
			return;
		}

		DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32"]\n",
			  state->reqid);
		/*
		 * A request in progress of being sent. Reparent the iov buffer
		 * so we can continue sending the request. See also the comment
		 * in ctdbd_parse_send() when copying the key buffer.
		 */

		buf = iov_concat(state, state->iov, state->iovcnt);
		if (buf == NULL) {
			cluster_fatal("iov_concat error\n");
			return;
		}

		/* Collapse the remaining data into a single owned iovec */
		state->iovcnt = 1;
		state->_iov.iov_base = buf;
		state->_iov.iov_len = missing_len;
		state->iov = &state->_iov;

		talloc_set_destructor(state, ctdb_pkt_send_state_destructor);
		return;
	}

	DBG_DEBUG("Removing pending reqid [%" PRIu32"]\n", state->reqid);

	state->conn = NULL;
	DLIST_REMOVE(conn->send_list, state);

	if (!ctdbd_conn_has_async_sends(conn)) {
		DBG_DEBUG("No more sends, disabling fd-writable events\n");
		TEVENT_FD_NOT_WRITEABLE(conn->fde);
	}
}
+
+static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
+{
+ struct ctdb_pkt_send_state *state = NULL;
+ ssize_t nwritten;
+ ssize_t iovlen;
+ bool ok;
+
+ DBG_DEBUG("send handler\n");
+
+ if (!ctdbd_conn_has_async_sends(conn)) {
+ DBG_WARNING("Writable fd-event without pending send\n");
+ TEVENT_FD_NOT_WRITEABLE(conn->fde);
+ return 0;
+ }
+
+ state = conn->send_list;
+ iovlen = iov_buflen(state->iov, state->iovcnt);
+
+ nwritten = writev(conn->fd, state->iov, state->iovcnt);
+ if (nwritten == -1) {
+ if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+ DBG_ERR("writev failed: %s\n", strerror(errno));
+ cluster_fatal("cluster write error\n");
+ }
+ DBG_DEBUG("recoverable writev error, retry\n");
+ return 0;
+ }
+
+ if (nwritten < iovlen) {
+ DBG_DEBUG("short write\n");
+
+ ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
+ if (!ok) {
+ DBG_ERR("iov_advance failed\n");
+ if (state->req == NULL) {
+ TALLOC_FREE(state);
+ return 0;
+ }
+ tevent_req_error(state->req, EIO);
+ return 0;
+ }
+ return 0;
+ }
+
+ if (state->req == NULL) {
+ DBG_DEBUG("Finished sending cancelled reqid [%" PRIu32 "]\n",
+ state->reqid);
+ TALLOC_FREE(state);
+ return 0;
+ }
+
+ DBG_DEBUG("Finished send request id [%" PRIu32 "]\n", state->reqid);
+
+ tevent_req_done(state->req);
+ return 0;
+}
+
/* Collect the result of ctdb_pkt_send_send(); 0 on success, errno else. */
static int ctdb_pkt_send_recv(struct tevent_req *req)
{
	int err = 0;

	if (!tevent_req_is_unix_error(req, &err)) {
		err = 0;
	}
	tevent_req_received(req);
	return err;
}
+
/* State for one pending async packet receive */
struct ctdb_pkt_recv_state {
	/* list linkage in ctdbd_connection->recv_list */
	struct ctdb_pkt_recv_state *prev, *next;
	struct tevent_context *ev;
	struct ctdbd_connection *conn;

	/* ctdb request id this receive is waiting for */
	uint32_t reqid;

	/* the associated tevent_req */
	struct tevent_req *req;

	/* pointer to allocated ctdb packet buffer */
	struct ctdb_req_header *hdr;
};
+
+static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+
+static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdbd_connection *conn,
+ uint32_t reqid)
+{
+ struct tevent_req *req = NULL;
+ struct ctdb_pkt_recv_state *state = NULL;
+
+ req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_recv_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ *state = (struct ctdb_pkt_recv_state) {
+ .ev = ev,
+ .conn = conn,
+ .reqid = reqid,
+ .req = req,
+ };
+
+ tevent_req_set_cleanup_fn(req, ctdb_pkt_recv_cleanup);
+
+ /*
+ * fd-readable event is always set for the fde, no need to deal with
+ * that here.
+ */
+
+ DLIST_ADD_END(conn->recv_list, state);
+ DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", state->reqid);
+
+ return req;
+}
+
+static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct ctdb_pkt_recv_state *state = tevent_req_data(
+ req, struct ctdb_pkt_recv_state);
+ struct ctdbd_connection *conn = state->conn;
+
+ if (conn == NULL) {
+ return;
+ }
+ state->conn = NULL;
+ DLIST_REMOVE(conn->recv_list, state);
+}
+
/**
 * fd-readable handler for async connections.
 *
 * Reads proceed in two phases tracked in conn->read_state: first the
 * 4-byte packet length into read_state.msglen, then the remainder of the
 * packet into a freshly talloc'ed buffer. A completed packet is matched
 * by reqid against the pending requests in conn->recv_list.
 *
 * Returns 0 on success or partial progress, an errno value on failure.
 **/
static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
{
	struct ctdb_pkt_recv_state *state = NULL;
	ssize_t nread;
	ssize_t iovlen;
	bool ok;

	DBG_DEBUG("receive handler\n");

	if (conn->read_state.iovs == NULL) {
		/* Phase 1: schedule the read of the 4-byte packet length */
		conn->read_state.iov.iov_base = &conn->read_state.msglen;
		conn->read_state.iov.iov_len = sizeof(conn->read_state.msglen);
		conn->read_state.iovs = &conn->read_state.iov;
		conn->read_state.iovcnt = 1;
	}

	iovlen = iov_buflen(conn->read_state.iovs, conn->read_state.iovcnt);

	DBG_DEBUG("iovlen [%zd]\n", iovlen);

	nread = readv(conn->fd, conn->read_state.iovs, conn->read_state.iovcnt);
	if (nread == 0) {
		cluster_fatal("cluster read error, peer closed connection\n");
	}
	if (nread == -1) {
		if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
			cluster_fatal("cluster read error\n");
		}
		DBG_DEBUG("recoverable error from readv, retry\n");
		return 0;
	}

	if (nread < iovlen) {
		/* Short read: remember progress, wait for more data */
		DBG_DEBUG("iovlen [%zd] nread [%zd]\n", iovlen, nread);
		ok = iov_advance(&conn->read_state.iovs,
				 &conn->read_state.iovcnt,
				 nread);
		if (!ok) {
			return EIO;
		}
		return 0;
	}

	/* Current phase complete, reset the iov state */
	conn->read_state.iovs = NULL;
	conn->read_state.iovcnt = 0;

	if (conn->read_state.hdr == NULL) {
		/*
		 * Going this way after reading the 4 initial byte message
		 * length
		 */
		uint32_t msglen = conn->read_state.msglen;
		uint8_t *readbuf = NULL;
		size_t readlen;

		DBG_DEBUG("msglen: %" PRIu32 "\n", msglen);

		if (msglen < sizeof(struct ctdb_req_header)) {
			DBG_ERR("short message %" PRIu32 "\n", msglen);
			return EIO;
		}

		conn->read_state.hdr = talloc_size(conn, msglen);
		if (conn->read_state.hdr == NULL) {
			return ENOMEM;
		}
		conn->read_state.hdr->length = msglen;
		talloc_set_name_const(conn->read_state.hdr,
				      "struct ctdb_req_header");

		/* Phase 2: read the packet body past the length field */
		readbuf = (uint8_t *)conn->read_state.hdr + sizeof(msglen);
		readlen = msglen - sizeof(msglen);

		conn->read_state.iov.iov_base = readbuf;
		conn->read_state.iov.iov_len = readlen;
		conn->read_state.iovs = &conn->read_state.iov;
		conn->read_state.iovcnt = 1;

		DBG_DEBUG("Scheduled packet read size %zd\n", readlen);
		return 0;
	}

	/*
	 * Searching a list here is expected to be cheap, as messages are
	 * expected to be coming in more or less ordered and we should find the
	 * waiting request near the beginning of the list.
	 */
	for (state = conn->recv_list; state != NULL; state = state->next) {
		if (state->reqid == conn->read_state.hdr->reqid) {
			break;
		}
	}

	if (state == NULL) {
		/* No waiter for this reqid: drop the packet */
		DBG_ERR("Discarding async ctdb reqid %u\n",
			conn->read_state.hdr->reqid);
		TALLOC_FREE(conn->read_state.hdr);
		ZERO_STRUCT(conn->read_state);
		return EINVAL;
	}

	DBG_DEBUG("Got reply for reqid [%" PRIu32 "]\n", state->reqid);

	/* Hand the packet buffer over to the waiting request, complete it */
	state->hdr = talloc_move(state, &conn->read_state.hdr);
	ZERO_STRUCT(conn->read_state);
	tevent_req_done(state->req);
	return 0;
}
+
+static int ctdb_pkt_recv_recv(struct tevent_req *req,
+ TALLOC_CTX *mem_ctx,
+ struct ctdb_req_header **_hdr)
+{
+ struct ctdb_pkt_recv_state *state = tevent_req_data(
+ req, struct ctdb_pkt_recv_state);
+ int error;
+
+ if (tevent_req_is_unix_error(req, &error)) {
+ DBG_ERR("ctdb_read_req failed %s\n", strerror(error));
+ tevent_req_received(req);
+ return error;
+ }
+
+ *_hdr = talloc_move(mem_ctx, &state->hdr);
+
+ tevent_req_received(req);
+ return 0;
+}
+
+static int ctdbd_connection_destructor(struct ctdbd_connection *c)
+{
+ struct ctdb_pkt_recv_state *recv_state = NULL;
+ struct ctdb_pkt_send_state *send_state = NULL;
+
+ TALLOC_FREE(c->fde);
+ if (c->fd != -1) {
+ close(c->fd);
+ c->fd = -1;
+ }
+
+ TALLOC_FREE(c->read_state.hdr);
+ ZERO_STRUCT(c->read_state);
+
+ for (send_state = c->send_list; send_state != NULL;) {
+ DLIST_REMOVE(c->send_list, send_state);
+ send_state->conn = NULL;
+ tevent_req_defer_callback(send_state->req, send_state->ev);
+ tevent_req_error(send_state->req, EIO);
+ }
+
+ for (recv_state = c->recv_list; recv_state != NULL;) {
+ DLIST_REMOVE(c->recv_list, recv_state);
+ recv_state->conn = NULL;
+ tevent_req_defer_callback(send_state->req, recv_state->ev);
+ tevent_req_error(recv_state->req, EIO);
+ }
+
+ return 0;
+}
+
/* State for an async ctdbd_parse request (fetch + parse one record) */
struct ctdbd_parse_state {
	struct tevent_context *ev;
	struct ctdbd_connection *conn;
	/* reqid used for both the send and the matching receive */
	uint32_t reqid;
	/* copy of the caller's key; backed by _keybuf for small keys */
	TDB_DATA key;
	uint8_t _keybuf[64];
	/* pre-built request header, referenced by iov[0] */
	struct ctdb_req_call_old ctdb_req;
	/* iov[0]: request header, iov[1]: key data */
	struct iovec iov[2];
	void (*parser)(TDB_DATA key,
		       TDB_DATA data,
		       void *private_data);
	void *private_data;
	/* dbwrap request progress, updated as the request advances */
	enum dbwrap_req_state *req_state;
};
+
+static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
+static void ctdbd_parse_done(struct tevent_req *subreq);
+
/**
 * Async fetch of a record from ctdb by key; on success the record data is
 * handed to the caller supplied parser callback.
 *
 * @param conn       async ctdb connection (see ctdbd_setup_fde())
 * @param db_id      ctdb database id
 * @param key        record key; copied into the request state
 * @param local_copy if true, request a read-only copy (CTDB_WANT_READONLY)
 * @param parser     callback invoked with key and data on success
 * @param req_state  dbwrap request progress, updated as we go
 **/
struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdbd_connection *conn,
				    uint32_t db_id,
				    TDB_DATA key,
				    bool local_copy,
				    void (*parser)(TDB_DATA key,
						   TDB_DATA data,
						   void *private_data),
				    void *private_data,
				    enum dbwrap_req_state *req_state)
{
	struct tevent_req *req = NULL;
	struct ctdbd_parse_state *state = NULL;
	uint32_t flags;
	uint32_t packet_length;
	struct tevent_req *subreq = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
	if (req == NULL) {
		*req_state = DBWRAP_REQ_ERROR;
		return NULL;
	}

	*state = (struct ctdbd_parse_state) {
		.ev = ev,
		.conn = conn,
		.reqid = ctdbd_next_reqid(conn),
		.parser = parser,
		.private_data = private_data,
		.req_state = req_state,
	};

	flags = local_copy ? CTDB_WANT_READONLY : 0;
	packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;

	/*
	 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires
	 * that all passed iov elements have a lifetime longer than the
	 * tevent_req returned by ctdb_pkt_send_send(). This is required to
	 * continue sending the low level request into the ctdb socket, if a
	 * higher level ('this') request is cancelled (or talloc free'd) by
	 * the application layer, without sending invalid packets to ctdb.
	 */
	if (key.dsize > sizeof(state->_keybuf)) {
		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
		if (tevent_req_nomem(state->key.dptr, req)) {
			return tevent_req_post(req, ev);
		}
	} else {
		/* Small key: avoid the allocation, use the inline buffer */
		memcpy(state->_keybuf, key.dptr, key.dsize);
		state->key.dptr = state->_keybuf;
	}
	state->key.dsize = key.dsize;

	state->ctdb_req.hdr.length = packet_length;
	state->ctdb_req.hdr.ctdb_magic = CTDB_MAGIC;
	state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
	state->ctdb_req.hdr.operation = CTDB_REQ_CALL;
	state->ctdb_req.hdr.reqid = state->reqid;
	state->ctdb_req.flags = flags;
	state->ctdb_req.callid = CTDB_FETCH_FUNC;
	state->ctdb_req.db_id = db_id;
	state->ctdb_req.keylen = state->key.dsize;

	state->iov[0].iov_base = &state->ctdb_req;
	state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	state->iov[1].iov_base = state->key.dptr;
	state->iov[1].iov_len = state->key.dsize;

	/*
	 * Note that ctdb_pkt_send_send()
	 * will modify state->iov using
	 * iov_advance() without making a copy.
	 */
	subreq = ctdb_pkt_send_send(state,
				    ev,
				    conn,
				    state->reqid,
				    state->iov,
				    ARRAY_SIZE(state->iov),
				    req_state);
	if (tevent_req_nomem(subreq, req)) {
		*req_state = DBWRAP_REQ_ERROR;
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);

	return req;
}
+
+static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ctdbd_parse_state *state = tevent_req_data(
+ req, struct ctdbd_parse_state);
+ int ret;
+
+ ret = ctdb_pkt_send_recv(subreq);
+ TALLOC_FREE(subreq);
+ if (tevent_req_error(req, ret)) {
+ DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(ret));
+ return;
+ }
+
+ subreq = ctdb_pkt_recv_send(state,
+ state->ev,
+ state->conn,
+ state->reqid);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+
+ *state->req_state = DBWRAP_REQ_DISPATCHED;
+ tevent_req_set_callback(subreq, ctdbd_parse_done, req);
+ return;
+}
+
/**
 * The reply packet arrived: validate it, run the parser on the record
 * data and complete the request.
 **/
static void ctdbd_parse_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ctdbd_parse_state *state = tevent_req_data(
		req, struct ctdbd_parse_state);
	struct ctdb_req_header *hdr = NULL;
	struct ctdb_reply_call_old *reply = NULL;
	int ret;

	DBG_DEBUG("async parse request finished\n");

	ret = ctdb_pkt_recv_recv(subreq, state, &hdr);
	TALLOC_FREE(subreq);
	if (tevent_req_error(req, ret)) {
		DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret));
		return;
	}

	if (hdr->operation != CTDB_REPLY_CALL) {
		DBG_ERR("received invalid reply\n");
		ctdb_packet_dump(hdr);
		tevent_req_error(req, EIO);
		return;
	}

	/* The header is the leading member of the full reply packet */
	reply = (struct ctdb_reply_call_old *)hdr;

	if (reply->datalen == 0) {
		/*
		 * Treat an empty record as non-existing
		 */
		tevent_req_error(req, ENOENT);
		return;
	}

	state->parser(state->key,
		      make_tdb_data(&reply->data[0], reply->datalen),
		      state->private_data);

	tevent_req_done(req);
	return;
}
+
/**
 * Receive the result of ctdbd_parse_send().
 * Returns 0 on success, an errno value on failure.
 **/
int ctdbd_parse_recv(struct tevent_req *req)
{
	int err = 0;

	if (tevent_req_is_unix_error(req, &err)) {
		DBG_DEBUG("async parse returned %s\n", strerror(err));
	} else {
		err = 0;
	}
	tevent_req_received(req);
	return err;
}