ctdb_conn: add ctdbd_parse_send/recv
authorRalph Boehme <slow@samba.org>
Mon, 9 Jan 2017 07:17:02 +0000 (08:17 +0100)
committerJeremy Allison <jra@samba.org>
Tue, 18 Apr 2017 20:54:16 +0000 (22:54 +0200)
Implement the ctdb packet layer for async parse send/recv with tevent.

ctdbd_setup_fde() is used to create an fde from the
connection fd and will be called from dbwrap_ctdb.

ctdbd_parse_send() and ctdbd_parse_recv() will be used by dbwrap_ctdb
for async packet sending and receiving.

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
source3/include/ctdbd_conn.h
source3/lib/ctdbd_conn.c

index bbebbcec9fecb4c80957f347184ec8d10e05193d..06fbcc373a4d9f81b465f9c6dd016c34588902d7 100644 (file)
@@ -23,6 +23,7 @@
 #include "replace.h"
 #include "system/filesys.h"
 #include "system/network.h"
+#include "lib/dbwrap/dbwrap.h"
 #include <tdb.h>
 #include <tevent.h>
 
@@ -36,6 +37,7 @@ int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
 int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
                            const char *sockname, int timeout,
                            struct ctdbd_connection *conn);
+int ctdbd_setup_fde(struct ctdbd_connection *conn, struct tevent_context *ev);
 
 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn);
 
@@ -94,4 +96,17 @@ int register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
                        void *private_data);
 int ctdbd_probe(const char *sockname, int timeout);
 
+struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
+                                   struct tevent_context *ev,
+                                   struct ctdbd_connection *conn,
+                                   uint32_t db_id,
+                                   TDB_DATA key,
+                                   bool local_copy,
+                                   void (*parser)(TDB_DATA key,
+                                                  TDB_DATA data,
+                                                  void *private_data),
+                                   void *private_data,
+                                   enum dbwrap_req_state *req_state);
+int ctdbd_parse_recv(struct tevent_req *req);
+
 #endif /* _CTDBD_CONN_H */
index d16796f4038279780836d3879f09e4dfba2dd998..c629d3c31e62e7c029d2846f6fa23e05a244382f 100644 (file)
@@ -19,6 +19,7 @@
 */
 
 #include "replace.h"
+#include <tevent.h>
 #include "util_tdb.h"
 #include "serverid.h"
 #include "ctdbd_conn.h"
 #include "lib/util/talloc_stack.h"
 #include "lib/util/genrand.h"
 #include "lib/util/fault.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/tevent_unix.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/blocking.h"
 
 /* paths to these include files come from --with-ctdb= in configure */
 
@@ -44,6 +49,9 @@ struct ctdbd_srvid_cb {
        void *private_data;
 };
 
+struct ctdb_pkt_send_state;
+struct ctdb_pkt_recv_state;
+
 struct ctdbd_connection {
        uint32_t reqid;
        uint32_t our_vnn;
@@ -51,8 +59,44 @@ struct ctdbd_connection {
        struct ctdbd_srvid_cb *callbacks;
        int fd;
        int timeout;
+
+       /* For async connections, enabled via ctdbd_setup_fde() */
+       struct tevent_fd *fde;
+
+       /* State to track in-progress read */
+       struct ctdb_read_state {
+               /* Receive buffer for the initial packet length */
+               uint32_t msglen;
+
+               /* iovec state for current read */
+               struct iovec iov;
+               struct iovec *iovs;
+               int iovcnt;
+
+               /* allocated receive buffer based on packet length */
+               struct ctdb_req_header *hdr;
+       } read_state;
+
+       /* Lists of pending async reads and writes */
+       struct ctdb_pkt_recv_state *recv_list;
+       struct ctdb_pkt_send_state *send_list;
 };
 
+static void ctdbd_async_socket_handler(struct tevent_context *ev,
+                                      struct tevent_fd *fde,
+                                      uint16_t flags,
+                                      void *private_data);
+
+/* True if at least one async send is queued on conn->send_list */
+static bool ctdbd_conn_has_async_sends(struct ctdbd_connection *conn)
+{
+       bool sends_pending = (conn->send_list != NULL);
+
+       return sends_pending;
+}
+
+/* True if conn has a tevent fd registered, i.e. conn->fde is set */
+static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
+{
+       bool fde_registered = (conn->fde != NULL);
+
+       return fde_registered;
+}
+
 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
 {
        conn->reqid += 1;
@@ -391,14 +435,28 @@ static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
        return 0;
 }
 
-static int ctdbd_connection_destructor(struct ctdbd_connection *c)
+/**
+ * This prepares conn for handling async requests
+ *
+ * Switches conn->fd to non-blocking mode and registers it with "ev" for
+ * read events, which are served by ctdbd_async_socket_handler().
+ *
+ * Returns 0 on success, the errno left by set_blocking() if the socket
+ * could not be made non-blocking, or ENOMEM if the fd event could not be
+ * created.
+ **/
+int ctdbd_setup_fde(struct ctdbd_connection *conn, struct tevent_context *ev)
 {
-       if (c->fd != -1) {
-               close(c->fd);
-               c->fd = -1;
+       int ret;
+
+       /*
+        * The async handlers rely on readv()/writev() returning EAGAIN
+        * instead of blocking, so don't register the fd if this fails.
+        */
+       ret = set_blocking(conn->fd, false);
+       if (ret == -1) {
+               return errno;
+       }
+
+       conn->fde = tevent_add_fd(ev,
+                                 conn,
+                                 conn->fd,
+                                 TEVENT_FD_READ,
+                                 ctdbd_async_socket_handler,
+                                 conn);
+       if (conn->fde == NULL) {
+               return ENOMEM;
        }
+
        return 0;
 }
+
+static int ctdbd_connection_destructor(struct ctdbd_connection *c);
+
 /*
  * Get us a ctdbd connection
  */
@@ -547,6 +605,41 @@ void ctdbd_socket_readable(struct ctdbd_connection *conn)
        }
 }
 
