This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
- version 2 of the License, or (at your option) any later version.
+ version 3 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
return NULL;
}
+
+/*
+ this is the dummy null procedure that all databases support
+*/
+static int ctdb_null_func(struct ctdb_call_info *call)
+{
+ return 0;
+}
+
+
/*
attach to a specific database
*/
{
struct ctdb_db_context *ctdb_db, *tmp_db;
TDB_DATA data;
+ int ret;
ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
}
}
+ if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
+ DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
+ ctdb->db_directory));
+ talloc_free(ctdb_db);
+ return NULL;
+ }
+
+ /* add the node id to the database name, so when we run on loopback
+ we don't conflict in the local filesystem */
+ name = talloc_asprintf(ctdb_db, "%s/%s", ctdb->db_directory, name);
+
/* when we have a separate daemon this will need to be a real
file, not a TDB_INTERNAL, so the parent can access it to
for ltdb bypass */
- ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_INTERNAL, open_flags, mode);
+ ctdb_db->ltdb = tdb_wrap_open(ctdb, name, 0, TDB_CLEAR_IF_FIRST, open_flags, mode);
if (ctdb_db->ltdb == NULL) {
ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
talloc_free(ctdb_db);
return NULL;
}
+
+ /*
+ all databases support the "null" function. we need this in
+ order to do forced migration of records
+ */
+ ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
+ if (ret != 0) {
+ talloc_free(ctdb_db);
+ return NULL;
+ }
+
DLIST_ADD(ctdb->db_list, ctdb_db);
+
return ctdb_db;
}
returned if the record is not present
*/
int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
- TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ TALLOC_CTX *mem_ctx, TDB_DATA *data)
{
TDB_DATA rec;
struct ctdb_context *ctdb = ctdb_db->ctdb;
rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
if (rec.dsize < sizeof(*header)) {
+ TDB_DATA d2;
/* return an initial header */
- free(rec.dptr);
+ if (rec.dptr) free(rec.dptr);
ltdb_initial_header(ctdb_db, key, header);
- data->dptr = NULL;
- data->dsize = 0;
+ ZERO_STRUCT(d2);
+ if (data) {
+ *data = d2;
+ }
+ ctdb_ltdb_store(ctdb_db, key, header, d2);
return 0;
}
*header = *(struct ctdb_ltdb_header *)rec.dptr;
- data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
- data->dptr = talloc_memdup(ctdb_db, sizeof(struct ctdb_ltdb_header)+rec.dptr,
- data->dsize);
+ if (data) {
+ data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+ data->dptr = talloc_memdup(mem_ctx,
+ sizeof(struct ctdb_ltdb_header)+rec.dptr,
+ data->dsize);
+ }
+
free(rec.dptr);
- CTDB_NO_MEMORY(ctdb, data->dptr);
+ if (data) {
+ CTDB_NO_MEMORY(ctdb, data->dptr);
+ }
return 0;
}
return ret;
}
+
+
+/*
+ lock a record in the ltdb, given a key
+ */
+int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
+{
+ return tdb_chainlock(ctdb_db->ltdb->tdb, key);
+}
+
+/*
+ unlock a record in the ltdb, given a key
+ */
+int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
+{
+ return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+}
+
+struct lock_fetch_state {
+ struct ctdb_context *ctdb;
+ void (*recv_pkt)(void *, uint8_t *, uint32_t);
+ void *recv_context;
+ struct ctdb_req_header *hdr;
+};
+
+/*
+ called when we should retry the operation
+ */
+static void lock_fetch_callback(void *p)
+{
+ struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
+ state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
+ talloc_free(state);
+ DEBUG(2,(__location__ " PACKET REQUEUED\n"));
+}
+
+
+/*
+ do a non-blocking ltdb_lock, deferring this ctdb request until we
+ have the chainlock
+
+ It does the following:
+
+ 1) tries to get the chainlock. If it succeeds, then it returns 0
+
+ 2) if it fails to get a chainlock immediately then it sets up a
+ non-blocking chainlock via ctdb_lockwait, and when it gets the
+ chainlock it re-submits this ctdb request to the main packet
+ receive function
+
+ This effectively queues all ctdb requests that cannot be
+ immediately satisfied until it can get the lock. This means that
+ the main ctdb daemon will not block waiting for a chainlock held by
+ a client
+
+ There are 3 possible return values:
+
+ 0: means that it got the lock immediately.
+ -1: means that it failed to get the lock, and won't retry
+ -2: means that it failed to get the lock immediately, but will retry
+ */
+int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_req_header *hdr,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context)
+{
+ int ret;
+ struct tdb_context *tdb = ctdb_db->ltdb->tdb;
+ struct lockwait_handle *h;
+ struct lock_fetch_state *state;
+
+ ret = tdb_chainlock_nonblock(tdb, key);
+
+ if (ret != 0 &&
+ !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
+ /* a hard failure - don't try again */
+ return -1;
+ }
+
+ /* when torturing, ensure we test the contended path */
+ if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
+ random() % 5 == 0) {
+ ret = -1;
+ tdb_chainunlock(tdb, key);
+ }
+
+ /* first the non-contended path */
+ if (ret == 0) {
+ return 0;
+ }
+
+ state = talloc(ctdb_db, struct lock_fetch_state);
+ state->ctdb = ctdb_db->ctdb;
+ state->hdr = hdr;
+ state->recv_pkt = recv_pkt;
+ state->recv_context = recv_context;
+
+ /* now the contended path */
+ h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
+ if (h == NULL) {
+ tdb_chainunlock(tdb, key);
+ return -1;
+ }
+
+ /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
+ so it won't be freed yet */
+ talloc_steal(state, hdr);
+ talloc_steal(state, h);
+
+ /* now tell the caller than we will retry asynchronously */
+ return -2;
+}
+
+/*
+ a varient of ctdb_ltdb_lock_requeue that also fetches the record
+ */
+int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ struct ctdb_req_header *hdr, TDB_DATA *data,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context)
+{
+ int ret;
+
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
+ if (ret == 0) {
+ ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ }
+ }
+ return ret;
+}