ctdbd_conn: Remove ctdb_packet dependency
author Volker Lendecke <vl@samba.org>
Tue, 6 May 2014 10:21:42 +0000 (12:21 +0200)
committer Michael Adam <obnox@samba.org>
Wed, 6 Aug 2014 13:40:10 +0000 (15:40 +0200)
ctdb_packet was an early, failed attempt at async socket handling.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Michael Adam <obnox@samba.org>
source3/lib/ctdbd_conn.c

index 201c70029050ff52cd18d4402f155be2037660e2..29482a0e79b729207a1e828c9cd68585cba4f108 100644 (file)
@@ -22,8 +22,8 @@
 #include "util_tdb.h"
 #include "serverid.h"
 #include "ctdbd_conn.h"
+#include "system/select.h"
 
-#include "ctdb_packet.h"
 #include "messages.h"
 
 /*
@@ -54,7 +54,7 @@ struct ctdbd_connection {
        uint32_t reqid;
        uint32_t our_vnn;
        uint64_t rand_srvid;
-       struct ctdb_packet_context *pkt;
+       int fd;
        struct tevent_fd *fde;
 
        bool (*release_ip_handler)(const char *ip_addr, void *private_data);
@@ -208,10 +208,8 @@ const char *lp_ctdbd_socket(void)
  * Get us a ctdb connection
  */
 
-static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx,
-                             struct ctdb_packet_context **presult)
+static int ctdbd_connect(int *pfd)
 {
-       struct ctdb_packet_context *result;
        const char *sockname = lp_ctdbd_socket();
        struct sockaddr_un addr = { 0, };
        int fd;
@@ -219,61 +217,26 @@ static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx,
 
        fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd == -1) {
-               DEBUG(3, ("Could not create socket: %s\n", strerror(errno)));
-               return map_nt_error_from_unix(errno);
+               int err = errno;
+               DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
+               return err;
        }
 
        addr.sun_family = AF_UNIX;
        snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", sockname);
 
        salen = sizeof(struct sockaddr_un);
+
        if (connect(fd, (struct sockaddr *)(void *)&addr, salen) == -1) {
+               int err = errno;
                DEBUG(1, ("connect(%s) failed: %s\n", sockname,
-                         strerror(errno)));
-               close(fd);
-               return map_nt_error_from_unix(errno);
-       }
-
-       if (!(result = ctdb_packet_init(mem_ctx, fd))) {
+                         strerror(err)));
                close(fd);
-               return NT_STATUS_NO_MEMORY;
-       }
-
-       *presult = result;
-       return NT_STATUS_OK;
-}
-
-/*
- * Do we have a complete ctdb packet in the queue?
- */
-
-static bool ctdb_req_complete(const uint8_t *buf, size_t available,
-                             size_t *length,
-                             void *private_data)
-{
-       uint32_t msglen;
-
-       if (available < sizeof(msglen)) {
-               return False;
-       }
-
-       msglen = *((const uint32_t *)buf);
-
-       DEBUG(11, ("msglen = %d\n", msglen));
-
-       if (msglen < sizeof(struct ctdb_req_header)) {
-               DEBUG(0, ("Got invalid msglen: %d, expected at least %d for "
-                         "the req_header\n", (int)msglen,
-                         (int)sizeof(struct ctdb_req_header)));
-               cluster_fatal("ctdbd protocol error\n");
-       }
-
-       if (available < msglen) {
-               return false;
+               return err;
        }
 
-       *length = msglen;
-       return true;
+       *pfd = fd;
+       return 0;
 }
 
 /*
@@ -303,25 +266,6 @@ static void deferred_message_dispatch(struct tevent_context *event_ctx,
        TALLOC_FREE(te);
 }
 
-struct req_pull_state {
-       TALLOC_CTX *mem_ctx;
-       DATA_BLOB req;
-};
-
-/*
- * Pull a ctdb request out of the incoming ctdb_packet queue
- */
-
-static NTSTATUS ctdb_req_pull(uint8_t *buf, size_t length,
-                             void *private_data)
-{
-       struct req_pull_state *state = (struct req_pull_state *)private_data;
-
-       state->req.data = talloc_move(state->mem_ctx, &buf);
-       state->req.length = length;
-       return NT_STATUS_OK;
-}
-
 /*
  * Fetch a messaging_rec from an incoming ctdb style message
  */
