This is a completely standalone library using only ctdb_protocol.h.
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Header from folded patch 'libctdb-message-handling.patch':
libctdb: add message handling to libctdb.
Now clients can send and receive ctdb messages.
--- /dev/null
+/*
+ Misc control routines of libctdb
+
+ Copyright (C) Rusty Russell 2010
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <ctdb.h>
+#include <ctdb_protocol.h>
+#include "libctdb_private.h"
+
+struct ctdb_request *ctdb_getrecmaster_send(struct ctdb_connection *ctdb,
+ uint32_t destnode,
+ ctdb_getrecmaster_cb callback,
+ void *private_data)
+{
+ struct ctdb_request *req;
+
+ req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_RECMASTER,
+ destnode, NULL, 0);
+ if (!req)
+ return NULL;
+ req->callback.getrecmaster = callback;
+ req->priv_data = private_data;
+ return req;
+}
+
+struct ctdb_request *
+ctdb_getpnn_send(struct ctdb_connection *ctdb,
+ uint32_t destnode,
+ ctdb_getpnn_cb callback,
+ void *private_data)
+{
+ struct ctdb_request *req;
+
+ req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_PNN, destnode,
+ NULL, 0);
+ if (!req) {
+ return NULL;
+ }
+ req->callback.getpnn = callback;
+ req->priv_data = private_data;
+ return req;
+}
--- /dev/null
+/*
+ core of libctdb
+
+ Copyright (C) Rusty Russell 2010
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <ctdb.h>
+#include <poll.h>
+#include <errno.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include "libctdb_private.h"
+#include "io_elem.h"
+#include "local_tdb.h"
+#include "messages.h"
+#include <dlinklist.h>
+#include <ctdb_protocol.h>
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void close_noerr(int fd)
+{
+ int olderr = errno;
+ close(fd);
+ errno = olderr;
+}
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void free_noerr(void *p)
+{
+ int olderr = errno;
+ free(p);
+ errno = olderr;
+}
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void set_nonblocking(int fd)
+{
+ unsigned v;
+ v = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, v | O_NONBLOCK);
+}
+
+/* FIXME: Could be in shared util code with rest of ctdb */
+static void set_close_on_exec(int fd)
+{
+ unsigned v;
+ v = fcntl(fd, F_GETFD, 0);
+ fcntl(fd, F_SETFD, v | FD_CLOEXEC);
+}
+
+static void set_pnn(int32_t status, uint32_t pnn, void *private_data)
+{
+ if (status != 0) {
+ /* FIXME: Report error. */
+ ((struct ctdb_connection *)private_data)->broken = true;
+ } else {
+ ((struct ctdb_connection *)private_data)->pnn = pnn;
+ }
+}
+
+struct ctdb_connection *ctdb_connect(const char *addr)
+{
+ struct ctdb_connection *ctdb;
+ struct sockaddr_un sun;
+
+ ctdb = malloc(sizeof(*ctdb));
+ if (!ctdb)
+ goto fail;
+ ctdb->outq = NULL;
+ ctdb->doneq = NULL;
+ ctdb->immediateq = NULL;
+ ctdb->in = NULL;
+ ctdb->message_handlers = NULL;
+
+ memset(&sun, 0, sizeof(sun));
+ sun.sun_family = AF_UNIX;
+ if (!addr)
+ addr = CTDB_PATH;
+ strncpy(sun.sun_path, addr, sizeof(sun.sun_path));
+ ctdb->fd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (ctdb->fd < 0)
+ goto free_fail;
+
+ set_nonblocking(ctdb->fd);
+ set_close_on_exec(ctdb->fd);
+
+ if (connect(ctdb->fd, (struct sockaddr *)&sun, sizeof(sun)) == -1)
+ goto close_fail;
+
+ /* Immediately queue a request to get our pnn. */
+ if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, ctdb))
+ goto close_fail;
+
+ return ctdb;
+
+close_fail:
+ close_noerr(ctdb->fd);
+free_fail:
+ free_noerr(ctdb);
+fail:
+ return NULL;
+}
+
+int ctdb_get_fd(struct ctdb_connection *ctdb)
+{
+ return ctdb->fd;
+}
+
+int ctdb_which_events(struct ctdb_connection *ctdb)
+{
+ int events = POLLIN;
+
+ if (ctdb->outq)
+ events |= POLLOUT;
+ return events;
+}
+
+struct ctdb_request *new_ctdb_request(size_t len)
+{
+ struct ctdb_request *req = malloc(sizeof(*req));
+ if (!req)
+ return NULL;
+ req->io = new_io_elem(len);
+ if (!req->io) {
+ free(req);
+ return NULL;
+ }
+ req->hdr.hdr = io_elem_data(req->io, NULL);
+ req->cancelled = false;
+ return req;
+}
+
+static struct ctdb_request *new_immediate_request(void)
+{
+ struct ctdb_request *req = malloc(sizeof(*req));
+ if (!req)
+ return NULL;
+ req->cancelled = false;
+ req->io = NULL;
+ req->hdr.hdr = NULL;
+ return req;
+}
+
+static void free_ctdb_request(struct ctdb_request *req)
+{
+ /* immediate requests don't have IO */
+ if (req->io) {
+ free_io_elem(req->io);
+ }
+ free(req);
+}
+
+static void handle_call_reply(struct ctdb_connection *ctdb,
+ struct ctdb_req_header *hdr,
+ struct ctdb_request *i)
+{
+ struct ctdb_req_call *call = i->hdr.call;
+ struct ctdb_reply_call *reply = (struct ctdb_reply_call *)hdr;
+
+ switch (call->callid) {
+ case CTDB_NULL_FUNC:
+ /* FIXME: We should let it steal the request, rathe than copy */
+ i->callback.nullfunc(reply->status, reply, i->priv_data);
+ break;
+ }
+}
+
+static void handle_control_reply(struct ctdb_connection *ctdb,
+ struct ctdb_req_header *hdr,
+ struct ctdb_request *i)
+{
+ struct ctdb_req_control *control = i->hdr.control;
+ struct ctdb_reply_control *reply = (struct ctdb_reply_control *)hdr;
+
+ switch (control->opcode) {
+ case CTDB_CONTROL_GET_RECMASTER:
+ i->callback.getrecmaster(0, reply->status, i->priv_data);
+ break;
+ case CTDB_CONTROL_GET_PNN:
+ i->callback.getpnn(0, reply->status, i->priv_data);
+ break;
+ case CTDB_CONTROL_REGISTER_SRVID:
+ i->callback.register_srvid(reply->status, i->priv_data);
+ break;
+ case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
+ case CTDB_CONTROL_DB_ATTACH:
+ i->callback.attachdb(reply->status, *(uint32_t *)reply->data,
+ i->priv_data);
+ break;
+ case CTDB_CONTROL_GETDBPATH:
+ i->callback.getdbpath(reply->status, (char *)reply->data,
+ i->priv_data);
+ break;
+ }
+}
+
+static void handle_incoming(struct ctdb_connection *ctdb,
+ struct ctdb_req_header *hdr,
+ size_t len /* FIXME: use len to check packet! */)
+{
+ struct ctdb_request *i;
+
+ if (hdr->operation == CTDB_REQ_MESSAGE) {
+ deliver_message(ctdb, hdr);
+ return;
+ }
+
+ if (hdr->operation != CTDB_REPLY_CALL
+ && hdr->operation != CTDB_REPLY_CONTROL) {
+ /* FIXME: report this error. */
+ return;
+ }
+
+ for (i = ctdb->doneq; i; i = i->next) {
+ if (i->hdr.hdr->reqid == hdr->reqid) {
+ if (!i->cancelled) {
+ if (hdr->operation == CTDB_REPLY_CALL)
+ handle_call_reply(ctdb, hdr, i);
+ else
+ handle_control_reply(ctdb, hdr, i);
+ }
+ DLIST_REMOVE(ctdb->doneq, i);
+ free_ctdb_request(i);
+ return;
+ }
+ }
+ /* FIXME: report this error. */
+}
+
+/* Remove "harmless" errors. */
+static ssize_t real_error(ssize_t ret)
+{
+ if (ret < 0 && (errno == EINTR || errno == EWOULDBLOCK))
+ return 0;
+ return ret;
+}
+
+int ctdb_service(struct ctdb_connection *ctdb, int revents)
+{
+ if (ctdb->broken) {
+ return -1;
+ }
+
+ if (revents & POLLOUT) {
+ while (ctdb->outq) {
+ if (real_error(write_io_elem(ctdb->fd,
+ ctdb->outq->io)) < 0) {
+ ctdb->broken = true;
+ return -1;
+ }
+ if (io_elem_finished(ctdb->outq->io)) {
+ struct ctdb_request *done = ctdb->outq;
+ DLIST_REMOVE(ctdb->outq, done);
+ if (done->cancelled) {
+ free_ctdb_request(done);
+ } else {
+ DLIST_ADD_END(ctdb->doneq, done,
+ struct ctdb_request);
+ }
+ }
+ }
+ }
+
+ while (revents & POLLIN) {
+ int ret;
+
+ if (!ctdb->in) {
+ ctdb->in = new_io_elem(sizeof(struct ctdb_req_header));
+ if (!ctdb->in) {
+ ctdb->broken = true;
+ return -1;
+ }
+ }
+
+ ret = read_io_elem(ctdb->fd, ctdb->in);
+ if (real_error(ret) < 0 || ret == 0) {
+ /* They closed fd? */
+ if (ret == 0)
+ errno = EBADF;
+ ctdb->broken = true;
+ return -1;
+ } else if (ret < 0) {
+ /* No progress, stop loop. */
+ revents = 0;
+ } else if (io_elem_finished(ctdb->in)) {
+ struct ctdb_req_header *hdr;
+ size_t len;
+
+ hdr = io_elem_data(ctdb->in, &len);
+ handle_incoming(ctdb, hdr, len);
+ free_io_elem(ctdb->in);
+ ctdb->in = NULL;
+ }
+ }
+
+ while (ctdb->immediateq) {
+ struct ctdb_request *imm = ctdb->immediateq;
+ /* This has to handle fake->cancelled internally. */
+ imm->callback.immediate(imm, imm->priv_data);
+ DLIST_REMOVE(ctdb->immediateq, imm);
+ free_ctdb_request(imm);
+ }
+
+ return 0;
+}
+
+/* This is inefficient. We could pull in idtree.c. */
+static bool reqid_used(const struct ctdb_connection *ctdb, uint32_t reqid)
+{
+ struct ctdb_request *i;
+
+ for (i = ctdb->outq; i; i = i->next) {
+ if (i->hdr.hdr->reqid == reqid) {
+ return true;
+ }
+ }
+ for (i = ctdb->doneq; i; i = i->next) {
+ if (i->hdr.hdr->reqid == reqid) {
+ return true;
+ }
+ }
+ return false;
+}
+
+uint32_t new_reqid(struct ctdb_connection *ctdb)
+{
+ while (reqid_used(ctdb, ctdb->next_id)) {
+ ctdb->next_id++;
+ }
+ return ctdb->next_id++;
+}
+
+struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
+ uint32_t opcode,
+ uint32_t destnode,
+ const void *extra_data,
+ size_t extra)
+{
+ struct ctdb_request *req;
+ struct ctdb_req_control *pkt;
+
+ req = new_ctdb_request(sizeof(*pkt) + extra);
+ if (!req)
+ return NULL;
+
+ io_elem_init_req_header(req->io,
+ CTDB_REQ_CONTROL, destnode, new_reqid(ctdb));
+
+ pkt = req->hdr.control;
+ pkt->opcode = opcode;
+ pkt->srvid = 0;
+ pkt->client_id = 0;
+ pkt->flags = 0;
+ pkt->datalen = extra;
+ memcpy(pkt->data, extra_data, extra);
+ DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+ return req;
+}
+
+int ctdb_cancel(struct ctdb_request *req)
+{
+ /* FIXME: If it's not sent, we could just free it right now. */
+ req->cancelled = true;
+ return 0;
+}
+
+struct ctdb_db {
+ struct ctdb_connection *ctdb;
+ bool persistent;
+ uint32_t tdb_flags;
+ uint32_t id;
+ struct tdb_context *tdb;
+
+ ctdb_attachdb_cb callback;
+ void *private_data;
+};
+
+static void attachdb_getdbpath_done(int status, const char *path,
+ void *_db)
+{
+ struct ctdb_db *db = _db;
+ uint32_t tdb_flags = db->tdb_flags;
+
+ if (status != 0) {
+ db->callback(status, NULL, db->private_data);
+ free(db);
+ return;
+ }
+
+ tdb_flags = db->persistent ? TDB_DEFAULT : TDB_NOSYNC;
+ tdb_flags |= TDB_DISALLOW_NESTING;
+
+ db->tdb = tdb_open(path, 0, tdb_flags, O_RDWR, 0);
+ if (db->tdb == NULL) {
+ db->callback(-1, NULL, db->private_data);
+ free(db);
+ return;
+ }
+
+ /* Finally, we tell the client that we opened the db. */
+ db->callback(status, db, db->private_data);
+}
+
+static void attachdb_done(int status, uint32_t id, struct ctdb_db *db)
+{
+ struct ctdb_request *req;
+
+ if (status != 0) {
+ db->callback(status, NULL, db->private_data);
+ free(db);
+ return;
+ }
+ db->id = id;
+
+ /* Now we do another call, to get the dbpath. */
+ req = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH,
+ CTDB_CURRENT_NODE, &id, sizeof(id));
+ if (!req) {
+ db->callback(-1, NULL, db->private_data);
+ free(db);
+ return;
+ }
+ req->callback.getdbpath = attachdb_getdbpath_done;
+ req->priv_data = db;
+}
+
+struct ctdb_request *
+ctdb_attachdb_send(struct ctdb_connection *ctdb,
+ const char *name, int persistent, uint32_t tdb_flags,
+ ctdb_attachdb_cb callback,
+ void *private_data)
+{
+ struct ctdb_request *req;
+ struct ctdb_db *db;
+ uint32_t opcode;
+
+ /* FIXME: Search if db already open. */
+
+ db = malloc(sizeof(*db));
+ if (!db) {
+ return NULL;
+ }
+
+ if (persistent) {
+ opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
+ } else {
+ opcode = CTDB_CONTROL_DB_ATTACH;
+ }
+
+ req = new_ctdb_control_request(ctdb, opcode, CTDB_CURRENT_NODE, name,
+ strlen(name) + 1);
+ if (!req) {
+ free(db);
+ return NULL;
+ }
+
+ db->ctdb = ctdb;
+ db->tdb_flags = tdb_flags;
+ db->persistent = persistent;
+ db->callback = callback;
+ db->private_data = private_data;
+
+ req->callback.attachdb = attachdb_done;
+ req->priv_data = db;
+
+ /* Flags get overloaded into srvid. */
+ req->hdr.control->srvid = tdb_flags;
+ return req;
+}
+
+struct ctdb_lock {
+ struct ctdb_db *ctdb_db;
+ TDB_DATA key;
+ struct ctdb_ltdb_header *hdr;
+ TDB_DATA data;
+ bool held;
+ /* For convenience, we stash this here. */
+ ctdb_readrecordlock_cb callback;
+ void *private_data;
+};
+
+void ctdb_release_lock(struct ctdb_lock *lock)
+{
+ if (lock->held) {
+ tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+ }
+ free(lock->key.dptr);
+ free(lock->hdr); /* Also frees lock->data */
+ free(lock);
+}
+
+/* We keep the lock if local node is the dmaster. */
+static bool try_readrecordlock(struct ctdb_lock *lock)
+{
+ if (tdb_chainlock(lock->ctdb_db->tdb, lock->key) != 0) {
+ return false;
+ }
+
+ lock->hdr = ctdb_local_fetch(lock->ctdb_db->tdb,
+ lock->key, &lock->data);
+ if (lock->hdr && lock->hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
+ lock->held = true;
+ return true;
+ }
+
+ tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+ free(lock->hdr);
+ return false;
+}
+
+static void readrecordlock_done(int, struct ctdb_reply_call *, void *);
+
+static struct ctdb_request *new_readrecordlock_request(struct ctdb_lock *lock)
+{
+ struct ctdb_request *req;
+ struct ctdb_req_call *pkt;
+
+ req = new_ctdb_request(sizeof(*pkt) + lock->key.dsize);
+ if (!req)
+ return NULL;
+ req->callback.nullfunc = readrecordlock_done;
+ req->priv_data = lock;
+
+ io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
+ new_reqid(lock->ctdb_db->ctdb));
+
+ pkt = req->hdr.call;
+ pkt->flags = CTDB_IMMEDIATE_MIGRATION;
+ pkt->db_id = lock->ctdb_db->id;
+ pkt->callid = CTDB_NULL_FUNC;
+ pkt->hopcount = 0;
+ pkt->keylen = lock->key.dsize;
+ pkt->calldatalen = 0;
+ memcpy(pkt->data, lock->key.dptr, lock->key.dsize);
+ DLIST_ADD_END(lock->ctdb_db->ctdb->outq, req, struct ctdb_request);
+ return req;
+}
+
+/* OK, let's try again... */
+static void readrecordlock_done(int status, struct ctdb_reply_call *reply,
+ void *_lock)
+{
+ struct ctdb_lock *lock = _lock;
+
+ if (status != 0) {
+ lock->callback(status, NULL, tdb_null, lock->private_data);
+ ctdb_release_lock(lock);
+ return;
+ }
+
+ if (try_readrecordlock(lock)) {
+ lock->callback(0, lock, lock->data, lock->private_data);
+ return;
+ }
+
+ if (!new_readrecordlock_request(lock)) {
+ lock->callback(-1, NULL, tdb_null, lock->private_data);
+ ctdb_release_lock(lock);
+ }
+}
+
+static void lock_complete(struct ctdb_request *req, void *_lock)
+{
+ struct ctdb_lock *lock = _lock;
+
+ if (!req->cancelled) {
+ lock->callback(0, lock, lock->data, lock->private_data);
+ } else {
+ ctdb_release_lock(lock);
+ }
+}
+
+struct ctdb_request *
+ctdb_readrecordlock_send(struct ctdb_db *ctdb_db,
+ TDB_DATA key,
+ ctdb_readrecordlock_cb callback,
+ void *private_data)
+{
+ struct ctdb_request *req;
+ struct ctdb_lock *lock;
+
+ lock = malloc(sizeof(*lock));
+ if (!lock)
+ return NULL;
+ lock->key.dptr = malloc(key.dsize);
+ if (!lock->key.dptr) {
+ free_noerr(lock);
+ return NULL;
+ }
+ memcpy(lock->key.dptr, key.dptr, key.dsize);
+ lock->key.dsize = key.dsize;
+ lock->ctdb_db = ctdb_db;
+ lock->callback = callback;
+ lock->private_data = private_data;
+ lock->hdr = NULL;
+ lock->held = false;
+
+ if (try_readrecordlock(lock)) {
+ /* We pretend to be async, so we just queue this. */
+ req = new_immediate_request();
+ if (!req) {
+ ctdb_release_lock(lock);
+ return NULL;
+ }
+ req->callback.immediate = lock_complete;
+ req->priv_data = lock;
+ DLIST_ADD_END(lock->ctdb_db->ctdb->immediateq,
+ req, struct ctdb_request);
+ return req;
+ }
+
+ req = new_readrecordlock_request(lock);
+ if (!req) {
+ ctdb_release_lock(lock);
+ return NULL;
+ }
+ return req;
+}
+
+int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
+{
+ if (lock->ctdb_db->persistent) {
+ /* FIXME: Report error. */
+ return -1;
+ }
+
+ return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr, data);
+}
--- /dev/null
+/*
+ Simple queuing of input and output records for libctdb
+
+ Copyright (C) Rusty Russell 2010
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <sys/types.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdlib.h>
+#include "io_elem.h"
+#include <tdb.h>
+#include <ctdb_protocol.h> // For CTDB_DS_ALIGNMENT and ctdb_req_header
+
+struct io_elem {
+ size_t len, off;
+ char *data;
+};
+
+struct io_elem *new_io_elem(size_t len)
+{
+ struct io_elem *elem;
+ len = (len + (CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
+ elem = malloc(sizeof(*elem));
+ if (!elem)
+ return NULL;
+ elem->data = malloc(len);
+ if (!elem->data) {
+ free(elem);
+ return NULL;
+ }
+
+ elem->len = len;
+ elem->off = 0;
+ return elem;
+}
+
+void free_io_elem(struct io_elem *io)
+{
+ free(io->data);
+ free(io);
+}
+
+bool io_elem_finished(const struct io_elem *io)
+{
+ return io->off == io->len;
+}
+
+void io_elem_init_req_header(struct io_elem *io,
+ uint32_t operation,
+ uint32_t destnode,
+ uint32_t reqid)
+{
+ struct ctdb_req_header *hdr = io_elem_data(io, NULL);
+
+ hdr->length = io->len;
+ hdr->ctdb_magic = CTDB_MAGIC;
+ hdr->ctdb_version = CTDB_VERSION;
+ /* Generation and srcnode only used for inter-ctdbd communication. */
+ hdr->generation = 0;
+ hdr->destnode = destnode;
+ hdr->srcnode = 0;
+ hdr->operation = operation;
+ hdr->reqid = reqid;
+}
+
+/* Access to raw data: if len is non-NULL it is filled in. */
+void *io_elem_data(const struct io_elem *io, size_t *len)
+{
+ if (len)
+ *len = io->len;
+ return io->data;
+}
+
+/* Returns -1 if we hit an error. Errno will be set. */
+int read_io_elem(int fd, struct io_elem *io)
+{
+ ssize_t ret;
+
+ ret = read(fd, io->data + io->off, io->len - io->off);
+ if (ret < 0)
+ return ret;
+
+ io->off += ret;
+ if (io_elem_finished(io)) {
+ struct ctdb_req_header *hdr = (void *)io->data;
+
+ /* Finished. But maybe this was just header? */
+ if (io->len == sizeof(*hdr) && hdr->length > io->len) {
+ int reret;
+ /* Enlarge and re-read. */
+ io->len = hdr->length;
+ io->data = realloc(io->data, io->len);
+ if (!io->data)
+ return -1;
+ /* Try reading again immediately. */
+ reret = read_io_elem(fd, io);
+ if (reret >= 0)
+ reret += ret;
+ return reret;
+ }
+ }
+ return ret;
+}
+
+/* Returns -1 if we hit an error. Errno will be set. */
+int write_io_elem(int fd, struct io_elem *io)
+{
+ ssize_t ret;
+
+ ret = write(fd, io->data + io->off, io->len - io->off);
+ if (ret < 0)
+ return ret;
+
+ io->off += ret;
+ return ret;
+}
--- /dev/null
+#ifndef _LIBCTDB_IO_ELEM_H
+#define _LIBCTDB_IO_ELEM_H
+#include <stdbool.h>
+
+/* Packets are of form: <u32 length><data>. */
+
+/* Create a new queue element of at least len bytes (for reading or writing).
+ * Len may be rounded up for alignment. */
+struct io_elem *new_io_elem(size_t len);
+
+/* Free a queue element. */
+void free_io_elem(struct io_elem *io);
+
+/* If finished, this returns the request header, otherwise NULL. */
+bool io_elem_finished(const struct io_elem *io);
+
+/* Access to raw data: if len is non-NULL it is filled in. */
+void *io_elem_data(const struct io_elem *io, size_t *len);
+
+/* Initialise the struct ctdb_req_header at the front of the I/O. */
+void io_elem_init_req_header(struct io_elem *io,
+ uint32_t operation,
+ uint32_t destnode,
+ uint32_t reqid);
+
+/* Returns -1 if we hit an error. Otherwise bytes read. */
+int read_io_elem(int fd, struct io_elem *io);
+
+/* Returns -1 if we hit an error. Otherwise bytes written. */
+int write_io_elem(int fd, struct io_elem *io);
+
+#endif /* _LIBCTDB_IO_ELEM_H */
--- /dev/null
+#ifndef _LIBCTDB_PRIVATE_H
+#define _LIBCTDB_PRIVATE_H
+#include <dlinklist.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <ctdb.h>
+
+struct message_handler_info;
+struct ctdb_reply_call;
+
+struct ctdb_request {
+ struct ctdb_request *next, *prev;
+ struct io_elem *io;
+ union {
+ struct ctdb_req_header *hdr;
+ struct ctdb_req_call *call;
+ struct ctdb_req_control *control;
+ struct ctdb_req_message *message;
+ } hdr;
+ bool cancelled;
+ union {
+ ctdb_getrecmaster_cb getrecmaster;
+ ctdb_getpnn_cb getpnn;
+ void (*register_srvid)(int, struct message_handler_info *);
+ void (*attachdb)(int, uint32_t id, struct ctdb_db *);
+ void (*getdbpath)(int, const char *, void *);
+ void (*nullfunc)(int, struct ctdb_reply_call *, void *);
+ void (*immediate)(struct ctdb_request *, void *);
+ } callback;
+ void *priv_data;
+};
+
+struct ctdb_connection {
+ /* Socket to ctdbd. */
+ int fd;
+ /* Currently our failure mode is simple; return -1 from ctdb_service */
+ bool broken;
+ /* Linked list of pending outgoings. */
+ struct ctdb_request *outq;
+ /* Finished outgoings (awaiting response) */
+ struct ctdb_request *doneq;
+ /* Successful sync requests, waiting for next service. */
+ struct ctdb_request *immediateq;
+ /* Current incoming. */
+ struct io_elem *in;
+ /* Guess at a good reqid to try next. */
+ uint32_t next_id;
+ /* List of messages */
+ struct message_handler_info *message_handlers;
+ /* PNN of this ctdb: valid by the time we do our first db connection. */
+ uint32_t pnn;
+};
+
+/* ctdb.c */
+struct ctdb_request *new_ctdb_request(size_t len);
+struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
+ uint32_t opcode,
+ uint32_t destnode,
+ const void *extra_data,
+ size_t extra);
+uint32_t new_reqid(struct ctdb_connection *ctdb);
+#endif /* _LIBCTDB_PRIVATE_H */
--- /dev/null
+/*
+ libctdb local tdb access code
+
+ Copyright (C) Andrew Tridgell 2006
+ Copyright (C) Rusty Russell 2010
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <ctdb.h>
+#include <tdb.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <string.h>
+#include <ctdb_protocol.h> // For struct ctdb_ltdb_header
+#include "local_tdb.h"
+
+/*
+ fetch a record from the ltdb, separating out the header information
+ and returning the body of the record. The caller should free() the
+ header when done, rather than the (optional) data->dptr.
+*/
+struct ctdb_ltdb_header *ctdb_local_fetch(struct tdb_context *tdb,
+ TDB_DATA key, TDB_DATA *data)
+{
+ TDB_DATA rec;
+
+ rec = tdb_fetch(tdb, key);
+ if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
+ free(rec.dptr);
+ return NULL;
+ }
+
+ if (data) {
+ data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+ data->dptr = rec.dptr + sizeof(struct ctdb_ltdb_header);
+ }
+ return (struct ctdb_ltdb_header *)rec.dptr;
+}
+
+
+/*
+ write a record to a normal database
+*/
+int ctdb_local_store(struct tdb_context *tdb, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ TDB_DATA rec;
+ int ret;
+ bool seqnum_suppressed = false;
+
+ rec.dsize = sizeof(*header) + data.dsize;
+ rec.dptr = malloc(rec.dsize);
+ if (!rec.dptr) {
+ return -1;
+ }
+
+ memcpy(rec.dptr, header, sizeof(*header));
+ memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+ /* Databases with seqnum updates enabled only get their seqnum
+ changes when/if we modify the data */
+ if (tdb_get_flags(tdb) & TDB_SEQNUM) {
+ TDB_DATA old;
+ old = tdb_fetch(tdb, key);
+
+ if ( (old.dsize == rec.dsize)
+ && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
+ rec.dptr+sizeof(struct ctdb_ltdb_header),
+ rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+ tdb_remove_flags(tdb, TDB_SEQNUM);
+ seqnum_suppressed = true;
+ }
+ free(old.dptr);
+ }
+ ret = tdb_store(tdb, key, rec, TDB_REPLACE);
+ if (seqnum_suppressed) {
+ tdb_add_flags(tdb, TDB_SEQNUM);
+ }
+ free(rec.dptr);
+
+ return ret;
+}
--- /dev/null
+#ifndef _LIBCTDB_LOCAL_TDB_H
+#define _LIBCTDB_LOCAL_TDB_H
+
+struct ctdb_ltdb_header *ctdb_local_fetch(struct tdb_context *tdb,
+ TDB_DATA key, TDB_DATA *data);
+
+int ctdb_local_store(struct tdb_context *tdb, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data);
+
+#endif /* _LIBCTDB_LOCAL_TDB_H */
--- /dev/null
+#include "libctdb_private.h"
+#include "messages.h"
+#include "io_elem.h"
+#include <ctdb.h>
+#include <tdb.h>
+#include <ctdb_protocol.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct message_handler_info {
+ struct message_handler_info *next, *prev;
+ /* Callback when we're first registered. */
+ ctdb_set_message_handler_cb callback;
+
+ uint64_t srvid;
+ ctdb_message_fn_t handler;
+ void *private_data;
+ struct ctdb_connection *ctdb;
+};
+
+void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
+{
+ struct message_handler_info *i;
+ struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
+ TDB_DATA data;
+
+ data.dptr = msg->data;
+ data.dsize = msg->datalen;
+
+ for (i = ctdb->message_handlers; i; i = i->next) {
+ if (i->srvid == msg->srvid) {
+ i->handler(ctdb, msg->srvid, data, i->private_data);
+ }
+ }
+ /* FIXME: Report unknown messages */
+}
+
+static void set_message_handler(int status, struct message_handler_info *info)
+{
+ /* If registration failed, tell callback and clean up */
+ if (status < 0) {
+ info->callback(status, info->private_data);
+ free(info);
+ return;
+ } else {
+ /* Put ourselves in list of handlers. */
+ DLIST_ADD_END(info->ctdb->message_handlers, info,
+ struct message_handler_info);
+ /* Now call callback: it could remove us in theory. */
+ info->callback(status, info->private_data);
+ }
+}
+
+struct ctdb_request *
+ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
+ ctdb_set_message_handler_cb callback,
+ ctdb_message_fn_t handler, void *private_data)
+{
+ struct ctdb_request *req;
+ struct message_handler_info *info;
+
+ info = malloc(sizeof(*info));
+ if (!info) {
+ return NULL;
+ }
+ req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
+ CTDB_CURRENT_NODE, NULL, 0);
+ if (!req) {
+ free(info);
+ return NULL;
+ }
+ req->hdr.control->srvid = srvid;
+
+ info->srvid = srvid;
+ info->handler = handler;
+ info->callback = callback;
+ info->private_data = private_data;
+ info->ctdb = ctdb;
+
+ req->callback.register_srvid = set_message_handler;
+ req->priv_data = info;
+
+ return req;
+}
+
+int ctdb_send_message(struct ctdb_connection *ctdb,
+ uint32_t pnn, uint64_t srvid,
+ TDB_DATA data)
+{
+ struct ctdb_request *req;
+ struct ctdb_req_message *pkt;
+
+ req = new_ctdb_request(sizeof(*pkt) + data.dsize);
+ if (!req) {
+ return -1;
+ }
+
+ io_elem_init_req_header(req->io,
+ CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
+
+ /* There's no reply to this, so we mark it cancelled immediately. */
+ req->cancelled = true;
+
+ pkt = req->hdr.message;
+ pkt->srvid = srvid;
+ pkt->datalen = data.dsize;
+ memcpy(pkt->data, data.dptr, data.dsize);
+ DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+ return 0;
+}
--- /dev/null
+#ifndef _LIBCTDB_MESSAGE_H
+#define _LIBCTDB_MESSAGE_H
+struct message_handler_info;
+struct ctdb_connection;
+struct ctdb_req_header;
+
+void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr);
+#endif /* _LIBCTDB_MESSAGE_H */
--- /dev/null
+/*
+ synchronous wrappers for libctdb
+
+ Copyright (C) Rusty Russell 2010
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+#include <ctdb.h>
+#include <stdbool.h>
+#include <poll.h>
+#include <errno.h>
+#include <stdlib.h>
+
+/* FIXME: Find a way to share more code here. */
+struct ctdb_getrecmaster {
+ bool done;
+ int status;
+ uint32_t *recmaster;
+};
+
+static bool ctdb_service_flush(struct ctdb_connection *ctdb)
+{
+ struct pollfd fds;
+
+ fds.fd = ctdb_get_fd(ctdb);
+ fds.events = ctdb_which_events(ctdb);
+ if (poll(&fds, 1, -1) < 0) {
+ /* Signalled is OK, other error is bad. */
+ return errno == EINTR;
+ }
+ return ctdb_service(ctdb, fds.revents) >= 0;
+}
+
+static void getrecmaster_done(int status, uint32_t recmaster, void *priv_data)
+{
+ struct ctdb_getrecmaster *grm = priv_data;
+ *grm->recmaster = recmaster;
+ grm->status = status;
+ grm->done = true;
+}
+
+int ctdb_getrecmaster(struct ctdb_connection *ctdb,
+ uint32_t destnode, uint32_t *recmaster)
+{
+ struct ctdb_request *req;
+ struct ctdb_getrecmaster grm;
+
+ grm.done = false;
+ grm.recmaster = recmaster;
+ req = ctdb_getrecmaster_send(ctdb, destnode, getrecmaster_done, &grm);
+ if (!req)
+ return -1;
+
+ while (!grm.done) {
+ if (!ctdb_service_flush(ctdb)) {
+ ctdb_cancel(req);
+ return -1;
+ }
+ }
+ return grm.status;
+}
+
+struct ctdb_attachdb {
+ bool done;
+ int status;
+ struct ctdb_db *ctdb_db;
+};
+
+static void attachdb_sync_done(int status,
+ struct ctdb_db *ctdb_db, void *private_data)
+{
+ struct ctdb_attachdb *atb = private_data;
+ atb->ctdb_db = ctdb_db;
+ atb->status = status;
+ atb->done = true;
+}
+
+struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb,
+ const char *name, int persistent,
+ uint32_t tdb_flags)
+{
+ struct ctdb_request *req;
+ struct ctdb_attachdb atb;
+
+ atb.done = false;
+ req = ctdb_attachdb_send(ctdb, name, persistent, tdb_flags,
+ attachdb_sync_done, &atb);
+ if (!req)
+ return NULL;
+
+ while (!atb.done) {
+ if (!ctdb_service_flush(ctdb)) {
+ ctdb_cancel(req);
+ return NULL;
+ }
+ }
+ if (atb.status != 0)
+ return NULL;
+ return atb.ctdb_db;
+}
#include <fcntl.h>
#include <stdlib.h>
#include <err.h>
+#include <stdbool.h>
#include "lib/tdb/include/tdb.h"
#include "include/ctdb.h"
printf("Message received on port %d : %s\n", (int)srvid, data.dptr);
}
+static bool registered = false;
void message_handler_cb(int status, void *private_data)
{
printf("Message handler registered: %i\n", status);
+ registered = true;
}
void rm_cb(int status, uint32_t recmaster, void *private_data)
exit(10);
}
+ /* Hack for testing: this makes sure registration goes out. */
+ while (!registered) {
+ ctdb_service(ctdb_connection, POLLIN|POLLOUT);
+ }
+
msg.dptr="HelloWorld";
msg.dsize = strlen(msg.dptr);