+static int ctdb_pkt_send_handler(struct ctdbd_connection *conn);
+static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn);
+
+/*
+ * tevent fd handler used for async connections and async ctdb requests.
+ *
+ * A readable event takes priority: when TEVENT_FD_READ is set only the
+ * receive handler runs in this invocation. Otherwise a writable event is
+ * passed to the send handler. Handler failures are only logged.
+ */
+static void ctdbd_async_socket_handler(struct tevent_context *ev,
+                                      struct tevent_fd *fde,
+                                      uint16_t flags,
+                                      void *private_data)
+{
+       struct ctdbd_connection *conn = talloc_get_type_abort(
+               private_data, struct ctdbd_connection);
+       bool readable = ((flags & TEVENT_FD_READ) != 0);
+       bool writable = ((flags & TEVENT_FD_WRITE) != 0);
+       int err;
+
+       if (readable) {
+               err = ctdb_pkt_recv_handler(conn);
+               if (err != 0) {
+                       DBG_DEBUG("ctdb_read_iov_handler returned %s\n",
+                                 strerror(err));
+               }
+               return;
+       }
+
+       if (!writable) {
+               return;
+       }
+
+       err = ctdb_pkt_send_handler(conn);
+       if (err != 0) {
+               DBG_DEBUG("ctdb_write_iov_handler returned %s\n",
+                         strerror(err));
+       }
+}
+
 int ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
                             uint32_t dst_vnn, uint64_t dst_srvid,
                             const struct iovec *iov, int iovlen)
@@ -600,6 +693,17 @@ static int ctdbd_control(struct ctdbd_connection *conn,
        ssize_t nwritten;
        int ret;
 
+       if (ctdbd_conn_has_async_reqs(conn)) {
+               /*
+                * Can't use sync call while an async call is in flight. Adding
+                * this check as a safety net. We'll be using different
+                * connections for sync and async requests, so this shouldn't
+                * happen, but who knows...
+                */
+               DBG_ERR("Async ctdb req on sync connection\n");
+               return EINVAL;
+       }
+
        ZERO_STRUCT(req);
        req.hdr.length = offsetof(struct ctdb_req_control_old, data) + data.dsize;
        req.hdr.ctdb_magic   = CTDB_MAGIC;
@@ -768,6 +872,17 @@ int ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id, TDB_DATA key)
        ssize_t nwritten;
        int ret;
 
+       if (ctdbd_conn_has_async_reqs(conn)) {
+               /*
+                * Can't use sync call while an async call is in flight. Adding
+                * this check as a safety net. We'll be using different
+                * connections for sync and async requests, so this shouldn't
+                * happen, but who knows...
+                */
+               DBG_ERR("Async ctdb req on sync connection\n");
+               return EINVAL;
+       }
+
        ZERO_STRUCT(req);
 
        req.hdr.length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
@@ -833,6 +948,17 @@ int ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
        uint32_t flags;
        int ret;
 
+       if (ctdbd_conn_has_async_reqs(conn)) {
+               /*
+                * Can't use sync call while an async call is in flight. Adding
+                * this check as a safety net. We'll be using different
+                * connections for sync and async requests, so this shouldn't
+                * happen, but who knows...
+                */
+               DBG_ERR("Async ctdb req on sync connection\n");
+               return EINVAL;
+       }
+
        flags = local_copy ? CTDB_WANT_READONLY : 0;
 
        ZERO_STRUCT(req);