@@ -367,14 +311,55 @@ static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
        return result;
 }
 
-static NTSTATUS ctdb_packet_fd_read_sync(struct ctdb_packet_context *ctx)
+static NTSTATUS ctdb_read_packet(int fd, TALLOC_CTX *mem_ctx,
+                                struct ctdb_req_header **result)
 {
        int timeout = lp_ctdb_timeout();
+       struct ctdb_req_header *req;
+       int ret, revents;
+       uint32_t msglen;
+       NTSTATUS status;
 
        if (timeout == 0) {
                timeout = -1;
        }
-       return ctdb_packet_fd_read_sync_timeout(ctx, timeout);
+
+       ret = poll_one_fd(fd, POLLIN, timeout, &revents);
+       if (ret == -1) {
+               return map_nt_error_from_unix(errno);
+       }
+       if (ret == 0) {
+               return NT_STATUS_IO_TIMEOUT;
+       }
+       if (ret != 1) {
+               return NT_STATUS_UNEXPECTED_IO_ERROR;
+       }
+
+       status = read_data(fd, (char *)&msglen, sizeof(msglen));
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
+
+       if (msglen < sizeof(struct ctdb_req_header)) {
+               return NT_STATUS_UNEXPECTED_IO_ERROR;
+       }
+
+       req = talloc_size(mem_ctx, msglen);
+       if (req == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
+       talloc_set_name_const(req, "struct ctdb_req_header");
+
+       req->length = msglen;
+
+       status = read_data(fd, ((char *)req) + sizeof(msglen),
+                          msglen - sizeof(msglen));
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
+
+       *result = req;
+       return NT_STATUS_OK;
 }
 
 /*
@@ -383,44 +368,20 @@ static NTSTATUS ctdb_packet_fd_read_sync(struct ctdb_packet_context *ctx)
  */
 
 static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
-                             TALLOC_CTX *mem_ctx, void *result)
+                             TALLOC_CTX *mem_ctx,
+                             struct ctdb_req_header **result)
 {
        struct ctdb_req_header *hdr;
-       struct req_pull_state state;
        NTSTATUS status;
 
  next_pkt:
-       ZERO_STRUCT(state);
-       state.mem_ctx = mem_ctx;
-
-       while (!ctdb_packet_handler(conn->pkt, ctdb_req_complete,
-                                   ctdb_req_pull, &state, &status)) {
-               /*
-                * Not enough data
-                */
-               status = ctdb_packet_fd_read_sync(conn->pkt);
-
-               if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) {
-                       /* EAGAIN */
-                       continue;
-               } else if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
-                       /* EAGAIN */
-                       continue;
-               }
-
-               if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status)));
-                       cluster_fatal("ctdbd died\n");
-               }
-       }
 
+       status = ctdb_read_packet(conn->fd, mem_ctx, &hdr);
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, ("Could not read ctdb_packet: %s\n", nt_errstr(status)));
+               DEBUG(0, ("ctdb_read_packet failed: %s\n", nt_errstr(status)));
                cluster_fatal("ctdbd died\n");
        }
 
-       hdr = (struct ctdb_req_header *)state.req.data;
-
        DEBUG(11, ("Received ctdb packet\n"));
        ctdb_packet_dump(hdr);
 
@@ -481,7 +442,7 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
                }
 
                if (!(msg_state->rec = ctdb_pull_messaging_rec(
-                             msg_state, state.req.length, msg))) {
+                             msg_state, msg->hdr.length, msg))) {
                        DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
                        TALLOC_FREE(msg_state);
                        TALLOC_FREE(hdr);
@@ -519,11 +480,16 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
                goto next_pkt;
        }
 
-       *((void **)result) = talloc_move(mem_ctx, &hdr);
+       *result = talloc_move(mem_ctx, &hdr);
 
        return NT_STATUS_OK;
 }
 
+static int ctdbd_connection_destructor(struct ctdbd_connection *c)
+{
+       close(c->fd);
+       return 0;
+}
 /*
  * Get us a ctdbd connection
  */
@@ -532,6 +498,7 @@ static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
                                      struct ctdbd_connection **pconn)
 {
        struct ctdbd_connection *conn;
+       int ret;
        NTSTATUS status;
 
        if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
@@ -539,12 +506,13 @@ static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
                return NT_STATUS_NO_MEMORY;
        }
 
