const char *myaddress;
int self_connect;
const char *db_dir;
+ int torture;
} ctdb_cmdline = {
.nlist = NULL,
.transport = "tcp",
.myaddress = NULL,
.self_connect = 0,
- .db_dir = NULL
+ .db_dir = NULL,
+ .torture = 0
};
{ "self-connect", 0, POPT_ARG_NONE, &ctdb_cmdline.self_connect, 0, "enable self connect", "boolean" },
{ "debug", 'd', POPT_ARG_INT, &LogLevel, 0, "debug level"},
{ "dbdir", 0, POPT_ARG_STRING, &ctdb_cmdline.db_dir, 0, "directory for the tdb files", NULL },
+ { "torture", 0, POPT_ARG_NONE, &ctdb_cmdline.torture, 0, "enable nastiness in library", NULL },
{ NULL }
};
if (ctdb_cmdline.self_connect) {
ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
}
+ if (ctdb_cmdline.torture) {
+ ctdb_set_flags(ctdb, CTDB_FLAG_TORTURE);
+ }
ret = ctdb_set_transport(ctdb, ctdb_cmdline.transport);
if (ret == -1) {
talloc_free(tmp_ctx);
}
+/*
+ called by the transport layer when a packet comes in
+*/
+void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length)
+{
+ struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+ ctdb_recv_pkt(ctdb, data, length);
+}
+
/*
called by the transport layer when a node is dead
*/
if (r->hdr.destnode == ctdb->vnn) {
/* we are the lmaster - don't send to ourselves */
- ctdb_request_dmaster(ctdb, &r->hdr);
+ ctdb_recv_pkt(ctdb, (uint8_t *)&r->hdr, r->hdr.length);
+ return;
} else {
ctdb_queue_packet(ctdb, &r->hdr);
return;
}
- /* if the new dmaster and the lmaster are the same node, then
- we don't need to update the record header now */
- if (c->dmaster != ctdb->vnn) {
- /* fetch the current record */
- ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2);
- if (ret == -1) {
- ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
- return;
- }
- if (ret == -2) {
- DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
- return;
- }
-
- /* its a protocol error if the sending node is not the current dmaster */
- if (header.dmaster != hdr->srcnode) {
- ctdb_fatal(ctdb, "dmaster request from non-master");
- return;
- }
-
- header.dmaster = c->dmaster;
- ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
- ctdb_ltdb_unlock(ctdb_db, key);
- if (ret != 0) {
- ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
- return;
- }
+ /* fetch the current record */
+ ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
+ ctdb_recv_raw_pkt, ctdb);
+ if (ret == -1) {
+ ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
+ return;
+ }
+ if (ret == -2) {
+ DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
+ return;
+ }
+
+ /* its a protocol error if the sending node is not the current dmaster */
+ if (header.dmaster != hdr->srcnode) {
+ ctdb_fatal(ctdb, "dmaster request from non-master");
+ return;
+ }
+
+ header.dmaster = c->dmaster;
+ ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
+ ctdb_ltdb_unlock(ctdb_db, key);
+ if (ret != 0) {
+ ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
+ return;
}
/* put the packet on a temporary context, allowing us to safely free
fetches the record data (if any), thus avoiding a 2nd fetch of the data
if the call will be answered locally */
- ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data);
+ ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
+ ctdb_recv_raw_pkt, ctdb);
if (ret == -1) {
ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
return;
return state;
}
-/*
- make a remote ctdb call - async send. Called in daemon context.
-
- This constructs a ctdb_call request and queues it for processing.
- This call never blocks.
-*/
-struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db,
- struct ctdb_call *call)
-{
- int ret;
- struct ctdb_ltdb_header header;
- TDB_DATA data;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
-
- /*
- if we are the dmaster for this key then we don't need to
- send it off at all, we can bypass the network and handle it
- locally. To find out if we are the dmaster we need to look
- in our ltdb
- */
- ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
- if (ret != 0) return NULL;
-
- if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- struct ctdb_call_state *state;
- state = ctdb_call_local_send(ctdb_db, call, &header, &data);
- talloc_free(data.dptr);
- return state;
- }
-
- talloc_free(data.dptr);
-
- return ctdb_daemon_call_send_remote(ctdb_db, call, &header);
-}
-
-
/*
make a remote ctdb call - async recv - called in daemon context
return NULL;
}
+ /* when torturing, ensure we test the remote path */
+ if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
+ random() % 5 == 0) {
+ h->header.dmaster = (uint32_t)-1;
+ }
+
+
DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
if (h->header.dmaster != ctdb_db->ctdb->vnn) {
#include "../include/ctdb.h"
#include "../include/ctdb_private.h"
+/*
+ structure describing a connected client in the daemon
+ */
+struct ctdb_client {
+ struct ctdb_context *ctdb;
+ int fd;
+ struct ctdb_queue *queue;
+};
+
+
+
+static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
+
static void ctdb_main_loop(struct ctdb_context *ctdb)
{
ctdb->methods->start(ctdb);
}
-/*
- structure describing a connected client in the daemon
- */
-struct ctdb_client {
- struct ctdb_context *ctdb;
- int fd;
- struct ctdb_queue *queue;
-};
-
-
/*
message handler for when we are in daemon mode. This redirects the message
to the right client
talloc_free(dstate);
}
+
/*
this is called when the ctdb daemon received a ctdb request call
from a local client over the unix domain socket
struct ctdb_db_context *ctdb_db;
struct daemon_call_state *dstate;
struct ctdb_call *call;
+ struct ctdb_ltdb_header header;
+ TDB_DATA key, data;
+ int ret;
+ struct ctdb_context *ctdb = client->ctdb;
ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
if (!ctdb_db) {
return;
}
+ key.dptr = c->data;
+ key.dsize = c->keylen;
+
+ ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
+ (struct ctdb_req_header *)c, &data,
+ daemon_incoming_packet, client);
+ if (ret == -2) {
+ /* will retry later */
+ return;
+ }
+
+ if (ret != 0) {
+ DEBUG(0,(__location__ " Unable to fetch record\n"));
+ return;
+ }
+
dstate = talloc(client, struct daemon_call_state);
if (dstate == NULL) {
+ ctdb_ltdb_unlock(ctdb_db, key);
DEBUG(0,(__location__ " Unable to allocate dstate\n"));
return;
}
dstate->client = client;
dstate->reqid = c->hdr.reqid;
+ talloc_steal(dstate, data.dptr);
call = dstate->call = talloc_zero(dstate, struct ctdb_call);
if (call == NULL) {
+ ctdb_ltdb_unlock(ctdb_db, key);
DEBUG(0,(__location__ " Unable to allocate call\n"));
return;
}
call->call_id = c->callid;
- call->key.dptr = c->data;
- call->key.dsize = c->keylen;
+ call->key = key;
call->call_data.dptr = c->data + c->keylen;
call->call_data.dsize = c->calldatalen;
- state = ctdb_daemon_call_send(ctdb_db, call);
+ if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+ state = ctdb_call_local_send(ctdb_db, call, &header, &data);
+ } else {
+ state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+ }
+
+ ctdb_ltdb_unlock(ctdb_db, key);
+
if (state == NULL) {
DEBUG(0,(__location__ " Unable to setup call send\n"));
return;
}
/* data contains a packet from the client */
-static void daemon_incoming_packet(struct ctdb_client *client, void *data, size_t nread)
+static void daemon_incoming_packet(void *p, uint8_t *data, uint32_t nread)
{
- struct ctdb_req_header *hdr = data;
+ struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
+ struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
TALLOC_CTX *tmp_ctx;
/* place the packet as a child of a tmp_ctx. We then use
return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
}
+struct lock_fetch_state {
+ struct ctdb_context *ctdb;
+ void (*recv_pkt)(void *, uint8_t *, uint32_t);
+ void *recv_context;
+ struct ctdb_req_header *hdr;
+};
+
/*
called when we should retry the operation
*/
static void lock_fetch_callback(void *p)
{
- struct ctdb_req_header *hdr = p;
- struct ctdb_context *ctdb = talloc_find_parent_bytype(p, struct ctdb_context);
- ctdb_recv_pkt(ctdb, (uint8_t *)hdr, hdr->length);
- DEBUG(0,(__location__ " PACKET REQUEUED\n"));
+ struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
+ state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
+ talloc_free(state);
+ DEBUG(2,(__location__ " PACKET REQUEUED\n"));
}
/*
*/
int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
TDB_DATA key, struct ctdb_ltdb_header *header,
- struct ctdb_req_header *hdr, TDB_DATA *data)
+ struct ctdb_req_header *hdr, TDB_DATA *data,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context)
{
int ret;
struct tdb_context *tdb = ctdb_db->ltdb->tdb;
struct lockwait_handle *h;
+ struct lock_fetch_state *state;
ret = tdb_chainlock_nonblock(tdb, key);
return -1;
}
+ /* when torturing, ensure we test the contended path */
+ if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
+ random() % 5 == 0) {
+ ret = -1;
+ tdb_chainunlock(tdb, key);
+ }
+
/* first the non-contended path */
if (ret == 0) {
ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
return ret;
}
+ state = talloc(ctdb_db, struct lock_fetch_state);
+ state->ctdb = ctdb_db->ctdb;
+ state->hdr = hdr;
+ state->recv_pkt = recv_pkt;
+ state->recv_context = recv_context;
+
/* now the contended path */
- h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, hdr);
+ h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
if (h == NULL) {
tdb_chainunlock(tdb, key);
return -1;
/* we need to move the packet off the temporary context in ctdb_recv_pkt(),
so it won't be freed yet */
- talloc_steal(ctdb_db, hdr);
+ talloc_steal(state, hdr);
+ talloc_steal(state, h);
/* now tell the caller than we will retry asynchronously */
return -2;
ctdb flags
*/
#define CTDB_FLAG_SELF_CONNECT (1<<0)
-/* for test code only: make ctdb_start() block until all nodes are connected */
-#define CTDB_FLAG_CONNECT_WAIT (1<<2)
+#define CTDB_FLAG_TORTURE (1<<1)
/*
void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
TDB_DATA key, struct ctdb_ltdb_header *header,
- struct ctdb_req_header *hdr, TDB_DATA *data);
+ struct ctdb_req_header *hdr, TDB_DATA *data,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context);
void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length);
struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
void *_idr_find_type(struct idr_context *idp, int id, const char *type, const char *location);
#define idr_find_type(idp, id, type) (type *)_idr_find_type(idp, id, #type, __location__)
+void ctdb_recv_raw_pkt(void *p, uint8_t *data, uint32_t length);
+
#endif
ctdb_tcp_node_connect, node);
}
- if (ctdb->flags&CTDB_FLAG_CONNECT_WAIT) {
- /* wait until all nodes are connected (should not be needed
- outide of test code) */
- ctdb_connect_wait(ctdb);
- }
-
return 0;
}