s3: Add a second ctdb connect library
authorVolker Lendecke <vl@samba.org>
Wed, 15 Feb 2012 11:26:59 +0000 (12:26 +0100)
committerVolker Lendecke <vl@samba.org>
Tue, 17 Apr 2012 08:21:01 +0000 (10:21 +0200)
The existing one is not async at all.

source3/Makefile.in
source3/lib/ctdb_conn.c [new file with mode: 0644]
source3/lib/ctdb_conn.h [new file with mode: 0644]
source3/torture/proto.h
source3/torture/test_ctdbconn.c [new file with mode: 0644]
source3/torture/torture.c
source3/wscript_build

index 7a565058224b8fe8a2de1efae2ede1e2606d42fe..c8fb256de76ba8f13d3e5b31f1f9cea5de459837 100644 (file)
@@ -433,6 +433,7 @@ CRYPTO_OBJ = ../lib/crypto/crc32.o @CRYPTO_MD5_OBJ@ \
 LIB_OBJ = $(LIBSAMBAUTIL_OBJ) $(UTIL_OBJ) $(CRYPTO_OBJ) $(LIBTSOCKET_OBJ) \
          lib/messages.o librpc/gen_ndr/ndr_messaging.o lib/messages_local.o \
          lib/messages_ctdbd.o lib/ctdb_packet.o lib/ctdbd_conn.o \
+         lib/ctdb_conn.o \
          lib/id_cache.o \
          ../lib/socket/interfaces.o lib/memcache.o \
          lib/talloc_dict.o \
@@ -1273,6 +1274,7 @@ SMBTORTURE_OBJ1 = torture/torture.o torture/nbio.o torture/scanner.o torture/uta
                torture/test_chain3.o \
                torture/test_authinfo_structs.o \
                torture/test_cleanup.o \
+               torture/test_ctdbconn.o \
                torture/t_strappend.o
 
 SMBTORTURE_OBJ = $(SMBTORTURE_OBJ1) $(PARAM_OBJ) $(TLDAP_OBJ) \