-       status = ctdbd_connect(conn, &conn->pkt);
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(10, ("ctdbd_connect failed: %s\n", nt_errstr(status)));
+       ret = ctdbd_connect(&conn->fd);
+       if (ret != 0) {
+               status = map_nt_error_from_unix(errno);
+               DEBUG(10, ("ctdbd_connect failed: %s\n", strerror(errno)));
                goto fail;
        }
+       talloc_set_destructor(conn, ctdbd_connection_destructor);
 
        status = get_cluster_vnn(conn, &conn->our_vnn);
 
@@ -624,29 +592,27 @@ struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
 
 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
 {
-       return ctdb_packet_get_fd(conn->pkt);
+       return conn->fd;
 }
 
 /*
  * Packet handler to receive and handle a ctdb message
  */
-static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length,
-                                   void *private_data)
+static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
+                                   struct ctdbd_connection *conn,
+                                   struct ctdb_req_header *hdr)
 {
-       struct ctdbd_connection *conn = talloc_get_type_abort(
-               private_data, struct ctdbd_connection);
        struct ctdb_req_message *msg;
        struct messaging_rec *msg_rec;
 
-       msg = (struct ctdb_req_message *)buf;
-
-       if (msg->hdr.operation != CTDB_REQ_MESSAGE) {
+       if (hdr->operation != CTDB_REQ_MESSAGE) {
                DEBUG(0, ("Received async msg of type %u, discarding\n",
-                         msg->hdr.operation));
-               TALLOC_FREE(buf);
+                         hdr->operation));
                return NT_STATUS_INVALID_PARAMETER;
        }
 
+       msg = (struct ctdb_req_message *)hdr;
+
        if ((conn->release_ip_handler != NULL)
            && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
                bool ret;
@@ -664,7 +630,6 @@ static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length,
                        conn->release_ip_handler = NULL;
                        conn->release_ip_priv = NULL;
                }
-               TALLOC_FREE(buf);
                return NT_STATUS_OK;
        }
 
@@ -682,7 +647,6 @@ static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length,
                               messaging_server_id(conn->msg_ctx),
                               MSG_SMB_BRL_VALIDATE, &data_blob_null);
 
-               TALLOC_FREE(buf);
                return NT_STATUS_OK;
        }
 
@@ -690,20 +654,15 @@ static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length,
        if (msg->srvid != getpid() && msg->srvid != MSG_SRVID_SAMBA) {
                DEBUG(0,("Got unexpected message with srvid=%llu\n", 
                         (unsigned long long)msg->srvid));
-               TALLOC_FREE(buf);
                return NT_STATUS_OK;
        }
 
-       if (!(msg_rec = ctdb_pull_messaging_rec(NULL, length, msg))) {
+       msg_rec = ctdb_pull_messaging_rec(talloc_tos(), msg->hdr.length, msg);
+       if (msg_rec == NULL) {
                DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
-               TALLOC_FREE(buf);
                return NT_STATUS_NO_MEMORY;
        }
-
        messaging_dispatch_rec(conn->msg_ctx, msg_rec);
-
-       TALLOC_FREE(msg_rec);
-       TALLOC_FREE(buf);
        return NT_STATUS_OK;
 }
 
@@ -718,22 +677,19 @@ static void ctdbd_socket_handler(struct tevent_context *event_ctx,
 {
        struct ctdbd_connection *conn = talloc_get_type_abort(
                private_data, struct ctdbd_connection);
-
+       struct ctdb_req_header *hdr;
        NTSTATUS status;
 
-       status = ctdb_packet_fd_read(conn->pkt);
-
+       status = ctdb_read_packet(conn->fd, talloc_tos(), &hdr);
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status)));
+               DEBUG(0, ("ctdb_read_packet failed: %s\n", nt_errstr(status)));
                cluster_fatal("ctdbd died\n");
        }
 
-       while (ctdb_packet_handler(conn->pkt, ctdb_req_complete,
-                             ctdb_handle_message, conn, &status)) {
-               if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(10, ("could not handle incoming message: %s\n",
-                                  nt_errstr(status)));
-               }
+       status = ctdb_handle_message(conn->msg_ctx, conn, hdr);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(10, ("could not handle incoming message: %s\n",
+                          nt_errstr(status)));
        }
 }
 