@@ -904,6 +1030,17 @@ int ctdbd_traverse(struct ctdbd_connection *conn, uint32_t db_id,
        struct ctdb_traverse_start t;
        int32_t cstatus;
 
+       if (ctdbd_conn_has_async_reqs(conn)) {
+               /*
+                * Can't use sync call while an async call is in flight. Adding
+                * this check as a safety net. We'll be using different
+                * connections for sync and async requests, so this shouldn't
+                * happen, but who knows...
+                */
+               DBG_ERR("Async ctdb req on sync connection\n");
+               return EINVAL;
+       }
+
        t.db_id = db_id;
        t.srvid = conn->rand_srvid;
        t.reqid = ctdbd_next_reqid(conn);
@@ -1143,3 +1280,702 @@ int ctdbd_probe(const char *sockname, int timeout)
 
        return ret;
 }
+
+struct ctdb_pkt_send_state {
+       struct ctdb_pkt_send_state *prev, *next;
+       struct tevent_context *ev;
+       struct ctdbd_connection *conn;
+
+       /* ctdb request id */
+       uint32_t reqid;
+
+       /*
+        * the associated tevent request, reset to NULL by
+        * ctdb_pkt_send_state_destructor() when a partially sent request
+        * is cancelled
+        */
+       struct tevent_req *req;
+
+       /*
+        * iovec array with data to send. iov initially points at the
+        * array passed to ctdb_pkt_send_send(); after a cancel of a
+        * partially sent request ctdb_pkt_send_cleanup() points it at
+        * _iov, which then describes the concatenated remainder.
+        */
+       struct iovec _iov;
+       struct iovec *iov;
+       int iovcnt;
+
+       /* Initial packet length, iov_buflen() of the unmodified iov array */
+       size_t packet_len;
+};
+
+static void ctdb_pkt_send_cleanup(struct tevent_req *req,
+                                 enum tevent_req_state req_state);
+
+/**
+ * Asynchronously send a ctdb packet given as iovec array
+ *
+ * If no other send is queued on the connection, a direct writev() is
+ * attempted first and only the unsent remainder is queued on
+ * conn->send_list. *req_state is set to DBWRAP_REQ_QUEUED,
+ * DBWRAP_REQ_DISPATCHED (packet written completely) or DBWRAP_REQ_ERROR.
+ *
+ * Note: the passed iov array is not const here. Similar
+ * functions in samba take a const array and create a copy
+ * before calling iov_advance() on the array.
+ *
+ * This function will modify the iov array! But
+ * this is a static function and our only caller
+ * ctdbd_parse_send/recv is prepared for this to
+ * happen!
+ **/
+static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
+                                            struct tevent_context *ev,
+                                            struct ctdbd_connection *conn,
+                                            uint32_t reqid,
+                                            struct iovec *iov,
+                                            int iovcnt,
+                                            enum dbwrap_req_state *req_state)
+{
+       struct tevent_req *req = NULL;
+       struct ctdb_pkt_send_state *state = NULL;
+       ssize_t nwritten;
+       bool ok;
+
+       DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);
+
+       req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       *state = (struct ctdb_pkt_send_state) {
+               .ev = ev,
+               .conn = conn,
+               .req = req,
+               .reqid = reqid,
+               .iov = iov,
+               .iovcnt = iovcnt,
+               .packet_len = iov_buflen(iov, iovcnt),
+       };
+
+       tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);
+
+       *req_state = DBWRAP_REQ_QUEUED;
+
+       if (ctdbd_conn_has_async_sends(conn)) {
+               /*
+                * Can't attempt direct write with messages already queued and
+                * possibly in progress
+                */
+               DLIST_ADD_END(conn->send_list, state);
+               return req;
+       }
+
+       /*
+        * Attempt a direct write. If this returns short, schedule the
+        * remaining data as an async write, otherwise we're already done.
+        */
+
+       nwritten = writev(conn->fd, state->iov, state->iovcnt);
+       if (nwritten == state->packet_len) {
+               DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);
+
+               *req_state = DBWRAP_REQ_DISPATCHED;
+               tevent_req_done(req);
+               return tevent_req_post(req, ev);
+       }
+
+       if (nwritten == -1) {
+               if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       cluster_fatal("cluster write error\n");
+               }
+               nwritten = 0;
+       }
+
+       DBG_DEBUG("Posting async write of reqid [%" PRIu32"]"
+                 "after short write [%zd]\n", reqid, nwritten);
+
+       ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
+       if (!ok) {
+               *req_state = DBWRAP_REQ_ERROR;
+               tevent_req_error(req, EIO);
+               return tevent_req_post(req, ev);
+       }
+
+       /*
+        * As this is the first async write req we post, we must enable
+        * fd-writable events.
+        */
+       TEVENT_FD_WRITEABLE(conn->fde);
+       DLIST_ADD_END(conn->send_list, state);
+       return req;
+}
+
+/*
+ * talloc destructor installed by ctdb_pkt_send_cleanup() on the state of a
+ * partially sent request.
+ *
+ * On the first invocation (state->req still set) the state is moved from
+ * its request to the connection and -1 is returned, so that the remaining
+ * data can still be sent. Once the state is detached from the request, or
+ * from the connection, 0 is returned.
+ */
+static int ctdb_pkt_send_state_destructor(struct ctdb_pkt_send_state *state)
+{
+       struct ctdbd_connection *conn = state->conn;
+       bool owned_by_req = (state->req != NULL);
+
+       if (conn == NULL) {
+               return 0;
+       }
+
+       if (owned_by_req) {
+               DBG_DEBUG("Reparenting cancelled reqid [%" PRIu32"]\n",
+                         state->reqid);
+
+               talloc_reparent(state->req, conn, state);
+               state->req = NULL;
+               return -1;
+       }
+
+       DBG_DEBUG("Removing cancelled reqid [%" PRIu32"]\n",
+                 state->reqid);
+       state->conn = NULL;
+       DLIST_REMOVE(conn->send_list, state);
+       return 0;
+}
+
+static void ctdb_pkt_send_cleanup(struct tevent_req *req,
+                                 enum tevent_req_state req_state)
+{
+       struct ctdb_pkt_send_state *state = tevent_req_data(
+               req, struct ctdb_pkt_send_state);
+       struct ctdbd_connection *conn = state->conn;
+       size_t missing_len = 0;
+
+       if (conn == NULL) {
+               return;
+       }
+
+       missing_len = iov_buflen(state->iov, state->iovcnt);
+       if (state->packet_len == missing_len) {
+               /*
+                * We haven't yet started sending this one, so we can just
+                * remove it from the pending list. Treat it like a request
+                * with nothing left to send.
+                */
+               missing_len = 0;
+       }
+       if (missing_len != 0) {
+               uint8_t *buf = NULL;
+
+               if (req_state != TEVENT_REQ_RECEIVED) {
+                       /*
+                        * Wait til the req_state is TEVENT_REQ_RECEIVED, as
+                        * that will be the final state when the request state
+                        * is talloc_free'd from tevent_req_received(). Which
+                        * ensures we only run the following code *ONCE*!
+                        */
+                       return;
+               }
+
+               DBG_DEBUG("Cancelling in-flight reqid [%" PRIu32"]\n",
+                         state->reqid);
+               /*
+                * A request in progress of being sent. Reparent the iov buffer
+                * so we can continue sending the request. See also the comment
+                * in ctdbd_parse_send() when copying the key buffer.
+                *
+                * The unsent data is flattened into a single buffer that
+                * state->_iov describes, and the destructor set below keeps
+                * the state on the connection's send list.
+                */
+
+               buf = iov_concat(state, state->iov, state->iovcnt);
+               if (buf == NULL) {
+                       cluster_fatal("iov_concat error\n");
+                       return;
+               }
+
+               state->iovcnt = 1;
+               state->_iov.iov_base = buf;
+               state->_iov.iov_len = missing_len;
+               state->iov = &state->_iov;
+
+               talloc_set_destructor(state, ctdb_pkt_send_state_destructor);
+               return;
+       }
+
+       DBG_DEBUG("Removing pending reqid [%" PRIu32"]\n", state->reqid);
+
+       state->conn = NULL;
+       DLIST_REMOVE(conn->send_list, state);
+
+       if (!ctdbd_conn_has_async_sends(conn)) {
+               DBG_DEBUG("No more sends, disabling fd-writable events\n");
+               TEVENT_FD_NOT_WRITEABLE(conn->fde);
+       }
+}
+
+/*
+ * Handle a writable event on the ctdb socket: write as much as possible of
+ * the request at the head of conn->send_list.
+ *
+ * A request that was cancelled while partially sent (state->req == NULL) is
+ * freed once its data is written, any other request is completed with
+ * tevent_req_done(). Recoverable writev() errors are retried on the next
+ * event, all other write errors are fatal.
+ *
+ * Always returns 0.
+ */
+static int ctdb_pkt_send_handler(struct ctdbd_connection *conn)
+{
+       struct ctdb_pkt_send_state *state = NULL;
+       ssize_t nwritten;
+       ssize_t iovlen;
+       bool ok;
+
+       DBG_DEBUG("send handler\n");
+
+       if (!ctdbd_conn_has_async_sends(conn)) {
+               DBG_WARNING("Writable fd-event without pending send\n");
+               TEVENT_FD_NOT_WRITEABLE(conn->fde);
+               return 0;
+       }
+
+       state = conn->send_list;
+       iovlen = iov_buflen(state->iov, state->iovcnt);
+
+       nwritten = writev(conn->fd, state->iov, state->iovcnt);
+       if (nwritten == -1) {
+               if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       DBG_ERR("writev failed: %s\n", strerror(errno));
+                       cluster_fatal("cluster write error\n");
+               }
+               DBG_DEBUG("recoverable writev error, retry\n");
+               return 0;
+       }
+
+       /*
+        * Account for the written bytes on every successful write, also on
+        * the one that completes the packet. ctdb_pkt_send_cleanup() looks
+        * at the remaining iov length: if the final chunk of a previously
+        * short-written packet were left in the iov, the finished request
+        * would be mistaken for an in-flight one and the chunk would be
+        * sent a second time.
+        */
+       ok = iov_advance(&state->iov, &state->iovcnt, nwritten);
+       if (!ok) {
+               DBG_ERR("iov_advance failed\n");
+               if (state->req == NULL) {
+                       TALLOC_FREE(state);
+                       return 0;
+               }
+               tevent_req_error(state->req, EIO);
+               return 0;
+       }
+
+       if (nwritten < iovlen) {
+               DBG_DEBUG("short write\n");
+               return 0;
+       }
+
+       if (state->req == NULL) {
+               DBG_DEBUG("Finished sending cancelled reqid [%" PRIu32 "]\n",
+                         state->reqid);
+               TALLOC_FREE(state);
+               return 0;
+       }
+
+       DBG_DEBUG("Finished send request id [%" PRIu32 "]\n", state->reqid);
+
+       tevent_req_done(state->req);
+       return 0;
+}
+
+/*
+ * Collect the result of ctdb_pkt_send_send(): 0 on success, otherwise the
+ * errno value the request failed with.
+ */
+static int ctdb_pkt_send_recv(struct tevent_req *req)
+{
+       int err = 0;
+       bool failed;
+
+       failed = tevent_req_is_unix_error(req, &err);
+       tevent_req_received(req);
+
+       return failed ? err : 0;
+}
+
+struct ctdb_pkt_recv_state {
+       struct ctdb_pkt_recv_state *prev, *next;
+       struct tevent_context *ev;
+       struct ctdbd_connection *conn;
+
+       /* ctdb request id, matched against the reqid of incoming packets */
+       uint32_t reqid;
+
+       /* the associated tevent_req */
+       struct tevent_req *req;
+
+       /*
+        * pointer to allocated ctdb packet buffer, moved here from
+        * conn->read_state.hdr by ctdb_pkt_recv_handler() once the matching
+        * reply has been read completely
+        */
+       struct ctdb_req_header *hdr;
+};
+
+static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
+                                 enum tevent_req_state req_state);
+
+/*
+ * Queue an async receive for the reply to request "reqid".
+ *
+ * The request is appended to conn->recv_list and completes when
+ * ctdb_pkt_recv_handler() has read a packet carrying the same reqid. Fetch
+ * the packet with ctdb_pkt_recv_recv().
+ */
+static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
+                                            struct tevent_context *ev,
+                                            struct ctdbd_connection *conn,
+                                            uint32_t reqid)
+{
+       struct ctdb_pkt_recv_state *recv_state = NULL;
+       struct tevent_req *recv_req = NULL;
+
+       recv_req = tevent_req_create(mem_ctx,
+                                    &recv_state,
+                                    struct ctdb_pkt_recv_state);
+       if (recv_req == NULL) {
+               return NULL;
+       }
+
+       *recv_state = (struct ctdb_pkt_recv_state) {
+               .req = recv_req,
+               .reqid = reqid,
+               .conn = conn,
+               .ev = ev,
+       };
+       tevent_req_set_cleanup_fn(recv_req, ctdb_pkt_recv_cleanup);
+
+       /*
+        * The fde always has the fd-readable event set, so queueing the
+        * state is all that is needed here.
+        */
+       DLIST_ADD_END(conn->recv_list, recv_state);
+       DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", recv_state->reqid);
+
+       return recv_req;
+}
+
+/*
+ * tevent cleanup function of a receive request: unlink the state from the
+ * connection's recv_list, unless it was already detached.
+ */
+static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
+                                 enum tevent_req_state req_state)
+{
+       struct ctdb_pkt_recv_state *state = tevent_req_data(
+               req, struct ctdb_pkt_recv_state);
+
+       if (state->conn != NULL) {
+               struct ctdbd_connection *conn = state->conn;
+
+               state->conn = NULL;
+               DLIST_REMOVE(conn->recv_list, state);
+       }
+}
+
+static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn)
+{
+       struct ctdb_pkt_recv_state *state = NULL;
+       ssize_t nread;
+       ssize_t iovlen;
+       bool ok;
+
+       DBG_DEBUG("receive handler\n");
+
+       if (conn->read_state.iovs == NULL) {
+               conn->read_state.iov.iov_base = &conn->read_state.msglen;
+               conn->read_state.iov.iov_len = sizeof(conn->read_state.msglen);
+               conn->read_state.iovs = &conn->read_state.iov;
+               conn->read_state.iovcnt = 1;
+       }
+
+       iovlen = iov_buflen(conn->read_state.iovs, conn->read_state.iovcnt);
+
+       DBG_DEBUG("iovlen [%zd]\n", iovlen);
+
+       nread = readv(conn->fd, conn->read_state.iovs, conn->read_state.iovcnt);
+       if (nread == 0) {
+               cluster_fatal("cluster read error, peer closed connection\n");
+       }
+       if (nread == -1) {
+               if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       cluster_fatal("cluster read error\n");
+               }
+               DBG_DEBUG("recoverable error from readv, retry\n");
+               return 0;
+       }
+
+       if (nread < iovlen) {
+               DBG_DEBUG("iovlen [%zd] nread [%zd]\n", iovlen, nread);
+               ok = iov_advance(&conn->read_state.iovs,
+                                &conn->read_state.iovcnt,
+                                nread);
+               if (!ok) {
+                       return EIO;
+               }
+               return 0;
+       }
+
+       conn->read_state.iovs = NULL;
+       conn->read_state.iovcnt = 0;
+
+       if (conn->read_state.hdr == NULL) {
+               /*
+                * Going this way after reading the 4 initial byte message
+                * length: allocate a buffer for the whole packet, store the
+                * length in its header and schedule a read of the remaining
+                * bytes directly behind the length field. When that read
+                * completes, read_state.hdr is set and the dispatch code
+                * below runs instead.
+                */
+               uint32_t msglen = conn->read_state.msglen;
+               uint8_t *readbuf = NULL;
+               size_t readlen;
+
+               DBG_DEBUG("msglen: %" PRIu32 "\n", msglen);
+
+               if (msglen < sizeof(struct ctdb_req_header)) {
+                       DBG_ERR("short message %" PRIu32 "\n", msglen);
+                       return EIO;
+               }
+
+               conn->read_state.hdr = talloc_size(conn, msglen);
+               if (conn->read_state.hdr == NULL) {
+                       return ENOMEM;
+               }
+               conn->read_state.hdr->length = msglen;
+               talloc_set_name_const(conn->read_state.hdr,
+                                     "struct ctdb_req_header");
+
+               readbuf = (uint8_t *)conn->read_state.hdr + sizeof(msglen);
+               readlen = msglen - sizeof(msglen);
+
+               conn->read_state.iov.iov_base = readbuf;
+               conn->read_state.iov.iov_len = readlen;
+               conn->read_state.iovs = &conn->read_state.iov;
+               conn->read_state.iovcnt = 1;
+
+               DBG_DEBUG("Scheduled packet read size %zd\n", readlen);
+               return 0;
+       }
+
+       /*
+        * A complete packet has been read, find the request waiting for it.
+        *
+        * Searching a list here is expected to be cheap, as messages are
+        * expected to be coming in more or less ordered and we should find the
+        * waiting request near the beginning of the list.
+        */
+       for (state = conn->recv_list; state != NULL; state = state->next) {
+               if (state->reqid == conn->read_state.hdr->reqid) {
+                       break;
+               }
+       }
+
+       if (state == NULL) {
+               DBG_ERR("Discarding async ctdb reqid %u\n",
+                       conn->read_state.hdr->reqid);
+               TALLOC_FREE(conn->read_state.hdr);
+               ZERO_STRUCT(conn->read_state);
+               return EINVAL;
+       }
+
+       DBG_DEBUG("Got reply for reqid [%" PRIu32 "]\n", state->reqid);
+
+       state->hdr = talloc_move(state, &conn->read_state.hdr);
+       ZERO_STRUCT(conn->read_state);
+       tevent_req_done(state->req);
+       return 0;
+}
+
+/*
+ * Collect the result of ctdb_pkt_recv_send().
+ *
+ * On success 0 is returned and the received packet is moved to mem_ctx and
+ * returned in *_hdr. On failure the errno value of the request is returned
+ * and *_hdr is left untouched.
+ */
+static int ctdb_pkt_recv_recv(struct tevent_req *req,
+                             TALLOC_CTX *mem_ctx,
+                             struct ctdb_req_header **_hdr)
+{
+       struct ctdb_pkt_recv_state *state = NULL;
+       int err;
+
+       if (!tevent_req_is_unix_error(req, &err)) {
+               state = tevent_req_data(req, struct ctdb_pkt_recv_state);
+               *_hdr = talloc_move(mem_ctx, &state->hdr);
+
+               tevent_req_received(req);
+               return 0;
+       }
+
+       DBG_ERR("ctdb_read_req failed %s\n", strerror(err));
+       tevent_req_received(req);
+       return err;
+}
+
+/*
+ * talloc destructor of a ctdbd connection.
+ *
+ * Removes the fd event, closes the socket, drops a partially read packet
+ * and fails all requests still queued on the send and receive lists with
+ * EIO. The request callbacks are deferred to the request's event context.
+ *
+ * Always returns 0.
+ */
+static int ctdbd_connection_destructor(struct ctdbd_connection *c)
+{
+       struct ctdb_pkt_recv_state *recv_state = NULL;
+       struct ctdb_pkt_send_state *send_state = NULL;
+
+       TALLOC_FREE(c->fde);
+       if (c->fd != -1) {
+               close(c->fd);
+               c->fd = -1;
+       }
+
+       TALLOC_FREE(c->read_state.hdr);
+       ZERO_STRUCT(c->read_state);
+
+       /*
+        * Always take the current list head: every iteration unlinks one
+        * element, so the loop ends when the list is empty.
+        */
+       while ((send_state = c->send_list) != NULL) {
+               DLIST_REMOVE(c->send_list, send_state);
+               send_state->conn = NULL;
+
+               if (send_state->req == NULL) {
+                       /*
+                        * A cancelled send that was reparented to the
+                        * connection by ctdb_pkt_send_state_destructor().
+                        * There is no request left to notify.
+                        */
+                       continue;
+               }
+
+               tevent_req_defer_callback(send_state->req, send_state->ev);
+               tevent_req_error(send_state->req, EIO);
+       }
+
+       while ((recv_state = c->recv_list) != NULL) {
+               DLIST_REMOVE(c->recv_list, recv_state);
+               recv_state->conn = NULL;
+               tevent_req_defer_callback(recv_state->req, recv_state->ev);
+               tevent_req_error(recv_state->req, EIO);
+       }
+
+       return 0;
+}
+
+struct ctdbd_parse_state {
+       struct tevent_context *ev;
+       struct ctdbd_connection *conn;
+       uint32_t reqid; /* ctdb request id, from ctdbd_next_reqid() */
+       TDB_DATA key; /* private copy of the caller's key */
+       uint8_t _keybuf[64]; /* backs key if it fits, else key is talloc'ed */
+       struct ctdb_req_call_old ctdb_req; /* fixed part of the request */
+       struct iovec iov[2]; /* [0] ctdb_req up to "data", [1] key bytes */
+       void (*parser)(TDB_DATA key,
+                      TDB_DATA data,
+                      void *private_data);
+       void *private_data;
+       enum dbwrap_req_state *req_state; /* caller's progress indicator */
+};
+
+static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
+static void ctdbd_parse_done(struct tevent_req *subreq);
+
+/**
+ * Start an async fetch of the record "key" from database "db_id".
+ *
+ * Builds a CTDB_REQ_CALL/CTDB_FETCH_FUNC packet and hands it to
+ * ctdb_pkt_send_send(). Once the packet is sent, the reply is awaited and
+ * its data passed to "parser" together with the key and "private_data".
+ * With "local_copy" set, CTDB_WANT_READONLY is requested.
+ *
+ * *req_state is updated as the request progresses and is set to
+ * DBWRAP_REQ_ERROR on every failure path in this function.
+ *
+ * Returns NULL if the request could not be allocated, otherwise a request
+ * to be collected with ctdbd_parse_recv().
+ **/
+struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
+                                   struct tevent_context *ev,
+                                   struct ctdbd_connection *conn,
+                                   uint32_t db_id,
+                                   TDB_DATA key,
+                                   bool local_copy,
+                                   void (*parser)(TDB_DATA key,
+                                                  TDB_DATA data,
+                                                  void *private_data),
+                                   void *private_data,
+                                   enum dbwrap_req_state *req_state)
+{
+       struct tevent_req *req = NULL;
+       struct ctdbd_parse_state *state = NULL;
+       uint32_t flags;
+       uint32_t packet_length;
+       struct tevent_req *subreq = NULL;
+
+       req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
+       if (req == NULL) {
+               *req_state = DBWRAP_REQ_ERROR;
+               return NULL;
+       }
+
+       *state = (struct ctdbd_parse_state) {
+               .ev = ev,
+               .conn = conn,
+               .reqid = ctdbd_next_reqid(conn),
+               .parser = parser,
+               .private_data = private_data,
+               .req_state = req_state,
+       };
+
+       flags = local_copy ? CTDB_WANT_READONLY : 0;
+       packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;
+
+       /*
+        * Copy the key into our state, as ctdb_pkt_send_cleanup() requires
+        * that all passed iov elements have a lifetime longer than the
+        * tevent_req returned by ctdb_pkt_send_send(). This is required to
+        * continue sending the low level request into the ctdb socket if
+        * the higher level ('this') request is cancelled (or talloc free'd)
+        * by the application layer, without sending invalid packets to
+        * ctdb.
+        */
+       if (key.dsize > sizeof(state->_keybuf)) {
+               state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
+               if (tevent_req_nomem(state->key.dptr, req)) {
+                       /* Report the failure like the other error paths */
+                       *req_state = DBWRAP_REQ_ERROR;
+                       return tevent_req_post(req, ev);
+               }
+       } else {
+               memcpy(state->_keybuf, key.dptr, key.dsize);
+               state->key.dptr = state->_keybuf;
+       }
+       state->key.dsize = key.dsize;
+
+       state->ctdb_req.hdr.length       = packet_length;
+       state->ctdb_req.hdr.ctdb_magic   = CTDB_MAGIC;
+       state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
+       state->ctdb_req.hdr.operation    = CTDB_REQ_CALL;
+       state->ctdb_req.hdr.reqid        = state->reqid;
+       state->ctdb_req.flags            = flags;
+       state->ctdb_req.callid           = CTDB_FETCH_FUNC;
+       state->ctdb_req.db_id            = db_id;
+       state->ctdb_req.keylen           = state->key.dsize;
+
+       state->iov[0].iov_base = &state->ctdb_req;
+       state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
+       state->iov[1].iov_base = state->key.dptr;
+       state->iov[1].iov_len = state->key.dsize;
+
+       /*
+        * Note that ctdb_pkt_send_send()
+        * will modify state->iov using
+        * iov_advance() without making a copy.
+        */
+       subreq = ctdb_pkt_send_send(state,
+                                   ev,
+                                   conn,
+                                   state->reqid,
+                                   state->iov,
+                                   ARRAY_SIZE(state->iov),
+                                   req_state);
+       if (tevent_req_nomem(subreq, req)) {
+               *req_state = DBWRAP_REQ_ERROR;
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);
+
+       return req;
+}
+
+/*
+ * Completion callback of the packet send: on success queue the receive for
+ * the reply with the same reqid and mark the request as dispatched.
+ */
+static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdbd_parse_state *state = tevent_req_data(
+               req, struct ctdbd_parse_state);
+       int err;
+
+       err = ctdb_pkt_send_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, err)) {
+               DBG_DEBUG("ctdb_pkt_send_recv failed %s\n", strerror(err));
+               return;
+       }
+
+       subreq = ctdb_pkt_recv_send(state,
+                                   state->ev,
+                                   state->conn,
+                                   state->reqid);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+
+       *state->req_state = DBWRAP_REQ_DISPATCHED;
+       tevent_req_set_callback(subreq, ctdbd_parse_done, req);
+}
+
+static void ctdbd_parse_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdbd_parse_state *state = tevent_req_data(
+               req, struct ctdbd_parse_state);
+       struct ctdb_req_header *hdr = NULL;
+       struct ctdb_reply_call_old *reply = NULL;
+       int ret;
+
+       DBG_DEBUG("async parse request finished\n");
+
+       ret = ctdb_pkt_recv_recv(subreq, state, &hdr);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               DBG_ERR("ctdb_pkt_recv_recv returned %s\n", strerror(ret));
+               return;
+       }
+
+       if (hdr->operation != CTDB_REPLY_CALL) {
+               DBG_ERR("received invalid reply\n");
+               ctdb_packet_dump(hdr);
+               tevent_req_error(req, EIO);
+               return;
+       }
+
+       reply = (struct ctdb_reply_call_old *)hdr;
+
+       if (reply->datalen == 0) {
+               /*
+                * Treat an empty record as non-existing
+                *
+                * NOTE(review): datalen is used as received; it is not
+                * checked against hdr->length before the data is handed to
+                * the parser below - confirm ctdbd replies can be trusted
+                * here.
+                */
+               tevent_req_error(req, ENOENT);
+               return;
+       }
+
+       state->parser(state->key,
+                     make_tdb_data(&reply->data[0], reply->datalen),
+                     state->private_data);
+
+       tevent_req_done(req);
+       return;
+}
+
+/*
+ * Collect the result of ctdbd_parse_send(): 0 on success, otherwise the
+ * errno value the request failed with.
+ */
+int ctdbd_parse_recv(struct tevent_req *req)
+{
+       int err = 0;
+
+       if (!tevent_req_is_unix_error(req, &err)) {
+               tevent_req_received(req);
+               return 0;
+       }
+
+       DBG_DEBUG("async parse returned %s\n", strerror(err));
+       tevent_req_received(req);
+       return err;
+}