diff --git a/source3/lib/ctdb_conn.c b/source3/lib/ctdb_conn.c
new file mode 100644 (file)
index 0000000..a96615f
--- /dev/null
@@ -0,0 +1,603 @@
+/*
+   Unix SMB/CIFS implementation.
+   Samba3 ctdb connection handling
+   Copyright (C) Volker Lendecke 2012
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/util/tevent_unix.h"
+#include "ctdb_conn.h"
+
+#ifdef CLUSTER_SUPPORT
+
+#include "lib/async_req/async_sock.h"
+
+struct ctdb_conn {
+       int fd;
+       struct tevent_queue *outqueue;
+};
+
+struct ctdb_conn_init_state {
+       struct sockaddr_un addr;
+       struct ctdb_conn *conn;
+};
+
+static void ctdb_conn_init_done(struct tevent_req *subreq);
+static int ctdb_conn_destructor(struct ctdb_conn *conn);
+
+struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      const char *sock)
+{
+       struct tevent_req *req, *subreq;
+       struct ctdb_conn_init_state *state;
+
+       req = tevent_req_create(mem_ctx, &state, struct ctdb_conn_init_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       if (!lp_clustering()) {
+               tevent_req_error(req, ENOSYS);
+               return tevent_req_post(req, ev);
+       }
+
+       if (strlen(sock) >= sizeof(state->addr.sun_path)) {
+               tevent_req_error(req, ENAMETOOLONG);
+               return tevent_req_post(req, ev);
+       }
+
+       state->conn = talloc(state, struct ctdb_conn);
+       if (tevent_req_nomem(state->conn, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       state->conn->outqueue = tevent_queue_create(
+               state->conn, "ctdb outqueue");
+       if (tevent_req_nomem(state->conn->outqueue, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       state->conn->fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (state->conn->fd == -1) {
+               tevent_req_error(req, errno);
+               return tevent_req_post(req, ev);
+       }
+       talloc_set_destructor(state->conn, ctdb_conn_destructor);
+
+       state->addr.sun_family = AF_UNIX;
+       strncpy(state->addr.sun_path, sock, sizeof(state->addr.sun_path));
+
+       subreq = async_connect_send(state, ev, state->conn->fd,
+                                   (struct sockaddr *)&state->addr,
+                                   sizeof(state->addr));
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_init_done, req);
+       return req;
+}
+
+static int ctdb_conn_destructor(struct ctdb_conn *c)
+{
+       if (c->fd != -1) {
+               close(c->fd);
+               c->fd = -1;
+       }
+       return 0;
+}
+
+static void ctdb_conn_init_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       int ret, err;
+
+       ret = async_connect_recv(subreq, &err);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               tevent_req_error(req, err);
+               return;
+       }
+       tevent_req_done(req);
+}
+
+int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                       struct ctdb_conn **pconn)
+{
+       struct ctdb_conn_init_state *state = tevent_req_data(
+               req, struct ctdb_conn_init_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+       *pconn = talloc_move(mem_ctx, &state->conn);
+
+       return 0;
+}
+
+struct ctdb_conn_control_state {
+       struct tevent_context *ev;
+       struct ctdb_conn *conn;
+       struct ctdb_req_control req;
+       struct iovec iov[2];
+       struct ctdb_reply_control *reply;
+};
+
+static void ctdb_conn_control_written(struct tevent_req *subreq);
+static void ctdb_conn_control_done(struct tevent_req *subreq);
+static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p);
+
+struct tevent_req *ctdb_conn_control_send(TALLOC_CTX *mem_ctx,
+                                         struct tevent_context *ev,
+                                         struct ctdb_conn *conn,
+                                         uint32_t vnn, uint32_t opcode,
+                                         uint64_t srvid, uint32_t flags,
+                                         uint8_t *data, size_t datalen)
+{
+       struct tevent_req *req, *subreq;
+       struct ctdb_conn_control_state *state;
+       struct ctdb_req_header *hdr;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct ctdb_conn_control_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+       state->conn = conn;
+
+       hdr = &state->req.hdr;
+       hdr->length = offsetof(struct ctdb_req_control, data) + datalen;
+       hdr->ctdb_magic    = CTDB_MAGIC;
+       hdr->ctdb_version  = CTDB_VERSION;
+       hdr->operation     = CTDB_REQ_CONTROL;
+       hdr->reqid         = 1; /* FIXME */
+       hdr->destnode      = vnn;
+       state->req.opcode  = opcode;
+       state->req.srvid   = srvid;
+       state->req.datalen = datalen;
+       state->req.flags   = flags;
+
+       state->iov[0].iov_base = &state->req;
+       state->iov[0].iov_len = offsetof(struct ctdb_req_control, data);
+       state->iov[1].iov_base = data;
+       state->iov[1].iov_len = datalen;
+
+       subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
+                            state->iov, 2);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_control_written, req);
+       return req;
+}
+
+static void ctdb_conn_control_written(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_conn_control_state *state = tevent_req_data(
+               req, struct ctdb_conn_control_state);
+       ssize_t written;
+       int err;
+
+       written = writev_recv(subreq, &err);
+       TALLOC_FREE(subreq);
+       if (written == -1) {
+               tevent_req_error(req, err);
+               return;
+       }
+       subreq = read_packet_send(
+               state, state->ev, state->conn->fd, sizeof(uint32_t),
+               ctdb_packet_more, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_control_done, req);
+}
+
+static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p)
+{
+       uint32_t len;
+
+       if (buflen > sizeof(uint32_t)) {
+               /* Been here, done */
+               return 0;
+       }
+       memcpy(&len, buf, sizeof(len));
+       return (len - sizeof(uint32_t));
+}
+
+static void ctdb_conn_control_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_conn_control_state *state = tevent_req_data(
+               req, struct ctdb_conn_control_state);
+       ssize_t nread;
+       uint8_t *buf;
+       int err;
+
+       nread = read_packet_recv(subreq, state, &buf, &err);
+       TALLOC_FREE(subreq);
+       if (nread == -1) {
+               tevent_req_error(req, err);
+               return;
+       }
+       state->reply = (struct ctdb_reply_control *)buf;
+       tevent_req_done(req);
+}
+
+int ctdb_conn_control_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                          struct ctdb_reply_control **preply)
+{
+       struct ctdb_conn_control_state *state = tevent_req_data(
+               req, struct ctdb_conn_control_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+       if (preply != NULL) {
+               *preply = talloc_move(mem_ctx, &state->reply);
+       }
+       return 0;
+}
+
+struct ctdb_conn_msg_write_state {
+       struct ctdb_req_message ctdb_msg;
+       struct iovec iov[2];
+};
+
+static void ctdb_conn_msg_write_done(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
+                                           struct tevent_context *ev,
+                                           struct ctdb_conn *conn,
+                                           uint32_t vnn, uint64_t srvid,
+                                           uint8_t *msg, size_t msg_len)
+{
+       struct tevent_req *req, *subreq;
+       struct ctdb_conn_msg_write_state *state;
+       struct ctdb_req_header *h;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct ctdb_conn_msg_write_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       h = &state->ctdb_msg.hdr;
+
+       h->length = offsetof(struct ctdb_req_message, data) + msg_len;
+       h->ctdb_magic = CTDB_MAGIC;
+       h->ctdb_version = CTDB_VERSION;
+       h->generation = 1;
+       h->operation  = CTDB_REQ_MESSAGE;
+       h->destnode   = vnn;
+       h->srcnode    = CTDB_CURRENT_NODE;
+       h->reqid      = 0;
+       state->ctdb_msg.srvid   = srvid;
+       state->ctdb_msg.datalen = msg_len;
+
+       state->iov[0].iov_base = &state->ctdb_msg;
+       state->iov[0].iov_len = offsetof(struct ctdb_req_message, data);
+       state->iov[1].iov_base = msg;
+       state->iov[1].iov_len = msg_len;
+
+       subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
+                            state->iov, 2);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_msg_write_done, req);
+       return req;
+}
+
+static void ctdb_conn_msg_write_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       ssize_t written;
+       int err;
+
+       written = writev_recv(subreq, &err);
+       TALLOC_FREE(subreq);
+       if (written == -1) {
+               tevent_req_error(req, err);
+               return;
+       }
+       tevent_req_done(req);
+}
+
+int ctdb_conn_msg_write_recv(struct tevent_req *req)
+{
+       int err;
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+       return 0;
+}
+
+struct ctdb_msg_channel {
+       struct ctdb_conn *conn;
+};
+
+struct ctdb_msg_channel_init_state {
+       struct tevent_context *ev;
+       struct ctdb_conn *conn;
+       uint64_t srvid;
+       struct ctdb_msg_channel *channel;
+};
+
+static void ctdb_msg_channel_init_connected(struct tevent_req *subreq);
+static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_msg_channel_init_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       const char *sock, uint64_t srvid)
+{
+       struct tevent_req *req, *subreq;
+       struct ctdb_msg_channel_init_state *state;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct ctdb_msg_channel_init_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+       state->srvid = srvid;
+
+       subreq = ctdb_conn_init_send(state, ev, sock);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_msg_channel_init_connected, req);
+       return req;
+}
+
+static void ctdb_msg_channel_init_connected(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_msg_channel_init_state *state = tevent_req_data(
+               req, struct ctdb_msg_channel_init_state);
+       int ret;
+
+       ret = ctdb_conn_init_recv(subreq, state, &state->conn);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       subreq = ctdb_conn_control_send(state, state->ev, state->conn,
+                                       CTDB_CURRENT_NODE,
+                                       CTDB_CONTROL_REGISTER_SRVID,
+                                       state->srvid, 0, NULL, 0);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(
+               subreq, ctdb_msg_channel_init_registered_srvid, req);
+}
+
+static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_msg_channel_init_state *state = tevent_req_data(
+               req, struct ctdb_msg_channel_init_state);
+       struct ctdb_reply_control *reply;
+       int ret;
+
+       ret = ctdb_conn_control_recv(subreq, talloc_tos(), &reply);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       if (reply->status != 0) {
+               tevent_req_error(req, EIO);
+               return;
+       }
+       state->channel = talloc(state, struct ctdb_msg_channel);
+       if (tevent_req_nomem(state->channel, req)) {
+               return;
+       }
+       state->channel->conn = talloc_move(state->channel, &state->conn);
+       tevent_req_done(req);
+}
+
+int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                              struct ctdb_msg_channel **pchannel)
+{
+       struct ctdb_msg_channel_init_state *state = tevent_req_data(
+               req, struct ctdb_msg_channel_init_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+       *pchannel = talloc_move(mem_ctx, &state->channel);
+       return 0;
+}
+
+struct ctdb_msg_read_state {
+       size_t buflen;
+       uint8_t *buf;
+};
+
+static void ctdb_msg_channel_got_msg(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct ctdb_msg_channel *channel)
+{
+       struct tevent_req *req, *subreq;
+       struct ctdb_msg_read_state *state;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct ctdb_msg_read_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       subreq = read_packet_send(state, ev, channel->conn->fd,
+               sizeof(uint32_t), ctdb_packet_more, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_msg_channel_got_msg, req);
+       return req;
+}
+
+static void ctdb_msg_channel_got_msg(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_msg_read_state *state = tevent_req_data(
+               req, struct ctdb_msg_read_state);
+       ssize_t nread;
+       uint8_t *buf;
+       int err;
+
+       nread = read_packet_recv(subreq, state, &buf, &err);
+       if (nread == -1) {
+               tevent_req_error(req, err);
+               return;
+       }
+       state->buflen = nread;
+       state->buf = buf;
+       tevent_req_done(req);
+}
+
+int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                      uint8_t **pmsg, size_t *pmsg_len)
+{
+       struct ctdb_msg_read_state *state = tevent_req_data(
+               req, struct ctdb_msg_read_state);
+       struct ctdb_req_header *hdr;
+       struct ctdb_req_message *msg;
+       uint8_t *buf;
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+
+       hdr = (struct ctdb_req_header *)state->buf;
+       if (hdr->length != state->buflen) {
+               DEBUG(10, ("Got invalid header length\n"));
+               return EIO;
+       }
+       if (hdr->operation != CTDB_REQ_MESSAGE) {
+               DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
+                          CTDB_REQ_MESSAGE, (int)hdr->operation));
+               return EIO;
+       }
+       if (hdr->length < offsetof(struct ctdb_req_message, data)) {
+               DEBUG(10, ("Got short msg, len=%d\n", (int)hdr->length));
+               return EIO;
+       }
+
+       msg = (struct ctdb_req_message *)hdr;
+       if (msg->datalen >
+           hdr->length - offsetof(struct ctdb_req_message, data)) {
+               DEBUG(10, ("Got invalid datalen %d\n", (int)msg->datalen));
+               return EIO;
+       }
+
+       buf = (uint8_t *)talloc_memdup(mem_ctx, msg->data, msg->datalen);
+       if (buf == NULL) {
+               return ENOMEM;
+       }
+       *pmsg = buf;
+       *pmsg_len = msg->datalen;
+       return 0;
+}
+
+#else
+
+struct dummy_state {
+       uint8_t dummy;
+};
+
+static struct tevent_req *dummy_send(TALLOC_CTX *mem_ctx,
+                                    struct tevent_context *ev)
+{
+       struct tevent_req *req;
+       struct dummy_state *state;
+       req = tevent_req_create(mem_ctx, &state, struct dummy_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       tevent_req_done(req);
+       return tevent_req_post(req, ev);
+}
+
+struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      const char *sock)
+{
+       return dummy_send(mem_ctx, ev);
+}
+
+int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                       struct ctdb_conn **pconn)
+{
+       return ENOSYS;
+}
+
+struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
+                                           struct tevent_context *ev,
+                                           struct ctdb_conn *conn,
+                                           uint32_t vnn, uint64_t srvid,
+                                           uint8_t *msg, size_t msg_len)
+{
+       return dummy_send(mem_ctx, ev);
+}
+
+int ctdb_conn_msg_write_recv(struct tevent_req *req)
+{
+       return ENOSYS;
+}
+
+struct tevent_req *ctdb_msg_channel_init_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       const char *sock, uint64_t srvid)
+{
+       return dummy_send(mem_ctx, ev);
+}
+
+int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                              struct ctdb_msg_channel **pchannel)
+{
+       return ENOSYS;
+}
+
+struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct ctdb_msg_channel *channel)
+{
+       return dummy_send(mem_ctx, ev);
+}
+
+int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                      uint8_t **pmsg, size_t *pmsg_len)
+{
+       return ENOSYS;
+}
+
+#endif
diff --git a/source3/lib/ctdb_conn.h b/source3/lib/ctdb_conn.h
new file mode 100644 (file)
index 0000000..9229536
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+   Unix SMB/CIFS implementation.
+   Samba3 ctdb connection handling
+   Copyright (C) Volker Lendecke 2012
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _CTDB_CONN_H
+#define _CTDB_CONN_H
+
+#ifdef CLUSTER_SUPPORT
+
+#include <tdb.h>
+#include <ctdb_protocol.h>
+
+#else /* CLUSTER_SUPPORT */
+
+struct ctdb_reply_control;
+
+#endif /* CLUSTER_SUPPORT */
+
+#include "tevent.h"
+#include "librpc/gen_ndr/messaging.h"
+
+struct ctdb_conn;
+
+struct tevent_req *ctdb_conn_control_send(TALLOC_CTX *mem_ctx,
+                                         struct tevent_context *ev,
+                                         struct ctdb_conn *conn,
+                                         uint32_t vnn, uint32_t opcode,
+                                         uint64_t srvid, uint32_t flags,
+                                         uint8_t *data, size_t datalen);
+int ctdb_conn_control_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                          struct ctdb_reply_control **preply);
+
+struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      const char *sock);
+int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                       struct ctdb_conn **pconn);
+
+struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
+                                           struct tevent_context *ev,
+                                           struct ctdb_conn *conn,
+                                           uint32_t vnn, uint64_t srvid,
+                                           uint8_t *msg, size_t msg_len);
+int ctdb_conn_msg_write_recv(struct tevent_req *req);
+
+struct ctdb_msg_channel;
+
+struct tevent_req *ctdb_msg_channel_init_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       const char *sock, uint64_t srvid);
+int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                              struct ctdb_msg_channel **pchannel);
+
+struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
+                                     struct tevent_context *ev,
+                                     struct ctdb_msg_channel *channel);
+int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                      uint8_t **pmsg, size_t *pmsg_len);
+
+#endif /* _CTDB_CONN_H */
index e65b2722d0a1d320cb1a360534efcea65b395ff2..6e2a00450f4484ae9706676ee2dfde9bf11f9ad0 100644 (file)
@@ -104,5 +104,6 @@ bool run_local_conv_auth_info(int dummy);
 bool run_local_sprintf_append(int dummy);
 bool run_cleanup1(int dummy);
 bool run_cleanup2(int dummy);
