libctdb: first cut, supports getrecmaster only
authorRusty Russell <rusty@rustcorp.com.au>
Fri, 21 May 2010 02:37:41 +0000 (12:07 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Fri, 21 May 2010 02:37:41 +0000 (12:07 +0930)
This is a completely standalone library using only ctdb_protocol.h.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Header from folded patch 'libctdb-message-handling.patch':

libctdb: add message handling to libctdb.

Now clients can send and receive ctdb messages.

libctdb/control.c [new file with mode: 0644]
libctdb/ctdb.c [new file with mode: 0644]
libctdb/io_elem.c [new file with mode: 0644]
libctdb/io_elem.h [new file with mode: 0644]
libctdb/libctdb_private.h [new file with mode: 0644]
libctdb/local_tdb.c [new file with mode: 0644]
libctdb/local_tdb.h [new file with mode: 0644]
libctdb/messages.c [new file with mode: 0644]
libctdb/messages.h [new file with mode: 0644]
libctdb/sync.c [new file with mode: 0644]
libctdb/tst.c

diff --git a/libctdb/control.c b/libctdb/control.c
new file mode 100644 (file)
index 0000000..84d703e
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+   Misc control routines of libctdb
+
+   Copyright (C) Rusty Russell 2010
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <ctdb.h>
+#include <ctdb_protocol.h>
+#include "libctdb_private.h"
+
+struct ctdb_request *ctdb_getrecmaster_send(struct ctdb_connection *ctdb,
+                                   uint32_t destnode,
+                                   ctdb_getrecmaster_cb callback,
+                                   void *private_data)
+{
+       struct ctdb_request *req;
+
+       req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_RECMASTER,
+                                      destnode, NULL, 0);
+       if (!req)
+               return NULL;
+       req->callback.getrecmaster = callback;
+       req->priv_data = private_data;
+       return req;
+}
+
+struct ctdb_request *
+ctdb_getpnn_send(struct ctdb_connection *ctdb,
+                uint32_t destnode,
+                ctdb_getpnn_cb callback,
+                void *private_data)
+{
+       struct ctdb_request *req;
+
+       req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_PNN, destnode,
+                                      NULL, 0);
+       if (!req) {
+               return NULL;
+       }
+       req->callback.getpnn = callback;
+       req->priv_data = private_data;
+       return req;
+}
diff --git a/libctdb/ctdb.c b/libctdb/ctdb.c
new file mode 100644 (file)
index 0000000..85875ac
--- /dev/null
@@ -0,0 +1,643 @@
+/*
+   core of libctdb
+
+   Copyright (C) Rusty Russell 2010
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <ctdb.h>
+#include <poll.h>
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include "libctdb_private.h"
+#include "io_elem.h"
+#include "local_tdb.h"
+#include "messages.h"
+#include <dlinklist.h>
+#include <ctdb_protocol.h>
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void close_noerr(int fd)
+{
+       int olderr = errno;
+       close(fd);
+       errno = olderr;
+}
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void free_noerr(void *p)
+{
+       int olderr = errno;
+       free(p);
+       errno = olderr;
+}
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void set_nonblocking(int fd)
+{
+       unsigned v;
+       v = fcntl(fd, F_GETFL, 0);
+        fcntl(fd, F_SETFL, v | O_NONBLOCK);
+}
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void set_close_on_exec(int fd)
+{
+       unsigned v;
+       v = fcntl(fd, F_GETFD, 0);
+        fcntl(fd, F_SETFD, v | FD_CLOEXEC);
+}
+
+static void set_pnn(int32_t status, uint32_t pnn, void *private_data)
+{
+       if (status != 0) {
+               /* FIXME: Report error. */
+               ((struct ctdb_connection *)private_data)->broken = true;
+       } else {
+               ((struct ctdb_connection *)private_data)->pnn = pnn;
+       }
+}
+
+struct ctdb_connection *ctdb_connect(const char *addr)
+{
+       struct ctdb_connection *ctdb;
+       struct sockaddr_un sun;
+
+       ctdb = malloc(sizeof(*ctdb));
+       if (!ctdb)
+               goto fail;
+       ctdb->outq = NULL;
+       ctdb->doneq = NULL;
+       ctdb->immediateq = NULL;
+       ctdb->in = NULL;
+       ctdb->message_handlers = NULL;
+
+       memset(&sun, 0, sizeof(sun));
+       sun.sun_family = AF_UNIX;
+       if (!addr)
+               addr = CTDB_PATH;
+       strncpy(sun.sun_path, addr, sizeof(sun.sun_path));
+       ctdb->fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (ctdb->fd < 0)
+               goto free_fail;
+
+       set_nonblocking(ctdb->fd);
+       set_close_on_exec(ctdb->fd);
+
+       if (connect(ctdb->fd, (struct sockaddr *)&sun, sizeof(sun)) == -1)
+               goto close_fail;
+
+       /* Immediately queue a request to get our pnn. */
+       if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, ctdb))
+               goto close_fail;
+
+       return ctdb;
+
+close_fail:
+       close_noerr(ctdb->fd);
+free_fail:
+       free_noerr(ctdb);
+fail:
+       return NULL;
+}
+
+int ctdb_get_fd(struct ctdb_connection *ctdb)
+{
+       return ctdb->fd;
+}
+
+int ctdb_which_events(struct ctdb_connection *ctdb)
+{
+       int events = POLLIN;
+
+       if (ctdb->outq)
+               events |= POLLOUT;
+       return events;
+}
+
+struct ctdb_request *new_ctdb_request(size_t len)
+{
+       struct ctdb_request *req = malloc(sizeof(*req));
+       if (!req)
+               return NULL;
+       req->io = new_io_elem(len);
+       if (!req->io) {
+               free(req);
+               return NULL;
+       }
+       req->hdr.hdr = io_elem_data(req->io, NULL);
+       req->cancelled = false;
+       return req;
+}
+
+static struct ctdb_request *new_immediate_request(void)
+{
+       struct ctdb_request *req = malloc(sizeof(*req));
+       if (!req)
+               return NULL;
+       req->cancelled = false;
+       req->io = NULL;
+       req->hdr.hdr = NULL;
+       return req;
+}
+
+static void free_ctdb_request(struct ctdb_request *req)
+{
+       /* immediate requests don't have IO */
+       if (req->io) {
+               free_io_elem(req->io);
+       }
+       free(req);
+}
+
+static void handle_call_reply(struct ctdb_connection *ctdb,
+                             struct ctdb_req_header *hdr,
+                             struct ctdb_request *i)
+{
+       struct ctdb_req_call *call = i->hdr.call;
+       struct ctdb_reply_call *reply = (struct ctdb_reply_call *)hdr;
+
+       switch (call->callid) {
+       case CTDB_NULL_FUNC:
+               /* FIXME: We should let it steal the request, rathe than copy */
+               i->callback.nullfunc(reply->status, reply, i->priv_data);
+               break;
+       }
+}
+
+static void handle_control_reply(struct ctdb_connection *ctdb,
+                                struct ctdb_req_header *hdr,
+                                struct ctdb_request *i)
+{
+       struct ctdb_req_control *control = i->hdr.control;
+       struct ctdb_reply_control *reply = (struct ctdb_reply_control *)hdr;
+
+       switch (control->opcode) {
+       case CTDB_CONTROL_GET_RECMASTER:
+               i->callback.getrecmaster(0, reply->status, i->priv_data);
+               break;
+       case CTDB_CONTROL_GET_PNN:
+               i->callback.getpnn(0, reply->status, i->priv_data);
+               break;
+       case CTDB_CONTROL_REGISTER_SRVID:
+               i->callback.register_srvid(reply->status, i->priv_data);
+               break;
+       case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
+       case CTDB_CONTROL_DB_ATTACH:
+               i->callback.attachdb(reply->status, *(uint32_t *)reply->data,
+                                    i->priv_data);
+               break;
+       case CTDB_CONTROL_GETDBPATH:
+               i->callback.getdbpath(reply->status, (char *)reply->data,
+                                     i->priv_data);
+               break;
+       }
+}
+
+static void handle_incoming(struct ctdb_connection *ctdb,
+                           struct ctdb_req_header *hdr,
+                           size_t len /* FIXME: use len to check packet! */)
+{
+       struct ctdb_request *i;
+
+       if (hdr->operation == CTDB_REQ_MESSAGE) {
+               deliver_message(ctdb, hdr);
+               return;
+       }
+
+       if (hdr->operation != CTDB_REPLY_CALL
+           && hdr->operation != CTDB_REPLY_CONTROL) {
+               /* FIXME: report this error. */
+               return;
+       }
+
+       for (i = ctdb->doneq; i; i = i->next) {
+               if (i->hdr.hdr->reqid == hdr->reqid) {
+                       if (!i->cancelled) {
+                               if (hdr->operation == CTDB_REPLY_CALL)
+                                       handle_call_reply(ctdb, hdr, i);
+                               else
+                                       handle_control_reply(ctdb, hdr, i);
+                       }
+                       DLIST_REMOVE(ctdb->doneq, i);
+                       free_ctdb_request(i);
+                       return;
+               }
+       }
+       /* FIXME: report this error. */
+}
+
+/* Remove "harmless" errors. */
+static ssize_t real_error(ssize_t ret)
+{
+       if (ret < 0 && (errno == EINTR || errno == EWOULDBLOCK))
+               return 0;
+       return ret;
+}
+
+int ctdb_service(struct ctdb_connection *ctdb, int revents)
+{
+       if (ctdb->broken) {
+               return -1;
+       }
+
+       if (revents & POLLOUT) {
+               while (ctdb->outq) {
+                       if (real_error(write_io_elem(ctdb->fd,
+                                                    ctdb->outq->io)) < 0) {
+                               ctdb->broken = true;
+                               return -1;
+                       }
+                       if (io_elem_finished(ctdb->outq->io)) {
+                               struct ctdb_request *done = ctdb->outq;
+                               DLIST_REMOVE(ctdb->outq, done);
+                               if (done->cancelled) {
+                                       free_ctdb_request(done);
+                               } else {
+                                       DLIST_ADD_END(ctdb->doneq, done,
+                                                     struct ctdb_request);
+                               }
+                       }
+               }
+       }
+
+       while (revents & POLLIN) {
+               int ret;
+
+               if (!ctdb->in) {
+                       ctdb->in = new_io_elem(sizeof(struct ctdb_req_header));
+                       if (!ctdb->in) {
+                               ctdb->broken = true;
+                               return -1;
+                       }
+               }
+
+               ret = read_io_elem(ctdb->fd, ctdb->in);
+               if (real_error(ret) < 0 || ret == 0) {
+                       /* They closed fd? */
+                       if (ret == 0)
+                               errno = EBADF;
+                       ctdb->broken = true;
+                       return -1;
+               } else if (ret < 0) {
+                       /* No progress, stop loop. */
+                       revents = 0;
+               } else if (io_elem_finished(ctdb->in)) {
+                       struct ctdb_req_header *hdr;
+                       size_t len;
+
+                       hdr = io_elem_data(ctdb->in, &len);
+                       handle_incoming(ctdb, hdr, len);
+                       free_io_elem(ctdb->in);
+                       ctdb->in = NULL;
+               }
+       }
+
+       while (ctdb->immediateq) {
+               struct ctdb_request *imm = ctdb->immediateq;
+               /* This has to handle fake->cancelled internally. */
+               imm->callback.immediate(imm, imm->priv_data);
+               DLIST_REMOVE(ctdb->immediateq, imm);
+               free_ctdb_request(imm);
+       }
+
+       return 0;
+}
+
+/* This is inefficient.  We could pull in idtree.c. */
+static bool reqid_used(const struct ctdb_connection *ctdb, uint32_t reqid)
+{
+       struct ctdb_request *i;
+
+       for (i = ctdb->outq; i; i = i->next) {
+               if (i->hdr.hdr->reqid == reqid) {
+                       return true;
+               }
+       }
+       for (i = ctdb->doneq; i; i = i->next) {
+               if (i->hdr.hdr->reqid == reqid) {
+                       return true;
+               }
+       }
+       return false;
+}
+
+uint32_t new_reqid(struct ctdb_connection *ctdb)
+{
+       while (reqid_used(ctdb, ctdb->next_id)) {
+               ctdb->next_id++;
+       }
+       return ctdb->next_id++;
+}
+
+struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
+                                             uint32_t opcode,
+                                             uint32_t destnode,
+                                             const void *extra_data,
+                                             size_t extra)
+{
+       struct ctdb_request *req;
+       struct ctdb_req_control *pkt;
+
+       req = new_ctdb_request(sizeof(*pkt) + extra);
+       if (!req)
+               return NULL;
+
+       io_elem_init_req_header(req->io,
+                               CTDB_REQ_CONTROL, destnode, new_reqid(ctdb));
+
+       pkt = req->hdr.control;
+       pkt->opcode = opcode;
+       pkt->srvid = 0;
+       pkt->client_id = 0;
+       pkt->flags = 0;
+       pkt->datalen = extra;
+       memcpy(pkt->data, extra_data, extra);
+       DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+       return req;
+}
+
+int ctdb_cancel(struct ctdb_request *req)
+{
+       /* FIXME: If it's not sent, we could just free it right now. */
+       req->cancelled = true;
+       return 0;
+}
+
+struct ctdb_db {
+       struct ctdb_connection *ctdb;
+       bool persistent;
+       uint32_t tdb_flags;
+       uint32_t id;
+       struct tdb_context *tdb;
+
+       ctdb_attachdb_cb callback;
+       void *private_data;
+};
+
+static void attachdb_getdbpath_done(int status, const char *path,
+                                   void *_db)
+{
+       struct ctdb_db *db = _db;
+       uint32_t tdb_flags = db->tdb_flags;
+
+       if (status != 0) {
+               db->callback(status, NULL, db->private_data);
+               free(db);
+               return;
+       }
+
+       tdb_flags = db->persistent ? TDB_DEFAULT : TDB_NOSYNC;
+       tdb_flags |= TDB_DISALLOW_NESTING;
+
+       db->tdb = tdb_open(path, 0, tdb_flags, O_RDWR, 0);
+       if (db->tdb == NULL) {
+               db->callback(-1, NULL, db->private_data);
+               free(db);
+               return;
+       }
+
+       /* Finally, we tell the client that we opened the db. */
+       db->callback(status, db, db->private_data);
+}
+
+static void attachdb_done(int status, uint32_t id, struct ctdb_db *db)
+{
+       struct ctdb_request *req;
+
+       if (status != 0) {
+               db->callback(status, NULL, db->private_data);
+               free(db);
+               return;
+       }
+       db->id = id;
+
+       /* Now we do another call, to get the dbpath. */
+       req = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH,
+                                      CTDB_CURRENT_NODE, &id, sizeof(id));
+       if (!req) {
+               db->callback(-1, NULL, db->private_data);
+               free(db);
+               return;
+       }
+       req->callback.getdbpath = attachdb_getdbpath_done;
+       req->priv_data = db;
+}
+
+struct ctdb_request *
+ctdb_attachdb_send(struct ctdb_connection *ctdb,
+                  const char *name, int persistent, uint32_t tdb_flags,
+                  ctdb_attachdb_cb callback,
+                  void *private_data)
+{
+       struct ctdb_request *req;
+       struct ctdb_db *db;
+       uint32_t opcode;
+
+       /* FIXME: Search if db already open. */
+
+       db = malloc(sizeof(*db));
+       if (!db) {
+               return NULL;
+       }
+
+       if (persistent) {
+               opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
+       } else {
+               opcode = CTDB_CONTROL_DB_ATTACH;
+       }
+
+       req = new_ctdb_control_request(ctdb, opcode, CTDB_CURRENT_NODE, name,
+                                      strlen(name) + 1);
+       if (!req) {
+               free(db);
+               return NULL;
+       }
+
+       db->ctdb = ctdb;
+       db->tdb_flags = tdb_flags;
+       db->persistent = persistent;
+       db->callback = callback;
+       db->private_data = private_data;
+
+       req->callback.attachdb = attachdb_done;
+       req->priv_data = db;
+
+       /* Flags get overloaded into srvid. */
+       req->hdr.control->srvid = tdb_flags;
+       return req;
+}
+
+struct ctdb_lock {
+       struct ctdb_db *ctdb_db;
+       TDB_DATA key;
+       struct ctdb_ltdb_header *hdr;
+       TDB_DATA data;
+       bool held;
+       /* For convenience, we stash this here. */
+       ctdb_readrecordlock_cb callback;
+       void *private_data;
+};
+
+void ctdb_release_lock(struct ctdb_lock *lock)
+{
+       if (lock->held) {
+               tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+       }
+       free(lock->key.dptr);
+       free(lock->hdr); /* Also frees lock->data */
+       free(lock);
+}
+
+/* We keep the lock if local node is the dmaster. */
+static bool try_readrecordlock(struct ctdb_lock *lock)
+{
+       if (tdb_chainlock(lock->ctdb_db->tdb, lock->key) != 0) {
+               return false;
+       }
+
+       lock->hdr = ctdb_local_fetch(lock->ctdb_db->tdb,
+                                    lock->key, &lock->data);
+       if (lock->hdr && lock->hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
+               lock->held = true;
+               return true;
+       }
+
+       tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+       free(lock->hdr);
+       return false;
+}
+
+static void readrecordlock_done(int, struct ctdb_reply_call *, void *);
+
+static struct ctdb_request *new_readrecordlock_request(struct ctdb_lock *lock)
+{
+       struct ctdb_request *req;
+       struct ctdb_req_call *pkt;
+
+       req = new_ctdb_request(sizeof(*pkt) + lock->key.dsize);
+       if (!req)
+               return NULL;
+       req->callback.nullfunc = readrecordlock_done;
+       req->priv_data = lock;
+
+       io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
+                               new_reqid(lock->ctdb_db->ctdb));
+
+       pkt = req->hdr.call;
+       pkt->flags = CTDB_IMMEDIATE_MIGRATION;
+       pkt->db_id = lock->ctdb_db->id;
+       pkt->callid = CTDB_NULL_FUNC;
+       pkt->hopcount = 0;
+       pkt->keylen = lock->key.dsize;
+       pkt->calldatalen = 0;
+       memcpy(pkt->data, lock->key.dptr, lock->key.dsize);
+       DLIST_ADD_END(lock->ctdb_db->ctdb->outq, req, struct ctdb_request);
+       return req;
+}
+
+/* OK, let's try again... */
+static void readrecordlock_done(int status, struct ctdb_reply_call *reply,
+                               void *_lock)
+{
+       struct ctdb_lock *lock = _lock;
+
+       if (status != 0) {
+               lock->callback(status, NULL, tdb_null, lock->private_data);
+               ctdb_release_lock(lock);
+               return;
+       }
+
+       if (try_readrecordlock(lock)) {
+               lock->callback(0, lock, lock->data, lock->private_data);
+               return;
+       }
+
+       if (!new_readrecordlock_request(lock)) {
+               lock->callback(-1, NULL, tdb_null, lock->private_data);
+               ctdb_release_lock(lock);
+       }
+}
+
+static void lock_complete(struct ctdb_request *req, void *_lock)
+{
+       struct ctdb_lock *lock = _lock;
+
+       if (!req->cancelled) {
+               lock->callback(0, lock, lock->data, lock->private_data);
+       } else {
+               ctdb_release_lock(lock);
+       }
+}
+
+struct ctdb_request *
+ctdb_readrecordlock_send(struct ctdb_db *ctdb_db,
+                        TDB_DATA key,
+                        ctdb_readrecordlock_cb callback,
+                        void *private_data)
+{
+       struct ctdb_request *req;
+       struct ctdb_lock *lock;
+
+       lock = malloc(sizeof(*lock));
+       if (!lock)
+               return NULL;
+       lock->key.dptr = malloc(key.dsize);
+       if (!lock->key.dptr) {
+               free_noerr(lock);
+               return NULL;
+       }
+       memcpy(lock->key.dptr, key.dptr, key.dsize);
+       lock->key.dsize = key.dsize;
+       lock->ctdb_db = ctdb_db;
+       lock->callback = callback;
+       lock->private_data = private_data;
+       lock->hdr = NULL;
+       lock->held = false;
+
+       if (try_readrecordlock(lock)) {
+               /* We pretend to be async, so we just queue this. */
+               req = new_immediate_request();
+               if (!req) {
+                       ctdb_release_lock(lock);
+                       return NULL;
+               }
+               req->callback.immediate = lock_complete;
+               req->priv_data = lock;
+               DLIST_ADD_END(lock->ctdb_db->ctdb->immediateq,
+                             req, struct ctdb_request);
+               return req;
+       }
+
+       req = new_readrecordlock_request(lock);
+       if (!req) {
+               ctdb_release_lock(lock);
+               return NULL;
+       }
+       return req;
+}
+
+int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
+{
+       if (lock->ctdb_db->persistent) {
+               /* FIXME: Report error. */
+               return -1;
+       }
+
+       return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr, data);
+}
diff --git a/libctdb/io_elem.c b/libctdb/io_elem.c
new file mode 100644 (file)
index 0000000..91e84cc
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+   Simple queuing of input and output records for libctdb
+
+   Copyright (C) Rusty Russell 2010
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <sys/types.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdlib.h>
+#include "io_elem.h"
+#include <tdb.h>
+#include <ctdb_protocol.h> // For CTDB_DS_ALIGNMENT and ctdb_req_header
+
+struct io_elem {
+       size_t len, off;
+       char *data;
+};
+
+struct io_elem *new_io_elem(size_t len)
+{
+       struct io_elem *elem;
+       len = (len + (CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
+       elem = malloc(sizeof(*elem));
+       if (!elem)
+               return NULL;
+       elem->data = malloc(len);
+       if (!elem->data) {
+               free(elem);
+               return NULL;
+       }
+
+       elem->len = len;
+       elem->off = 0;
+       return elem;
+}
+
+void free_io_elem(struct io_elem *io)
+{
+       free(io->data);
+       free(io);
+}
+
+bool io_elem_finished(const struct io_elem *io)
+{
+       return io->off == io->len;
+}
+
+void io_elem_init_req_header(struct io_elem *io,
+                            uint32_t operation,
+                            uint32_t destnode,
+                            uint32_t reqid)
+{
+       struct ctdb_req_header *hdr = io_elem_data(io, NULL);
+
+       hdr->length = io->len;
+       hdr->ctdb_magic = CTDB_MAGIC;
+       hdr->ctdb_version = CTDB_VERSION;
+       /* Generation and srcnode only used for inter-ctdbd communication. */
+       hdr->generation = 0;
+       hdr->destnode = destnode;
+       hdr->srcnode = 0;
+       hdr->operation = operation;
+       hdr->reqid = reqid;
+}
+
+/* Access to raw data: if len is non-NULL it is filled in. */
+void *io_elem_data(const struct io_elem *io, size_t *len)
+{
+       if (len)
+               *len = io->len;
+       return io->data;
+}
+
+/* Returns -1 if we hit an error.  Errno will be set. */
+int read_io_elem(int fd, struct io_elem *io)
+{
+       ssize_t ret;
+
+       ret = read(fd, io->data + io->off, io->len - io->off);
+       if (ret < 0)
+               return ret;
+
+       io->off += ret;
+       if (io_elem_finished(io)) {
+               struct ctdb_req_header *hdr = (void *)io->data;
+
+               /* Finished.  But maybe this was just header? */
+               if (io->len == sizeof(*hdr) && hdr->length > io->len) {
+                       int reret;
+                       /* Enlarge and re-read. */
+                       io->len = hdr->length;
+                       io->data = realloc(io->data, io->len);
+                       if (!io->data)
+                               return -1;
+                       /* Try reading again immediately. */
+                       reret = read_io_elem(fd, io);
+                       if (reret >= 0)
+                               reret += ret;
+                       return reret;
+               }
+       }
+       return ret;
+}
+
+/* Returns -1 if we hit an error.  Errno will be set. */
+int write_io_elem(int fd, struct io_elem *io)
+{
+       ssize_t ret;
+
+       ret = write(fd, io->data + io->off, io->len - io->off);
+       if (ret < 0)
+               return ret;
+
+       io->off += ret;
+       return ret;
+}
diff --git a/libctdb/io_elem.h b/libctdb/io_elem.h
new file mode 100644 (file)
index 0000000..5c234fe
--- /dev/null
@@ -0,0 +1,32 @@
+#ifndef _LIBCTDB_IO_ELEM_H
+#define _LIBCTDB_IO_ELEM_H
+#include <stdbool.h>
+
+/* Packets are of form: <u32 length><data>. */
+
+/* Create a new queue element of at least len bytes (for reading or writing).
+ * Len may be rounded up for alignment. */
+struct io_elem *new_io_elem(size_t len);
+
+/* Free a queue element. */
+void free_io_elem(struct io_elem *io);
+
+/* If finished, this returns the request header, otherwise NULL. */
+bool io_elem_finished(const struct io_elem *io);
+
+/* Access to raw data: if len is non-NULL it is filled in. */
+void *io_elem_data(const struct io_elem *io, size_t *len);
+
+/* Initialise the struct ctdb_req_header at the front of the I/O. */
+void io_elem_init_req_header(struct io_elem *io,
+                            uint32_t operation,
+                            uint32_t destnode,
+                            uint32_t reqid);
+
+/* Returns -1 if we hit an error.  Otherwise bytes read. */
+int read_io_elem(int fd, struct io_elem *io);
+
+/* Returns -1 if we hit an error.  Otherwise bytes written. */
+int write_io_elem(int fd, struct io_elem *io);
+
+#endif /* _LIBCTDB_IO_ELEM_H */
diff --git a/libctdb/libctdb_private.h b/libctdb/libctdb_private.h
new file mode 100644 (file)
index 0000000..6e789cf
--- /dev/null
@@ -0,0 +1,63 @@
+#ifndef _LIBCTDB_PRIVATE_H
+#define _LIBCTDB_PRIVATE_H
+#include <dlinklist.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <ctdb.h>
+
+struct message_handler_info;
+struct ctdb_reply_call;
+
+struct ctdb_request {
+       struct ctdb_request *next, *prev;
+       struct io_elem *io;
+       union {
+               struct ctdb_req_header *hdr;
+               struct ctdb_req_call *call;
+               struct ctdb_req_control *control;
+               struct ctdb_req_message *message;
+       } hdr;
+       bool cancelled;
+       union {
+               ctdb_getrecmaster_cb getrecmaster;
+               ctdb_getpnn_cb getpnn;
+               void (*register_srvid)(int, struct message_handler_info *);
+               void (*attachdb)(int, uint32_t id, struct ctdb_db *);
+               void (*getdbpath)(int, const char *, void *);
+               void (*nullfunc)(int, struct ctdb_reply_call *, void *);
+               void (*immediate)(struct ctdb_request *, void *);
+       } callback;
+       void *priv_data;
+};
+
+struct ctdb_connection {
+       /* Socket to ctdbd. */
+       int fd;
+       /* Currently our failure mode is simple; return -1 from ctdb_service */
+       bool broken;
+       /* Linked list of pending outgoings. */
+       struct ctdb_request *outq;
+       /* Finished outgoings (awaiting response) */
+       struct ctdb_request *doneq;
+       /* Successful sync requests, waiting for next service. */
+       struct ctdb_request *immediateq;
+       /* Current incoming. */
+       struct io_elem *in;
+       /* Guess at a good reqid to try next. */
+       uint32_t next_id;
+       /* List of messages */
+       struct message_handler_info *message_handlers;
+       /* PNN of this ctdb: valid by the time we do our first db connection. */
+       uint32_t pnn;
+};
+
+/* ctdb.c */
+struct ctdb_request *new_ctdb_request(size_t len);
+struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
+                                             uint32_t opcode,
+                                             uint32_t destnode,
+                                             const void *extra_data,
+                                             size_t extra);
+uint32_t new_reqid(struct ctdb_connection *ctdb);
+#endif /* _LIBCTDB_PRIVATE_H */
diff --git a/libctdb/local_tdb.c b/libctdb/local_tdb.c
new file mode 100644 (file)
index 0000000..dbf06ed
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+   libctdb local tdb access code
+
+   Copyright (C) Andrew Tridgell  2006
+   Copyright (C) Rusty Russell  2010
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <ctdb.h>
+#include <tdb.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <string.h>
+#include <ctdb_protocol.h> // For struct ctdb_ltdb_header
+#include "local_tdb.h"
+
+/*
+  fetch a record from the ltdb, separating out the header information
+  and returning the body of the record.  The caller should free() the
+  header when done, rather than the (optional) data->dptr.
+*/
+struct ctdb_ltdb_header *ctdb_local_fetch(struct tdb_context *tdb,
+                                         TDB_DATA key, TDB_DATA *data)
+{
+       TDB_DATA rec;
+
+       rec = tdb_fetch(tdb, key);
+       if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
+               free(rec.dptr);
+               return NULL;
+       }
+
+       if (data) {
+               data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+               data->dptr = rec.dptr + sizeof(struct ctdb_ltdb_header);
+       }
+       return (struct ctdb_ltdb_header *)rec.dptr;
+}
+
+
+/*
+  write a record to a normal database
+*/
+int ctdb_local_store(struct tdb_context *tdb, TDB_DATA key,
+                    struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+       TDB_DATA rec;
+       int ret;
+       bool seqnum_suppressed = false;
+
+       rec.dsize = sizeof(*header) + data.dsize;
+       rec.dptr = malloc(rec.dsize);
+       if (!rec.dptr) {
+               return -1;
+       }
+
+       memcpy(rec.dptr, header, sizeof(*header));
+       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+       /* Databases with seqnum updates enabled only get their seqnum
+          changes when/if we modify the data */
+       if (tdb_get_flags(tdb) & TDB_SEQNUM) {
+               TDB_DATA old;
+               old = tdb_fetch(tdb, key);
+
+               if ( (old.dsize == rec.dsize)
+               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+                       tdb_remove_flags(tdb, TDB_SEQNUM);
+                       seqnum_suppressed = true;
+               }
+               free(old.dptr);
+       }
+       ret = tdb_store(tdb, key, rec, TDB_REPLACE);
+       if (seqnum_suppressed) {
+               tdb_add_flags(tdb, TDB_SEQNUM);
+       }
+       free(rec.dptr);
+
+       return ret;
+}
diff --git a/libctdb/local_tdb.h b/libctdb/local_tdb.h
new file mode 100644 (file)
index 0000000..549db7d
--- /dev/null
@@ -0,0 +1,10 @@
+#ifndef _LIBCTDB_LOCAL_TDB_H
+#define _LIBCTDB_LOCAL_TDB_H
+
+struct ctdb_ltdb_header *ctdb_local_fetch(struct tdb_context *tdb,
+                                         TDB_DATA key, TDB_DATA *data);
+
+int ctdb_local_store(struct tdb_context *tdb, TDB_DATA key,
+                    struct ctdb_ltdb_header *header, TDB_DATA data);
+
+#endif /* _LIBCTDB_LOCAL_TDB_H */
diff --git a/libctdb/messages.c b/libctdb/messages.c
new file mode 100644 (file)
index 0000000..6ec3541
--- /dev/null
@@ -0,0 +1,110 @@
+#include "libctdb_private.h"
+#include "messages.h"
+#include "io_elem.h"
+#include <ctdb.h>
+#include <tdb.h>
+#include <ctdb_protocol.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct message_handler_info {
+       struct message_handler_info *next, *prev;
+       /* Callback when we're first registered. */
+       ctdb_set_message_handler_cb callback;
+
+       uint64_t srvid;
+       ctdb_message_fn_t handler;
+       void *private_data;
+       struct ctdb_connection *ctdb;
+};
+
+void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
+{
+       struct message_handler_info *i;
+       struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
+       TDB_DATA data;
+
+       data.dptr = msg->data;
+       data.dsize = msg->datalen;
+
+       for (i = ctdb->message_handlers; i; i = i->next) {
+               if (i->srvid == msg->srvid) {
+                       i->handler(ctdb, msg->srvid, data, i->private_data);
+               }
+       }
+       /* FIXME: Report unknown messages */
+}
+
+static void set_message_handler(int status, struct message_handler_info *info)
+{
+       /* If registration failed, tell callback and clean up */
+       if (status < 0) {
+               info->callback(status, info->private_data);
+               free(info);
+               return;
+       } else {
+               /* Put ourselves in list of handlers. */
+               DLIST_ADD_END(info->ctdb->message_handlers, info,
+                             struct message_handler_info);
+               /* Now call callback: it could remove us in theory. */
+               info->callback(status, info->private_data);
+       }
+}
+
+struct ctdb_request *
+ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
+                             ctdb_set_message_handler_cb callback,
+                             ctdb_message_fn_t handler, void *private_data)
+{
+       struct ctdb_request *req;
+       struct message_handler_info *info;
+
+       info = malloc(sizeof(*info));
+       if (!info) {
+               return NULL;
+       }
+       req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
+                                      CTDB_CURRENT_NODE, NULL, 0);
+       if (!req) {
+               free(info);
+               return NULL;
+       }
+       req->hdr.control->srvid = srvid;
+
+       info->srvid = srvid;
+       info->handler = handler;
+       info->callback = callback;
+       info->private_data = private_data;
+       info->ctdb = ctdb;
+
+       req->callback.register_srvid = set_message_handler;
+       req->priv_data = info;
+
+       return req;
+}
+
+int ctdb_send_message(struct ctdb_connection *ctdb,
+                     uint32_t pnn, uint64_t srvid,
+                     TDB_DATA data)
+{
+       struct ctdb_request *req;
+       struct ctdb_req_message *pkt;
+
+       req = new_ctdb_request(sizeof(*pkt) + data.dsize);
+       if (!req) {
+               return -1;
+       }
+
+       io_elem_init_req_header(req->io,
+                               CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
+
+       /* There's no reply to this, so we mark it cancelled immediately. */
+       req->cancelled = true;
+
+       pkt = req->hdr.message;
+       pkt->srvid = srvid;
+       pkt->datalen = data.dsize;
+       memcpy(pkt->data, data.dptr, data.dsize);
+       DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+       return 0;
+}
diff --git a/libctdb/messages.h b/libctdb/messages.h
new file mode 100644 (file)
index 0000000..dcf19c8
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef _LIBCTDB_MESSAGE_H
+#define _LIBCTDB_MESSAGE_H
+struct message_handler_info;
+struct ctdb_connection;
+struct ctdb_req_header;
+
+void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr);
+#endif /* _LIBCTDB_MESSAGE_H */
diff --git a/libctdb/sync.c b/libctdb/sync.c
new file mode 100644 (file)
index 0000000..8c4892d
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+   synchronous wrappers for libctdb
+
+   Copyright (C) Rusty Russell 2010
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <ctdb.h>
+#include <stdbool.h>
+#include <poll.h>
+#include <errno.h>
+#include <stdlib.h>
+
+/* FIXME: Find a way to share more code here. */
+struct ctdb_getrecmaster {
+       bool done;
+       int status;
+       uint32_t *recmaster;
+};
+
+static bool ctdb_service_flush(struct ctdb_connection *ctdb)
+{
+       struct pollfd fds;
+
+       fds.fd = ctdb_get_fd(ctdb);
+       fds.events = ctdb_which_events(ctdb);
+       if (poll(&fds, 1, -1) < 0) {
+               /* Signalled is OK, other error is bad. */
+               return errno == EINTR;
+       }
+       return ctdb_service(ctdb, fds.revents) >= 0;
+}
+
+static void getrecmaster_done(int status, uint32_t recmaster, void *priv_data)
+{
+       struct ctdb_getrecmaster *grm = priv_data;
+       *grm->recmaster = recmaster;
+       grm->status = status;
+       grm->done = true;
+}
+
+int ctdb_getrecmaster(struct ctdb_connection *ctdb,
+                     uint32_t destnode, uint32_t *recmaster)
+{
+       struct ctdb_request *req;
+       struct ctdb_getrecmaster grm;
+
+       grm.done = false;
+       grm.recmaster = recmaster;
+       req = ctdb_getrecmaster_send(ctdb, destnode, getrecmaster_done, &grm);
+       if (!req)
+               return -1;
+
+       while (!grm.done) {
+               if (!ctdb_service_flush(ctdb)) {
+                       ctdb_cancel(req);
+                       return -1;
+               }
+       }
+       return grm.status;
+}
+
+struct ctdb_attachdb {
+       bool done;
+       int status;
+       struct ctdb_db *ctdb_db;
+};
+
+static void attachdb_sync_done(int status,
+                              struct ctdb_db *ctdb_db, void *private_data)
+{
+       struct ctdb_attachdb *atb = private_data;
+       atb->ctdb_db = ctdb_db;
+       atb->status = status;
+       atb->done = true;
+}
+
+struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb,
+                             const char *name, int persistent,
+                             uint32_t tdb_flags)
+{
+       struct ctdb_request *req;
+       struct ctdb_attachdb atb;
+
+       atb.done = false;
+       req = ctdb_attachdb_send(ctdb, name, persistent, tdb_flags,
+                                attachdb_sync_done, &atb);
+       if (!req)
+               return NULL;
+
+       while (!atb.done) {
+               if (!ctdb_service_flush(ctdb)) {
+                       ctdb_cancel(req);
+                       return NULL;
+               }
+       }
+       if (atb.status != 0)
+               return NULL;
+       return atb.ctdb_db;
+}
index 8b51f99846917b432ed00d23943d073f73f39e80..ab4b3c7075472b2b608742302a788124d872a115 100644 (file)
@@ -4,6 +4,7 @@
 #include <fcntl.h>
 #include <stdlib.h>
 #include <err.h>
+#include <stdbool.h>
 #include "lib/tdb/include/tdb.h"
 #include "include/ctdb.h"
 
@@ -12,9 +13,11 @@ void msg_h(struct ctdb_connection *ctdb, uint64_t srvid, TDB_DATA data, void *pr
        printf("Message received on port %d : %s\n", (int)srvid, data.dptr);
 }
 
+static bool registered = false;
 void message_handler_cb(int status, void *private_data)
 {
        printf("Message handler registered: %i\n", status);
+       registered = true;
 }
 
 void rm_cb(int status, uint32_t recmaster, void *private_data)
@@ -42,6 +45,11 @@ int main(int argc, char *argv[])
                exit(10);
        }
 
+       /* Hack for testing: this makes sure registration goes out. */
+       while (!registered) {
+               ctdb_service(ctdb_connection, POLLIN|POLLOUT);
+       }
+
        msg.dptr="HelloWorld";
        msg.dsize = strlen(msg.dptr);