@@ -749,7 +705,7 @@ NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
 
        if (!(conn->fde = tevent_add_fd(messaging_tevent_context(msg_ctx),
                                       conn,
-                                      ctdb_packet_get_fd(conn->pkt),
+                                      conn->fd,
                                       TEVENT_FD_READ,
                                       ctdbd_socket_handler,
                                       conn))) {
@@ -795,7 +751,8 @@ NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn,
                                   const uint8_t *buf, size_t buflen)
 {
        struct ctdb_req_message r;
-       NTSTATUS status;
+       struct iovec iov[2];
+       ssize_t nwritten;
 
        r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen;
        r.hdr.ctdb_magic = CTDB_MAGIC;
@@ -811,21 +768,17 @@ NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn,
        DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
        ctdb_packet_dump(&r.hdr);
 
-       status = ctdb_packet_send(
-               conn->pkt, 2,
-               data_blob_const(&r, offsetof(struct ctdb_req_message, data)),
-               data_blob_const(buf, buflen));
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
-               return status;
-       }
+       iov[0].iov_base = &r;
+       iov[0].iov_len = offsetof(struct ctdb_req_message, data);
+       iov[1].iov_base = discard_const_p(uint8_t, buf);
+       iov[1].iov_len = buflen;
 
-       status = ctdb_packet_flush(conn->pkt);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
+       nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
+       if (nwritten == -1) {
+               DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
                cluster_fatal("cluster dispatch daemon msg write error\n");
        }
+
        return NT_STATUS_OK;
 }
 
@@ -840,8 +793,11 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
                              int *cstatus)
 {
        struct ctdb_req_control req;
+       struct ctdb_req_header *hdr;
        struct ctdb_reply_control *reply = NULL;
        struct ctdbd_connection *new_conn = NULL;
+       struct iovec iov[2];
+       ssize_t nwritten;
        NTSTATUS status;
 
        if (conn == NULL) {
@@ -871,21 +827,15 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
        DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
        ctdb_packet_dump(&req.hdr);
 
-       status = ctdb_packet_send(
-               conn->pkt, 2,
-               data_blob_const(&req, offsetof(struct ctdb_req_control, data)),
-               data_blob_const(data.dptr, data.dsize));
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
-               goto fail;
-       }
+       iov[0].iov_base = &req;
+       iov[0].iov_len = offsetof(struct ctdb_req_control, data);
+       iov[1].iov_base = data.dptr;
+       iov[1].iov_len = data.dsize;
 
-       status = ctdb_packet_flush(conn->pkt);
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
-               cluster_fatal("cluster dispatch daemon control write error\n");
+       nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
+       if (nwritten == -1) {
+               DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
+               cluster_fatal("cluster dispatch daemon msg write error\n");
        }
 
        if (flags & CTDB_CTRL_FLAG_NOREPLY) {
@@ -896,17 +846,18 @@ static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
                return NT_STATUS_OK;
        }
 
-       status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
+       status = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
 
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
                goto fail;
        }
 
-       if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
+       if (hdr->operation != CTDB_REPLY_CONTROL) {
                DEBUG(0, ("received invalid reply\n"));
                goto fail;
        }