+bool run_ctdb_conn(int dummy);
 
 #endif /* __TORTURE_H__ */
diff --git a/source3/torture/test_ctdbconn.c b/source3/torture/test_ctdbconn.c
new file mode 100644 (file)
index 0000000..539e224
--- /dev/null
@@ -0,0 +1,247 @@
+/*
+   Unix SMB/CIFS implementation.
+   Test new ctdb API
+   Copyright (C) Volker Lendecke 2012
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "torture/proto.h"
+
+#ifdef CLUSTER_SUPPORT
+
+#include "ctdb_conn.h"
+#include "lib/util/tevent_unix.h"
+#include "tdb.h"
+#include "ctdb_protocol.h"
+#include "messages.h"
+
+struct ctdb_conn_test_state {
+       struct tevent_context *ev;
+       struct ctdb_conn *conn;
+       struct ctdb_msg_channel *channel;
+       int msgno;
+};
+
+static void ctdb_conn_test_got_conn(struct tevent_req *subreq);
+static void ctdb_conn_test_got_pnn(struct tevent_req *subreq);
+static void ctdb_conn_test_got_channel(struct tevent_req *subreq);
+static void ctdb_conn_test_got_msg(struct tevent_req *subreq);
+static void ctdb_conn_test_msg_sent(struct tevent_req *subreq);
+
+static struct tevent_req *ctdb_conn_test_send(TALLOC_CTX *mem_ctx,
+                                             struct tevent_context *ev)
+{
+       struct tevent_req *req, *subreq;
+       struct ctdb_conn_test_state *state;
+
+       req = tevent_req_create(mem_ctx, &state, struct ctdb_conn_test_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+
+       subreq = ctdb_conn_init_send(mem_ctx, ev, lp_ctdbd_socket());
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_test_got_conn, req);
+       return req;
+}
+
+static void ctdb_conn_test_got_conn(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_conn_test_state *state = tevent_req_data(
+               req, struct ctdb_conn_test_state);
+       uint64_t ret;
+
+       ret = ctdb_conn_init_recv(subreq, state, &state->conn);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       subreq = ctdb_conn_control_send(state, state->ev, state->conn,
+                                       CTDB_CURRENT_NODE,
+                                       CTDB_CONTROL_GET_PNN, 0, 0, NULL, 0);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_test_got_pnn, req);
+}
+
+static void ctdb_conn_test_got_pnn(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_conn_test_state *state = tevent_req_data(
+               req, struct ctdb_conn_test_state);
+       int ret;
+       struct ctdb_reply_control *reply;
+
+       ret = ctdb_conn_control_recv(subreq, talloc_tos(), &reply);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       printf("vnn=%d\n", (int)reply->status);
+
+       subreq = ctdb_msg_channel_init_send(
+               state, state->ev, lp_ctdbd_socket(), 999999);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_test_got_channel, req);
+}
+
+static void ctdb_conn_test_got_channel(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_conn_test_state *state = tevent_req_data(
+               req, struct ctdb_conn_test_state);
+       int ret;
+
+       ret = ctdb_msg_channel_init_recv(subreq, state, &state->channel);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+
+       subreq = ctdb_msg_read_send(state, state->ev, state->channel);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_test_got_msg, req);
+
+       state->msgno += 1;
+
+       subreq = ctdb_conn_msg_write_send(
+               state, state->ev, state->conn, CTDB_CURRENT_NODE, 999999,
+               (uint8_t *)&state->msgno, sizeof(state->msgno));
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_test_msg_sent, req);
+}
+
+static void ctdb_conn_test_got_msg(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_conn_test_state *state = tevent_req_data(
+               req, struct ctdb_conn_test_state);
+       uint8_t *buf;
+       size_t buf_len;
+       int ret;
+
+       ret = ctdb_msg_read_recv(subreq, talloc_tos(), &buf, &buf_len);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       if (buf_len != sizeof(int)) {
+               printf("got invalid msg\n");
+               tevent_req_error(req, EINVAL);
+               return;
+       }
+       memcpy(&ret, buf, buf_len);
+       printf("got msg %d\n", ret);
+       if (ret == 5) {
+               tevent_req_done(req);
+               return;
+       }
+
+       subreq = ctdb_msg_read_send(state, state->ev, state->channel);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_test_got_msg, req);
+}
+
+static void ctdb_conn_test_msg_sent(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_conn_test_state *state = tevent_req_data(
+               req, struct ctdb_conn_test_state);
+       int ret;
+
+       ret = ctdb_conn_msg_write_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       state->msgno += 1;
+
+       if (state->msgno >= 10) {
+               return;
+       }
+
+       subreq = ctdb_conn_msg_write_send(
+               state, state->ev, state->conn, CTDB_CURRENT_NODE, 999999,
+               (uint8_t *)&state->msgno, sizeof(state->msgno));
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_conn_test_msg_sent, req);
+}
+
+static int ctdb_conn_test_recv(struct tevent_req *req)
+{
+       int err;
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+       return 0;
+}
+
+bool run_ctdb_conn(int dummy)
+{
+       struct tevent_context *ev;
+       struct tevent_req *req;
+       int ret;
+
+       ev = tevent_context_init(talloc_tos());
+       if (ev == NULL) {
+               fprintf(stderr, "tevent_context_init failed\n");
+               return false;
+       }
+       req = ctdb_conn_test_send(ev, ev);
+       if (req == NULL) {
+               fprintf(stderr, "ctdb_conn_test_send failed\n");
+               return false;
+       }
+       if (!tevent_req_poll(req, ev)) {
+               fprintf(stderr, "tevent_req_poll failed\n");
+               return false;
+       }
+       ret = ctdb_conn_test_recv(req);
+       TALLOC_FREE(req);
+       printf("ctdb_conn_test returned %s\n",
+              ret ? strerror(ret) : "success");
+       TALLOC_FREE(ev);
+       return (ret == 0);
+}
+
+#else /* CLUSTER_SUPPORT */
+
+bool run_ctdb_conn(int dummy)
+{
+       return true;
+}
+
+#endif
index 5214e1319d1cfd9d7dfdf6af2f6bf6e18b8ad96f..0c421b5342dc4c075e5a4006af7ecbfc8c11cb92 100644 (file)
@@ -8917,6 +8917,7 @@ static struct {
        { "LOCAL-SUBSTITUTE", run_local_substitute, 0},
        { "LOCAL-GENCACHE", run_local_gencache, 0},
        { "LOCAL-TALLOC-DICT", run_local_talloc_dict, 0},
+       { "LOCAL-CTDB-CONN", run_ctdb_conn, 0},
        { "LOCAL-BASE64", run_local_base64, 0},
        { "LOCAL-RBTREE", run_local_rbtree, 0},
        { "LOCAL-MEMCACHE", run_local_memcache, 0},
index f356e2b276068d4cfb251e05dffd1f0b33d6666b..af8cb84f4888912d8f8f155f4193f8e1cb2af61f 100755 (executable)
@@ -41,6 +41,7 @@ REG_PARSE_PRS_SRC = '''registry/reg_parse_prs.c'''
 LIB_SRC = '''
           lib/messages.c lib/messages_local.c
           lib/messages_ctdbd.c lib/ctdb_packet.c lib/ctdbd_conn.c
+         lib/ctdb_conn.c
           lib/id_cache.c
           lib/talloc_dict.c
           lib/serverid.c
@@ -572,6 +573,7 @@ SMBTORTURE_SRC1 = '''torture/torture.c torture/nbio.c torture/scanner.c torture/
                 torture/test_authinfo_structs.c
                 torture/test_smbsock_any_connect.c
                 torture/test_cleanup.c
+               torture/test_ctdbconn.c
                 torture/t_strappend.c'''
 
 SMBTORTURE_SRC = '''${SMBTORTURE_SRC1}