UTIL_OBJ = lib/util/idtree.o lib/util/db_wrap.o lib/util/strlist.o lib/util/util.o
-CTDB_COMMON_OBJ = common/ctdb.o common/ctdb_daemon.o common/ctdb_client.o \
- common/ctdb_io.o common/util.o common/ctdb_util.o \
- common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_lockwait.o \
- common/ctdb_message.o common/cmdline.o common/ctdb_control.o \
- lib/util/debug.o common/ctdb_recover.o common/ctdb_recoverd.o \
- common/ctdb_freeze.o common/ctdb_traverse.o common/ctdb_monitor.o \
- common/ctdb_tunables.o
-
-CTDB_TAKEOVER_OBJ = takeover/system.o takeover/ctdb_takeover.o
+CTDB_COMMON_OBJ = common/ctdb_io.o common/util.o common/ctdb_util.o \
+ common/ctdb_ltdb.o common/ctdb_message.o common/cmdline.o \
+ lib/util/debug.o
CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o
-CTDB_OBJ = $(CTDB_COMMON_OBJ) $(CTDB_TAKEOVER_OBJ) $(CTDB_TCP_OBJ) $(POPT_OBJ)
+CTDB_CLIENT_OBJ = client/ctdb_client.o \
+ $(CTDB_COMMON_OBJ) $(POPT_OBJ) $(UTIL_OBJ) @TALLOC_OBJ@ @TDB_OBJ@ @LIBREPLACEOBJ@ \
+ $(EXTRA_OBJ) @EVENTS_OBJ@
+
+CTDB_TAKEOVER_OBJ = takeover/system.o takeover/ctdb_takeover.o
-OBJS = @TDB_OBJ@ @TALLOC_OBJ@ @LIBREPLACEOBJ@ @INFINIBAND_WRAPPER_OBJ@ $(EXTRA_OBJ) @EVENTS_OBJ@ $(CTDB_OBJ) $(UTIL_OBJ)
+CTDB_SERVER_OBJ = server/ctdbd.o server/ctdb_daemon.o server/ctdb_lockwait.o server/ctdb_recoverd.o \
+ server/ctdb_recover.o server/ctdb_freeze.o server/ctdb_tunables.o server/ctdb_monitor.o \
+ server/ctdb.o server/ctdb_control.o server/ctdb_call.o server/ctdb_ltdb_server.o \
+ server/ctdb_traverse.o $(CTDB_CLIENT_OBJ) \
+ $(CTDB_TAKEOVER_OBJ) $(CTDB_TCP_OBJ) @INFINIBAND_WRAPPER_OBJ@
TEST_BINS=bin/ctdb_bench bin/ctdb_fetch @INFINIBAND_BINS@
BINS = bin/ctdb
DIRS = lib bin
-all: showflags dirs $(OBJS) $(BINS) $(SBINS) $(TEST_BINS)
+all: showflags dirs $(CTDB_SERVER_OBJ) $(CTDB_CLIENT_OBJ) $(BINS) $(SBINS) $(TEST_BINS)
showflags:
@echo 'ctdb will be compiled with flags:'
dirs:
@mkdir -p $(DIRS)
-bin/ctdbd: $(OBJS) direct/ctdbd.o
+bin/ctdbd: $(CTDB_SERVER_OBJ)
@echo Linking $@
- @$(CC) $(CFLAGS) -o $@ direct/ctdbd.o $(OBJS) $(LIB_FLAGS)
+ @$(CC) $(CFLAGS) -o $@ $(CTDB_SERVER_OBJ) $(LIB_FLAGS)
bin/ctdb: $(OBJS) tools/ctdb_control.o
@echo Linking $@
- @$(CC) $(CFLAGS) -o $@ tools/ctdb_control.o $(OBJS) $(LIB_FLAGS)
+ @$(CC) $(CFLAGS) -o $@ tools/ctdb_control.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
-bin/ctdb_bench: $(OBJS) tests/ctdb_bench.o
+bin/ctdb_bench: $(CTDB_CLIENT_OBJ) tests/ctdb_bench.o
@echo Linking $@
- @$(CC) $(CFLAGS) -o $@ tests/ctdb_bench.o $(OBJS) $(LIB_FLAGS)
+ @$(CC) $(CFLAGS) -o $@ tests/ctdb_bench.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
-bin/ctdb_fetch: $(OBJS) tests/ctdb_fetch.o
+bin/ctdb_fetch: $(CTDB_CLIENT_OBJ) tests/ctdb_fetch.o
@echo Linking $@
- @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch.o $(OBJS) $(LIB_FLAGS)
+ @$(CC) $(CFLAGS) -o $@ tests/ctdb_fetch.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
-bin/ibwrapper_test: $(OBJS) ib/ibwrapper_test.o
+bin/ibwrapper_test: $(CTDB_CLIENT_OBJ) ib/ibwrapper_test.o
@echo Linking $@
- @$(CC) $(CFLAGS) -o $@ ib/ibwrapper_test.o $(OBJS) $(LIB_FLAGS)
+ @$(CC) $(CFLAGS) -o $@ ib/ibwrapper_test.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
clean:
rm -f *.o */*.o */*/*.o
#include "../include/ctdb.h"
#include "../include/ctdb_private.h"
+/*
+ allocate a packet for use in client<->daemon communication
+ */
+struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
+ TALLOC_CTX *mem_ctx,
+ enum ctdb_operation operation,
+ size_t length, size_t slength,
+ const char *type)
+{
+ int size;
+ struct ctdb_req_header *hdr;
+
+ length = MAX(length, slength);
+ size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
+
+ hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
+ if (hdr == NULL) {
+ DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
+ operation, (unsigned)length));
+ return NULL;
+ }
+ talloc_set_name_const(hdr, type);
+ memset(hdr, 0, slength);
+ hdr->length = length;
+ hdr->operation = operation;
+ hdr->ctdb_magic = CTDB_MAGIC;
+ hdr->ctdb_version = CTDB_VERSION;
+ hdr->srcnode = ctdb->vnn;
+ if (ctdb->vnn_map) {
+ hdr->generation = ctdb->vnn_map->generation;
+ }
+
+ return hdr;
+}
+
+/*
+ local version of ctdb_call
+*/
+int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
+ struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
+ TDB_DATA *data, uint32_t caller)
+{
+ struct ctdb_call_info *c;
+ struct ctdb_registered_call *fn;
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+
+ c = talloc(ctdb, struct ctdb_call_info);
+ CTDB_NO_MEMORY(ctdb, c);
+
+ c->key = call->key;
+ c->call_data = &call->call_data;
+ c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
+ c->record_data.dsize = data->dsize;
+ CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
+ c->new_data = NULL;
+ c->reply_data = NULL;
+ c->status = 0;
+
+ for (fn=ctdb_db->calls;fn;fn=fn->next) {
+ if (fn->id == call->call_id) break;
+ }
+ if (fn == NULL) {
+ ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
+ talloc_free(c);
+ return -1;
+ }
+
+ if (fn->fn(c) != 0) {
+ ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
+ talloc_free(c);
+ return -1;
+ }
+
+ if (header->laccessor != caller) {
+ header->lacount = 0;
+ }
+ header->laccessor = caller;
+ header->lacount++;
+
+ /* we need to force the record to be written out if this was a remote access,
+ so that the lacount is updated */
+ if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
+ c->new_data = &c->record_data;
+ }
+
+ if (c->new_data) {
+ /* XXX check that we always have the lock here? */
+ if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
+ ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
+ talloc_free(c);
+ return -1;
+ }
+ }
+
+ if (c->reply_data) {
+ call->reply_data = *c->reply_data;
+ talloc_steal(ctdb, call->reply_data.dptr);
+ talloc_set_name_const(call->reply_data.dptr, __location__);
+ } else {
+ call->reply_data.dptr = NULL;
+ call->reply_data.dsize = 0;
+ }
+ call->status = c->status;
+
+ talloc_free(c);
+
+ return 0;
+}
+
+
/*
queue a packet for sending from client to daemon
*/
talloc_free(outdata.dptr);
return 0;
}
+
+
+/*
+ initialise the ctdb daemon for client applications
+
+ NOTE: In current code the daemon does not fork. This is for testing purposes only
+ and to simplify the code.
+*/
+struct ctdb_context *ctdb_init(struct event_context *ev)
+{
+ struct ctdb_context *ctdb;
+
+ ctdb = talloc_zero(ev, struct ctdb_context);
+ ctdb->ev = ev;
+
+ return ctdb;
+}
+
+
+/*
+ set some ctdb flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
+{
+ ctdb->flags |= flags;
+}
+
+/*
+ setup the local socket name
+*/
+int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
+{
+ ctdb->daemon.name = talloc_strdup(ctdb, socketname);
+ return 0;
+}
+
+/*
+ return the vnn of this node
+*/
+uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
+{
+ return ctdb->vnn;
+}
+
}
-/*
- this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
-{
- return 0;
-}
-
-/*
- this is a plain fetch procedure that all databases support
-*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
-{
- call->reply_data = &call->record_data;
- return 0;
-}
-
-
/*
return the lmaster given a key
*/
}
return ret;
}
-
-struct lock_fetch_state {
- struct ctdb_context *ctdb;
- void (*recv_pkt)(void *, struct ctdb_req_header *);
- void *recv_context;
- struct ctdb_req_header *hdr;
- uint32_t generation;
- bool ignore_generation;
-};
-
-/*
- called when we should retry the operation
- */
-static void lock_fetch_callback(void *p)
-{
- struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
- if (!state->ignore_generation &&
- state->generation != state->ctdb->vnn_map->generation) {
- DEBUG(0,("Discarding previous generation lockwait packet\n"));
- talloc_free(state->hdr);
- return;
- }
- state->recv_pkt(state->recv_context, state->hdr);
- DEBUG(2,(__location__ " PACKET REQUEUED\n"));
-}
-
-
-/*
- do a non-blocking ltdb_lock, deferring this ctdb request until we
- have the chainlock
-
- It does the following:
-
- 1) tries to get the chainlock. If it succeeds, then it returns 0
-
- 2) if it fails to get a chainlock immediately then it sets up a
- non-blocking chainlock via ctdb_lockwait, and when it gets the
- chainlock it re-submits this ctdb request to the main packet
- receive function
-
- This effectively queues all ctdb requests that cannot be
- immediately satisfied until it can get the lock. This means that
- the main ctdb daemon will not block waiting for a chainlock held by
- a client
-
- There are 3 possible return values:
-
- 0: means that it got the lock immediately.
- -1: means that it failed to get the lock, and won't retry
- -2: means that it failed to get the lock immediately, but will retry
- */
-int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
- TDB_DATA key, struct ctdb_req_header *hdr,
- void (*recv_pkt)(void *, struct ctdb_req_header *),
- void *recv_context, bool ignore_generation)
-{
- int ret;
- struct tdb_context *tdb = ctdb_db->ltdb->tdb;
- struct lockwait_handle *h;
- struct lock_fetch_state *state;
-
- ret = tdb_chainlock_nonblock(tdb, key);
-
- if (ret != 0 &&
- !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
- /* a hard failure - don't try again */
- return -1;
- }
-
- /* when torturing, ensure we test the contended path */
- if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
- random() % 5 == 0) {
- ret = -1;
- tdb_chainunlock(tdb, key);
- }
-
- /* first the non-contended path */
- if (ret == 0) {
- return 0;
- }
-
- state = talloc(hdr, struct lock_fetch_state);
- state->ctdb = ctdb_db->ctdb;
- state->hdr = hdr;
- state->recv_pkt = recv_pkt;
- state->recv_context = recv_context;
- state->generation = ctdb_db->ctdb->vnn_map->generation;
- state->ignore_generation = ignore_generation;
-
- /* now the contended path */
- h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
- if (h == NULL) {
- tdb_chainunlock(tdb, key);
- return -1;
- }
-
- /* we need to move the packet off the temporary context in ctdb_input_pkt(),
- so it won't be freed yet */
- talloc_steal(state, hdr);
- talloc_steal(state, h);
-
- /* now tell the caller than we will retry asynchronously */
- return -2;
-}
-
-/*
- a varient of ctdb_ltdb_lock_requeue that also fetches the record
- */
-int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
- TDB_DATA key, struct ctdb_ltdb_header *header,
- struct ctdb_req_header *hdr, TDB_DATA *data,
- void (*recv_pkt)(void *, struct ctdb_req_header *),
- void *recv_context, bool ignore_generation)
-{
- int ret;
-
- ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
- recv_context, ignore_generation);
- if (ret == 0) {
- ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
- if (ret != 0) {
- ctdb_ltdb_unlock(ctdb_db, key);
- }
- }
- return ret;
-}
-
-
-/*
- paraoid check to see if the db is empty
- */
-static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
-{
- struct tdb_context *tdb = ctdb_db->ltdb->tdb;
- int count = tdb_traverse_read(tdb, NULL, NULL);
- if (count != 0) {
- DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
- ctdb_db->db_path));
- ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
- }
-}
-
-/*
- a client has asked to attach a new database
- */
-int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
- TDB_DATA *outdata)
-{
- const char *db_name = (const char *)indata.dptr;
- struct ctdb_db_context *ctdb_db, *tmp_db;
- int ret;
-
- /* see if we already have this name */
- for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
- if (strcmp(db_name, tmp_db->db_name) == 0) {
- /* this is not an error */
- outdata->dptr = (uint8_t *)&tmp_db->db_id;
- outdata->dsize = sizeof(tmp_db->db_id);
- return 0;
- }
- }
-
- ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
- CTDB_NO_MEMORY(ctdb, ctdb_db);
-
- ctdb_db->ctdb = ctdb;
- ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
- CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
-
- ctdb_db->db_id = ctdb_hash(&indata);
-
- outdata->dptr = (uint8_t *)&ctdb_db->db_id;
- outdata->dsize = sizeof(ctdb_db->db_id);
-
- /* check for hash collisions */
- for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
- if (tmp_db->db_id == ctdb_db->db_id) {
- DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
- tmp_db->db_id, db_name, tmp_db->db_name));
- talloc_free(ctdb_db);
- return -1;
- }
- }
-
- if (ctdb->db_directory == NULL) {
- ctdb->db_directory = VARDIR "/ctdb";
- }
-
- /* make sure the db directory exists */
- if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
- DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
- ctdb->db_directory));
- talloc_free(ctdb_db);
- return -1;
- }
-
- /* open the database */
- ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
- ctdb->db_directory,
- db_name, ctdb->vnn);
-
- ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0,
- TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
- if (ctdb_db->ltdb == NULL) {
- DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
- talloc_free(ctdb_db);
- return -1;
- }
-
- ctdb_check_db_empty(ctdb_db);
-
- DLIST_ADD(ctdb->db_list, ctdb_db);
-
- /*
- all databases support the "null" function. we need this in
- order to do forced migration of records
- */
- ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
- if (ret != 0) {
- DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
- talloc_free(ctdb_db);
- return -1;
- }
-
- /*
- all databases support the "fetch" function. we need this
- for efficient Samba3 ctdb fetch
- */
- ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
- if (ret != 0) {
- DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
- talloc_free(ctdb_db);
- return -1;
- }
-
- /* tell all the other nodes about this database */
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
- CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
- indata, NULL, NULL);
-
- DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
-
- /* success */
- return 0;
-}
-
-/*
- called when a broadcast seqnum update comes in
- */
-int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
-{
- struct ctdb_db_context *ctdb_db;
- if (srcnode == ctdb->vnn) {
- /* don't update ourselves! */
- return 0;
- }
-
- ctdb_db = find_ctdb_db(ctdb, db_id);
- if (!ctdb_db) {
- DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
- return -1;
- }
-
- tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
- ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
- return 0;
-}
-
-/*
- timer to check for seqnum changes in a ltdb and propogate them
- */
-static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *p)
-{
- struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
- struct ctdb_context *ctdb = ctdb_db->ctdb;
- uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
- if (new_seqnum != ctdb_db->seqnum) {
- /* something has changed - propogate it */
- TDB_DATA data;
- data.dptr = (uint8_t *)&ctdb_db->db_id;
- data.dsize = sizeof(uint32_t);
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
- CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
- data, NULL, NULL);
- }
- ctdb_db->seqnum = new_seqnum;
-
- /* setup a new timer */
- ctdb_db->te =
- event_add_timed(ctdb->ev, ctdb_db,
- timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
- ctdb_ltdb_seqnum_check, ctdb_db);
-}
-
-/*
- enable seqnum handling on this db
- */
-int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
-{
- struct ctdb_db_context *ctdb_db;
- ctdb_db = find_ctdb_db(ctdb, db_id);
- if (!ctdb_db) {
- DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
- return -1;
- }
-
- if (ctdb_db->te == NULL) {
- ctdb_db->te =
- event_add_timed(ctdb->ev, ctdb_db,
- timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
- ctdb_ltdb_seqnum_check, ctdb_db);
- }
-
- tdb_enable_seqnum(ctdb_db->ltdb->tdb);
- ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
- return 0;
-}
-
ctdb_dispatch_message(ctdb, c->srvid, data);
}
-/*
- this local messaging handler is ugly, but is needed to prevent
- recursion in ctdb_send_message() when the destination node is the
- same as the source node
- */
-struct ctdb_local_message {
- struct ctdb_context *ctdb;
- uint64_t srvid;
- TDB_DATA data;
-};
-
-static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
-{
- struct ctdb_local_message *m = talloc_get_type(private_data,
- struct ctdb_local_message);
- int res;
-
- res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
- if (res != 0) {
- DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n",
- (unsigned long long)m->srvid));
- }
- talloc_free(m);
-}
-
-static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
-{
- struct ctdb_local_message *m;
- m = talloc(ctdb, struct ctdb_local_message);
- CTDB_NO_MEMORY(ctdb, m);
-
- m->ctdb = ctdb;
- m->srvid = srvid;
- m->data = data;
- m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
- if (m->data.dptr == NULL) {
- talloc_free(m);
- return -1;
- }
-
- /* this needs to be done as an event to prevent recursion */
- event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
- return 0;
-}
-
-/*
- send a ctdb message
-*/
-int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
- uint64_t srvid, TDB_DATA data)
-{
- struct ctdb_req_message *r;
- int len;
-
- /* see if this is a message to ourselves */
- if (vnn == ctdb->vnn) {
- return ctdb_local_message(ctdb, srvid, data);
- }
-
- len = offsetof(struct ctdb_req_message, data) + data.dsize;
- r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
- struct ctdb_req_message);
- CTDB_NO_MEMORY(ctdb, r);
-
- r->hdr.destnode = vnn;
- r->srvid = srvid;
- r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(r);
- return 0;
-}
-
/*
when a client goes away, we need to remove its srvid handler from the list
}
-/*
- set some ctdb flags
-*/
-void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
-{
- ctdb->flags |= flags;
-}
-
/*
set the directory for the local databases
*/
}
-/*
- setup the local socket name
-*/
-int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
-{
- ctdb->daemon.name = talloc_strdup(ctdb, socketname);
- return 0;
-}
-/*
- return the vnn of this node
-*/
-uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
-{
- return ctdb->vnn;
-}
-
/*
return the number of enabled nodes
*/
}
-/*
- called by the transport layer when a packet comes in
-*/
-static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
-{
- struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
-
- ctdb->statistics.node_packets_recv++;
-
- /* up the counter for this source node, so we know its alive */
- if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
- /* as a special case, redirected calls don't increment the rx_cnt */
- if (hdr->operation != CTDB_REQ_CALL ||
- ((struct ctdb_req_call *)hdr)->hopcount == 0) {
- ctdb->nodes[hdr->srcnode]->rx_cnt++;
- }
- }
-
- ctdb_input_pkt(ctdb, hdr);
-}
-
-
/*
called by the transport layer when a node is dead
*/
}
-static const struct ctdb_upcalls ctdb_upcalls = {
- .recv_pkt = ctdb_recv_pkt,
- .node_dead = ctdb_node_dead,
- .node_connected = ctdb_node_connected
-};
-
-/*
- initialise the ctdb daemon.
-
- NOTE: In current code the daemon does not fork. This is for testing purposes only
- and to simplify the code.
-*/
-struct ctdb_context *ctdb_init(struct event_context *ev)
-{
- struct ctdb_context *ctdb;
-
- ctdb = talloc_zero(ev, struct ctdb_context);
- ctdb->ev = ev;
- ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
- ctdb->recovery_master = (uint32_t)-1;
- ctdb->upcalls = &ctdb_upcalls;
- ctdb->idr = idr_init(ctdb);
- ctdb->recovery_lock_fd = -1;
- ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE;
-
- ctdb_tunables_set_defaults(ctdb);
-
- return ctdb;
-}
-
}
-/*
- local version of ctdb_call
-*/
-int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
- struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
- TDB_DATA *data, uint32_t caller)
-{
- struct ctdb_call_info *c;
- struct ctdb_registered_call *fn;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
-
- c = talloc(ctdb, struct ctdb_call_info);
- CTDB_NO_MEMORY(ctdb, c);
-
- c->key = call->key;
- c->call_data = &call->call_data;
- c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
- c->record_data.dsize = data->dsize;
- CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
- c->new_data = NULL;
- c->reply_data = NULL;
- c->status = 0;
-
- for (fn=ctdb_db->calls;fn;fn=fn->next) {
- if (fn->id == call->call_id) break;
- }
- if (fn == NULL) {
- ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
- talloc_free(c);
- return -1;
- }
-
- if (fn->fn(c) != 0) {
- ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
- talloc_free(c);
- return -1;
- }
-
- if (header->laccessor != caller) {
- header->lacount = 0;
- }
- header->laccessor = caller;
- header->lacount++;
-
- /* we need to force the record to be written out if this was a remote access,
- so that the lacount is updated */
- if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
- c->new_data = &c->record_data;
- }
-
- if (c->new_data) {
- /* XXX check that we always have the lock here? */
- if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
- ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
- talloc_free(c);
- return -1;
- }
- }
-
- if (c->reply_data) {
- call->reply_data = *c->reply_data;
- talloc_steal(ctdb, call->reply_data.dptr);
- talloc_set_name_const(call->reply_data.dptr, __location__);
- } else {
- call->reply_data.dptr = NULL;
- call->reply_data.dsize = 0;
- }
- call->status = c->status;
-
- talloc_free(c);
-
- return 0;
-}
-
/*
send an error reply
*/
return 0;
}
-/*
- allocate a packet for use in client<->daemon communication
- */
-struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
- TALLOC_CTX *mem_ctx,
- enum ctdb_operation operation,
- size_t length, size_t slength,
- const char *type)
-{
- int size;
- struct ctdb_req_header *hdr;
-
- length = MAX(length, slength);
- size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
-
- hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
- if (hdr == NULL) {
- DEBUG(0,("Unable to allocate packet for operation %u of length %u\n",
- operation, (unsigned)length));
- return NULL;
- }
- talloc_set_name_const(hdr, type);
- memset(hdr, 0, slength);
- hdr->length = length;
- hdr->operation = operation;
- hdr->ctdb_magic = CTDB_MAGIC;
- hdr->ctdb_version = CTDB_VERSION;
- hdr->srcnode = ctdb->vnn;
- if (ctdb->vnn_map) {
- hdr->generation = ctdb->vnn_map->generation;
- }
-
- return hdr;
-}
-
-
/*
allocate a packet for use in daemon<->daemon communication
*/
DLIST_ADD(ctdb_db->calls, call);
return 0;
}
+
+
+
+/*
+ this local messaging handler is ugly, but is needed to prevent
+ recursion in ctdb_send_message() when the destination node is the
+ same as the source node
+ */
+struct ctdb_local_message {
+ struct ctdb_context *ctdb;
+ uint64_t srvid;
+ TDB_DATA data;
+};
+
+static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
+{
+ struct ctdb_local_message *m = talloc_get_type(private_data,
+ struct ctdb_local_message);
+ int res;
+
+ res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
+ if (res != 0) {
+ DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n",
+ (unsigned long long)m->srvid));
+ }
+ talloc_free(m);
+}
+
+static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
+{
+ struct ctdb_local_message *m;
+ m = talloc(ctdb, struct ctdb_local_message);
+ CTDB_NO_MEMORY(ctdb, m);
+
+ m->ctdb = ctdb;
+ m->srvid = srvid;
+ m->data = data;
+ m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
+ if (m->data.dptr == NULL) {
+ talloc_free(m);
+ return -1;
+ }
+
+ /* this needs to be done as an event to prevent recursion */
+ event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
+ return 0;
+}
+
+/*
+ send a ctdb message
+*/
+int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
+ uint64_t srvid, TDB_DATA data)
+{
+ struct ctdb_req_message *r;
+ int len;
+
+ /* see if this is a message to ourselves */
+ if (vnn == ctdb->vnn) {
+ return ctdb_local_message(ctdb, srvid, data);
+ }
+
+ len = offsetof(struct ctdb_req_message, data) + data.dsize;
+ r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
+ struct ctdb_req_message);
+ CTDB_NO_MEMORY(ctdb, r);
+
+ r->hdr.destnode = vnn;
+ r->srvid = srvid;
+ r->datalen = data.dsize;
+ memcpy(&r->data[0], data.dptr, data.dsize);
+
+ ctdb_queue_packet(ctdb, &r->hdr);
+
+ talloc_free(r);
+ return 0;
+}
+
--- /dev/null
+/*
+ ctdb ltdb code - server side
+
+ Copyright (C) Andrew Tridgell 2007
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "../include/ctdb_private.h"
+#include "db_wrap.h"
+#include "lib/util/dlinklist.h"
+
+/*
+ this is the dummy null procedure that all databases support
+*/
+static int ctdb_null_func(struct ctdb_call_info *call)
+{
+ return 0;
+}
+
+/*
+ this is a plain fetch procedure that all databases support
+*/
+static int ctdb_fetch_func(struct ctdb_call_info *call)
+{
+ call->reply_data = &call->record_data;
+ return 0;
+}
+
+
+
+struct lock_fetch_state {
+ struct ctdb_context *ctdb;
+ void (*recv_pkt)(void *, struct ctdb_req_header *);
+ void *recv_context;
+ struct ctdb_req_header *hdr;
+ uint32_t generation;
+ bool ignore_generation;
+};
+
+/*
+ called when we should retry the operation
+ */
+static void lock_fetch_callback(void *p)
+{
+ struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
+ if (!state->ignore_generation &&
+ state->generation != state->ctdb->vnn_map->generation) {
+ DEBUG(0,("Discarding previous generation lockwait packet\n"));
+ talloc_free(state->hdr);
+ return;
+ }
+ state->recv_pkt(state->recv_context, state->hdr);
+ DEBUG(2,(__location__ " PACKET REQUEUED\n"));
+}
+
+
+/*
+ do a non-blocking ltdb_lock, deferring this ctdb request until we
+ have the chainlock
+
+ It does the following:
+
+ 1) tries to get the chainlock. If it succeeds, then it returns 0
+
+ 2) if it fails to get a chainlock immediately then it sets up a
+ non-blocking chainlock via ctdb_lockwait, and when it gets the
+ chainlock it re-submits this ctdb request to the main packet
+ receive function
+
+ This effectively queues all ctdb requests that cannot be
+ immediately satisfied until it can get the lock. This means that
+ the main ctdb daemon will not block waiting for a chainlock held by
+ a client
+
+ There are 3 possible return values:
+
+ 0: means that it got the lock immediately.
+ -1: means that it failed to get the lock, and won't retry
+ -2: means that it failed to get the lock immediately, but will retry
+ */
+int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_req_header *hdr,
+ void (*recv_pkt)(void *, struct ctdb_req_header *),
+ void *recv_context, bool ignore_generation)
+{
+ int ret;
+ struct tdb_context *tdb = ctdb_db->ltdb->tdb;
+ struct lockwait_handle *h;
+ struct lock_fetch_state *state;
+
+ ret = tdb_chainlock_nonblock(tdb, key);
+
+ if (ret != 0 &&
+ !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
+ /* a hard failure - don't try again */
+ return -1;
+ }
+
+ /* when torturing, ensure we test the contended path */
+ if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
+ random() % 5 == 0) {
+ ret = -1;
+ tdb_chainunlock(tdb, key);
+ }
+
+ /* first the non-contended path */
+ if (ret == 0) {
+ return 0;
+ }
+
+ state = talloc(hdr, struct lock_fetch_state);
+ state->ctdb = ctdb_db->ctdb;
+ state->hdr = hdr;
+ state->recv_pkt = recv_pkt;
+ state->recv_context = recv_context;
+ state->generation = ctdb_db->ctdb->vnn_map->generation;
+ state->ignore_generation = ignore_generation;
+
+ /* now the contended path */
+ h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
+ if (h == NULL) {
+ tdb_chainunlock(tdb, key);
+ return -1;
+ }
+
+ /* we need to move the packet off the temporary context in ctdb_input_pkt(),
+ so it won't be freed yet */
+ talloc_steal(state, hdr);
+ talloc_steal(state, h);
+
+ /* now tell the caller than we will retry asynchronously */
+ return -2;
+}
+
+/*
+ a varient of ctdb_ltdb_lock_requeue that also fetches the record
+ */
+int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ struct ctdb_req_header *hdr, TDB_DATA *data,
+ void (*recv_pkt)(void *, struct ctdb_req_header *),
+ void *recv_context, bool ignore_generation)
+{
+ int ret;
+
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
+ recv_context, ignore_generation);
+ if (ret == 0) {
+ ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ }
+ }
+ return ret;
+}
+
+
+/*
+ paraoid check to see if the db is empty
+ */
+static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
+{
+ struct tdb_context *tdb = ctdb_db->ltdb->tdb;
+ int count = tdb_traverse_read(tdb, NULL, NULL);
+ if (count != 0) {
+ DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
+ ctdb_db->db_path));
+ ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
+ }
+}
+
+/*
+ a client has asked to attach a new database
+ */
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
+ TDB_DATA *outdata)
+{
+ const char *db_name = (const char *)indata.dptr;
+ struct ctdb_db_context *ctdb_db, *tmp_db;
+ int ret;
+
+ /* see if we already have this name */
+ for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
+ if (strcmp(db_name, tmp_db->db_name) == 0) {
+ /* this is not an error */
+ outdata->dptr = (uint8_t *)&tmp_db->db_id;
+ outdata->dsize = sizeof(tmp_db->db_id);
+ return 0;
+ }
+ }
+
+ ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
+ CTDB_NO_MEMORY(ctdb, ctdb_db);
+
+ ctdb_db->ctdb = ctdb;
+ ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
+ CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
+
+ ctdb_db->db_id = ctdb_hash(&indata);
+
+ outdata->dptr = (uint8_t *)&ctdb_db->db_id;
+ outdata->dsize = sizeof(ctdb_db->db_id);
+
+ /* check for hash collisions */
+ for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
+ if (tmp_db->db_id == ctdb_db->db_id) {
+ DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
+ tmp_db->db_id, db_name, tmp_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+ }
+
+ if (ctdb->db_directory == NULL) {
+ ctdb->db_directory = VARDIR "/ctdb";
+ }
+
+ /* make sure the db directory exists */
+ if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
+ DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
+ ctdb->db_directory));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
+ /* open the database */
+ ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
+ ctdb->db_directory,
+ db_name, ctdb->vnn);
+
+ ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0,
+ TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
+ if (ctdb_db->ltdb == NULL) {
+ DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
+ ctdb_check_db_empty(ctdb_db);
+
+ DLIST_ADD(ctdb->db_list, ctdb_db);
+
+ /*
+ all databases support the "null" function. we need this in
+ order to do forced migration of records
+ */
+ ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
+ if (ret != 0) {
+ DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
+ /*
+ all databases support the "fetch" function. we need this
+ for efficient Samba3 ctdb fetch
+ */
+ ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
+ if (ret != 0) {
+ DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
+ /* tell all the other nodes about this database */
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+ CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
+ indata, NULL, NULL);
+
+ DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
+
+ /* success */
+ return 0;
+}
+
+/*
+ called when a broadcast seqnum update comes in
+ */
+int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
+{
+ struct ctdb_db_context *ctdb_db;
+ if (srcnode == ctdb->vnn) {
+ /* don't update ourselves! */
+ return 0;
+ }
+
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (!ctdb_db) {
+ DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
+ return -1;
+ }
+
+ tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
+ ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+ return 0;
+}
+
+/*
+ timer to check for seqnum changes in a ltdb and propogate them
+ */
+static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *p)
+{
+ struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+ if (new_seqnum != ctdb_db->seqnum) {
+ /* something has changed - propogate it */
+ TDB_DATA data;
+ data.dptr = (uint8_t *)&ctdb_db->db_id;
+ data.dsize = sizeof(uint32_t);
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
+ CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
+ data, NULL, NULL);
+ }
+ ctdb_db->seqnum = new_seqnum;
+
+ /* setup a new timer */
+ ctdb_db->te =
+ event_add_timed(ctdb->ev, ctdb_db,
+ timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
+ ctdb_ltdb_seqnum_check, ctdb_db);
+}
+
+/*
+ enable seqnum handling on this db
+ */
+int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
+{
+ struct ctdb_db_context *ctdb_db;
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (!ctdb_db) {
+ DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
+ return -1;
+ }
+
+ if (ctdb_db->te == NULL) {
+ ctdb_db->te =
+ event_add_timed(ctdb->ev, ctdb_db,
+ timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
+ ctdb_ltdb_seqnum_check, ctdb_db);
+ }
+
+ tdb_enable_seqnum(ctdb_db->ltdb->tdb);
+ ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+ return 0;
+}
+
};
+/*
+ called by the transport layer when a packet comes in
+*/
+static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+{
+ struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+
+ ctdb->statistics.node_packets_recv++;
+
+ /* up the counter for this source node, so we know its alive */
+ if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
+ /* as a special case, redirected calls don't increment the rx_cnt */
+ if (hdr->operation != CTDB_REQ_CALL ||
+ ((struct ctdb_req_call *)hdr)->hopcount == 0) {
+ ctdb->nodes[hdr->srcnode]->rx_cnt++;
+ }
+ }
+
+ ctdb_input_pkt(ctdb, hdr);
+}
+
+
+
+static const struct ctdb_upcalls ctdb_upcalls = {
+ .recv_pkt = ctdb_recv_pkt,
+ .node_dead = ctdb_node_dead,
+ .node_connected = ctdb_node_connected
+};
+
+
/*
main program
ctdb = ctdb_cmdline_init(ev);
+ ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
+ ctdb->recovery_master = (uint32_t)-1;
+ ctdb->upcalls = &ctdb_upcalls;
+ ctdb->idr = idr_init(ctdb);
+ ctdb->recovery_lock_fd = -1;
+ ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE;
+
+ ctdb_tunables_set_defaults(ctdb);
+
ret = ctdb_set_recovery_lock_file(ctdb, options.recovery_lock_file);
if (ret == -1) {
printf("ctdb_set_recovery_lock_file failed - %s\n", ctdb_errstr(ctdb));