+       reply = (struct ctdb_reply_control *)hdr;
 
        if (outdata) {
                if (!(outdata->dptr = (uint8 *)talloc_memdup(
@@ -964,6 +915,8 @@ bool ctdb_processes_exist(struct ctdbd_connection *conn,
        for (i=0; i<num_pids; i++) {
                struct ctdb_req_control req;
                pid_t pid;
+               struct iovec iov[2];
+               ssize_t nwritten;
 
                results[i] = false;
                reqids[i] = ctdbd_next_reqid(conn);
@@ -995,42 +948,39 @@ bool ctdb_processes_exist(struct ctdbd_connection *conn,
                DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
                ctdb_packet_dump(&req.hdr);
 
-               status = ctdb_packet_send(
-                       conn->pkt, 2,
-                       data_blob_const(
-                               &req, offsetof(struct ctdb_req_control, data)),
-                       data_blob_const(&pid, sizeof(pid)));
-               if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(10, ("ctdb_packet_send failed: %s\n",
-                                  nt_errstr(status)));
+               iov[0].iov_base = &req;
+               iov[0].iov_len = offsetof(struct ctdb_req_control, data);
+               iov[1].iov_base = &pid;
+               iov[1].iov_len = sizeof(pid);
+
+               nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
+               if (nwritten == -1) {
+                       status = map_nt_error_from_unix(errno);
+                       DEBUG(10, ("write_data_iov failed: %s\n",
+                                  strerror(errno)));
                        goto fail;
                }
        }
 
-       status = ctdb_packet_flush(conn->pkt);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(10, ("ctdb_packet_flush failed: %s\n",
-                          nt_errstr(status)));
-               goto fail;
-       }
-
        num_received = 0;
 
        while (num_received < num_pids) {
-               struct ctdb_reply_control *reply = NULL;
+               struct ctdb_req_header *hdr;
+               struct ctdb_reply_control *reply;
                uint32_t reqid;
 
-               status = ctdb_read_req(conn, 0, talloc_tos(), (void *)&reply);
+               status = ctdb_read_req(conn, 0, talloc_tos(), &hdr);
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(10, ("ctdb_read_req failed: %s\n",
                                   nt_errstr(status)));
                        goto fail;
                }
 
-               if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
+               if (hdr->operation != CTDB_REPLY_CONTROL) {
                        DEBUG(10, ("Received invalid reply\n"));
                        goto fail;
                }
+               reply = (struct ctdb_reply_control *)hdr;
 
                reqid = reply->hdr.reqid;
 
@@ -1177,6 +1127,8 @@ bool ctdb_serverids_exist(struct ctdbd_connection *conn,
        for (i=0; i<num_vnns; i++) {
                struct ctdb_vnn_list *vnn = &vnns[i];
                struct ctdb_req_control req;
+               struct iovec iov[2];
+               ssize_t nwritten;
 
                vnn->reqid = ctdbd_next_reqid(conn);
 
@@ -1200,46 +1152,42 @@ bool ctdb_serverids_exist(struct ctdbd_connection *conn,
                DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
                ctdb_packet_dump(&req.hdr);
 
-               status = ctdb_packet_send(
-                       conn->pkt, 2,
-                       data_blob_const(
-                               &req, offsetof(struct ctdb_req_control,
-                                              data)),
-                       data_blob_const(vnn->srvids, req.datalen));
-               if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(1, ("ctdb_packet_send failed: %s\n",
-                                 nt_errstr(status)));
+               iov[0].iov_base = &req;
+               iov[0].iov_len = offsetof(struct ctdb_req_control, data);
+               iov[1].iov_base = vnn->srvids;
+               iov[1].iov_len = req.datalen;
+
+               nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
+               if (nwritten == -1) {
+                       status = map_nt_error_from_unix(errno);
+                       DEBUG(10, ("write_data_iov failed: %s\n",
+                                  strerror(errno)));
                        goto fail;
                }
        }
 
-       status = ctdb_packet_flush(conn->pkt);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(1, ("ctdb_packet_flush failed: %s\n",
-                         nt_errstr(status)));
-               goto fail;
-       }
-
        num_received = 0;
 
        while (num_received < num_vnns) {
-               struct ctdb_reply_control *reply = NULL;
+               struct ctdb_req_header *hdr;
+               struct ctdb_reply_control *reply;
                struct ctdb_vnn_list *vnn;
                uint32_t reqid;
                uint8_t *reply_data;
 
-               status = ctdb_read_req(conn, 0, talloc_tos(), (void *)&reply);
+               status = ctdb_read_req(conn, 0, talloc_tos(), &hdr);
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(1, ("ctdb_read_req failed: %s\n",
                                  nt_errstr(status)));
                        goto fail;
                }
 
-               if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
+               if (hdr->operation != CTDB_REPLY_CONTROL) {
                        DEBUG(1, ("Received invalid reply %u\n",
                                  (unsigned)reply->hdr.operation));
                        goto fail;
                }
+               reply = (struct ctdb_reply_control *)hdr;
 
                reqid = reply->hdr.reqid;
 
@@ -1394,7 +1342,9 @@ NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
                       TDB_DATA key)
 {
        struct ctdb_req_call req;
-       struct ctdb_reply_call *reply;
+       struct ctdb_req_header *hdr;
+       struct iovec iov[2];
+       ssize_t nwritten;
        NTSTATUS status;
 
        ZERO_STRUCT(req);
@@ -1412,31 +1362,25 @@ NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
        DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
        ctdb_packet_dump(&req.hdr);
 
-       status = ctdb_packet_send(
-               conn->pkt, 2,
-               data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
-               data_blob_const(key.dptr, key.dsize));
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
-               return status;
-       }
+       iov[0].iov_base = &req;
+       iov[0].iov_len = offsetof(struct ctdb_req_call, data);
+       iov[1].iov_base = key.dptr;
+       iov[1].iov_len = key.dsize;
 
-       status = ctdb_packet_flush(conn->pkt);
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
-               cluster_fatal("cluster dispatch daemon control write error\n");
+       nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
+       if (nwritten == -1) {
+               DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
+               cluster_fatal("cluster dispatch daemon msg write error\n");
        }
 
-       status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
+       status = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
 
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
                goto fail;
        }
 
-       if (reply->hdr.operation != CTDB_REPLY_CALL) {
+       if (hdr->operation != CTDB_REPLY_CALL) {
                DEBUG(0, ("received invalid reply\n"));
                status = NT_STATUS_INTERNAL_ERROR;
                goto fail;
@@ -1445,7 +1389,7 @@ NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
        status = NT_STATUS_OK;
  fail:
 
-       TALLOC_FREE(reply);
+       TALLOC_FREE(hdr);
        return status;
 }
 
@@ -1459,7 +1403,10 @@ NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
                     void *private_data)
 {
        struct ctdb_req_call req;
+       struct ctdb_req_header *hdr = NULL;
        struct ctdb_reply_call *reply;
+       struct iovec iov[2];
+       ssize_t nwritten;
        NTSTATUS status;
        uint32_t flags;
 
@@ -1481,35 +1428,30 @@ NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
        req.db_id            = db_id;
        req.keylen           = key.dsize;
 
-       status = ctdb_packet_send(
-               conn->pkt, 2,
-               data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
-               data_blob_const(key.dptr, key.dsize));
+       iov[0].iov_base = &req;
+       iov[0].iov_len = offsetof(struct ctdb_req_call, data);
+       iov[1].iov_base = key.dptr;
+       iov[1].iov_len = key.dsize;
 
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
-               return status;
-       }
-
-       status = ctdb_packet_flush(conn->pkt);
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
-               cluster_fatal("cluster dispatch daemon control write error\n");
+       nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
+       if (nwritten == -1) {
+               DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
+               cluster_fatal("cluster dispatch daemon msg write error\n");
        }
 
-       status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
+       status = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
 
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
                goto fail;
        }
 
-       if (reply->hdr.operation != CTDB_REPLY_CALL) {
+       if (hdr->operation != CTDB_REPLY_CALL) {
                DEBUG(0, ("received invalid reply\n"));
                status = NT_STATUS_INTERNAL_ERROR;
                goto fail;
        }
+       reply = (struct ctdb_reply_call *)hdr;
 
        if (reply->datalen == 0) {
                /*
@@ -1524,72 +1466,10 @@ NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
 
        status = NT_STATUS_OK;
  fail:
-       TALLOC_FREE(reply);
+       TALLOC_FREE(hdr);
        return status;
 }
 
-struct ctdbd_traverse_state {
-       void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data);
-       void *private_data;
-};
-
-/*
- * Handle a traverse record coming in on the ctdbd connection
- */
-
-static NTSTATUS ctdb_traverse_handler(uint8_t *buf, size_t length,
-                                     void *private_data)
-{
-       struct ctdbd_traverse_state *state =
-               (struct ctdbd_traverse_state *)private_data;
-
-       struct ctdb_req_message *m;
-       struct ctdb_rec_data *d;
-       TDB_DATA key, data;
-
-       m = (struct ctdb_req_message *)buf;
-
-       if (length < sizeof(*m) || m->hdr.length != length) {
-               DEBUG(0, ("Got invalid message of length %d\n", (int)length));
-               TALLOC_FREE(buf);
-               return NT_STATUS_UNEXPECTED_IO_ERROR;
-       }
-
-       d = (struct ctdb_rec_data *)&m->data[0];
-       if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
-               DEBUG(0, ("Got invalid traverse data of length %d\n",
-                         (int)m->datalen));
-               TALLOC_FREE(buf);
-               return NT_STATUS_UNEXPECTED_IO_ERROR;
-       }
-
-       key.dsize = d->keylen;
-       key.dptr  = &d->data[0];
-       data.dsize = d->datalen;
-       data.dptr = &d->data[d->keylen];                
-
-       if (key.dsize == 0 && data.dsize == 0) {
-               /* end of traverse */
-               return NT_STATUS_END_OF_FILE;
-       }
-
-       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               DEBUG(0, ("Got invalid ltdb header length %d\n",
-                         (int)data.dsize));
-               TALLOC_FREE(buf);
-               return NT_STATUS_UNEXPECTED_IO_ERROR;
-       }
-       data.dsize -= sizeof(struct ctdb_ltdb_header);
-       data.dptr += sizeof(struct ctdb_ltdb_header);
-
-       if (state->fn) {
-               state->fn(key, data, state->private_data);
-       }
-
-       TALLOC_FREE(buf);
-       return NT_STATUS_OK;
-}
-
 /*
   Traverse a ctdb database. This uses a kind-of hackish way to open a second
   connection to ctdbd to avoid the hairy recursive and async problems with
@@ -1604,10 +1484,9 @@ NTSTATUS ctdbd_traverse(uint32_t db_id,
        struct ctdbd_connection *conn;
        NTSTATUS status;
 
-       TDB_DATA data;
+       TDB_DATA key, data;
        struct ctdb_traverse_start t;
        int cstatus;
-       struct ctdbd_traverse_state state;
 
        become_root();
        status = ctdbd_init_connection(NULL, &conn);
@@ -1640,57 +1519,63 @@ NTSTATUS ctdbd_traverse(uint32_t db_id,
                         */
                        status = NT_STATUS_UNSUCCESSFUL;
                }
-               goto done;
+               TALLOC_FREE(conn);
+               return status;
        }
 
-       state.fn = fn;
-       state.private_data = private_data;
-
        while (True) {
+               struct ctdb_req_header *hdr;
+               struct ctdb_req_message *m;
+               struct ctdb_rec_data *d;
 
-               status = NT_STATUS_OK;
-
-               if (ctdb_packet_handler(conn->pkt, ctdb_req_complete,
-                                  ctdb_traverse_handler, &state, &status)) {
-
-                       if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
-                               status = NT_STATUS_OK;
-                               break;
-                       }
+               status = ctdb_read_packet(conn->fd, conn, &hdr);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(0, ("ctdb_read_packet failed: %s\n",
+                                 nt_errstr(status)));
+                       cluster_fatal("ctdbd died\n");
+               }
 
-                       /*
-                        * There might be more in the queue
-                        */
-                       continue;
+               if (hdr->operation != CTDB_REQ_MESSAGE) {
+                       DEBUG(0, ("Got operation %u, expected a message\n",
+                                 (unsigned)hdr->operation));
+                       TALLOC_FREE(conn);
+                       return NT_STATUS_UNEXPECTED_IO_ERROR;
                }
 
-               if (!NT_STATUS_IS_OK(status)) {
-                       break;
+               m = (struct ctdb_req_message *)hdr;
+               d = (struct ctdb_rec_data *)&m->data[0];
+               if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
+                       DEBUG(0, ("Got invalid traverse data of length %d\n",
+                                 (int)m->datalen));
+                       TALLOC_FREE(conn);
+                       return NT_STATUS_UNEXPECTED_IO_ERROR;
                }
 
-               status = ctdb_packet_fd_read_sync(conn->pkt);
+               key.dsize = d->keylen;
+               key.dptr  = &d->data[0];
+               data.dsize = d->datalen;
+               data.dptr = &d->data[d->keylen];
 
-               if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
-                       /*
-                        * There might be more in the queue
-                        */
-                       continue;
+               if (key.dsize == 0 && data.dsize == 0) {
+                       /* end of traverse */
+                       TALLOC_FREE(conn);
+                       return NT_STATUS_OK;
                }
 
-               if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
-                       status = NT_STATUS_OK;
-                       break;
+               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+                       DEBUG(0, ("Got invalid ltdb header length %d\n",
+                                 (int)data.dsize));
+                       TALLOC_FREE(conn);
+                       return NT_STATUS_UNEXPECTED_IO_ERROR;
                }
+               data.dsize -= sizeof(struct ctdb_ltdb_header);
+               data.dptr += sizeof(struct ctdb_ltdb_header);
 
-               if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(0, ("ctdb_packet_fd_read_sync failed: %s\n", nt_errstr(status)));
-                       cluster_fatal("ctdbd died\n");
+               if (fn != NULL) {
+                       fn(key, data, private_data);
                }
        }
-
- done:
-       TALLOC_FREE(conn);
-       return status;
+       return NT_STATUS_OK;
 }
 
 /*