return -1;
}
+/*
+ set some ctdb flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
+{
+ ctdb->flags |= flags;
+}
+
/*
add a node to the list of active nodes
*/
static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
- printf("received pkt of length %d\n", length);
+ struct ctdb_req_header *hdr;
+ if (length < sizeof(*hdr)) {
+ ctdb_set_error(ctdb, "Bad packet length %d\n", length);
+ return;
+ }
+ hdr = (struct ctdb_req_header *)data;
+ if (length != hdr->length) {
+ ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
+ hdr->length, length);
+ return;
+ }
+ switch (hdr->operation) {
+ case CTDB_REQ_CALL:
+ ctdb_request_call(ctdb, hdr);
+ break;
+
+ case CTDB_REPLY_CALL:
+ ctdb_reply_call(ctdb, hdr);
+ break;
+
+ default:
+ printf("Packet with unknown operation %d\n", hdr->operation);
+ talloc_free(hdr);
+ break;
+ }
}
/*
*/
void ctdb_connect_wait(struct ctdb_context *ctdb)
{
- while (ctdb->num_connected != ctdb->num_nodes - 1) {
+ int expected = ctdb->num_nodes - 1;
+ if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+ expected++;
+ }
+ while (ctdb->num_connected != expected) {
+ event_loop_once(ctdb->ev);
+ }
+}
+
+/*
+ wait until we're the only node left
+*/
+void ctdb_wait_loop(struct ctdb_context *ctdb)
+{
+ int expected = 0;
+ if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+ expected++;
+ }
+ while (ctdb->num_connected > expected) {
event_loop_once(ctdb->ev);
}
}
}
/*
- make a remote ctdb call
+ called when a CTDB_REQ_CALL packet comes in
*/
-int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
- TDB_DATA *call_data, TDB_DATA *reply_data)
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
- uint32_t dest;
+ struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+ TDB_DATA key, call_data, reply_data;
+ struct ctdb_reply_call *r;
+ struct ctdb_node *node;
+
+ key.dptr = c->data;
+ key.dsize = c->keylen;
+ call_data.dptr = c->data + c->keylen;
+ call_data.dsize = c->calldatalen;
+
+ ctdb_call_local(ctdb, key, c->callid,
+ call_data.dsize?&call_data:NULL,
+ &reply_data);
+
+ r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
+ r->hdr.length = sizeof(*r) + reply_data.dsize;
+ r->hdr.operation = CTDB_REPLY_CALL;
+ r->hdr.destnode = hdr->srcnode;
+ r->hdr.srcnode = hdr->destnode;
+ r->hdr.reqid = hdr->reqid;
+ r->datalen = reply_data.dsize;
+ memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
+
+ node = ctdb->nodes[hdr->srcnode];
+
+ ctdb->methods->queue_pkt(node, (uint8_t *)r, r->hdr.length);
+
+ talloc_free(reply_data.dptr);
+ talloc_free(r);
+}
+
+enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
+
+/*
+ state of a in-progress ctdb call
+*/
+struct ctdb_call_state {
+ enum call_state state;
struct ctdb_req_call *c;
- uint32_t len;
struct ctdb_node *node;
+ TDB_DATA reply_data;
+};
+
+
+/*
+ called when a CTDB_REPLY_CALL packet comes in
+*/
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+ struct ctdb_call_state *state;
+ TDB_DATA reply_data;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+
+ reply_data.dptr = c->data;
+ reply_data.dsize = c->datalen;
+
+ state->reply_data = reply_data;
+
+ talloc_steal(state, c);
+
+ state->state = CTDB_CALL_DONE;
+}
+
+/*
+ destroy a ctdb_call
+*/
+static int ctdb_call_destructor(struct ctdb_call_state *state)
+{
+ idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
+ return 0;
+}
+
+/*
+ called when a call times out
+*/
+void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+ state->state = CTDB_CALL_ERROR;
+ ctdb_set_error(state->node->ctdb, "ctdb_call timed out");
+}
+
+/*
+ fake an event driven local ctdb_call
+*/
+struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb,
+ TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+ struct ctdb_call_state *state;
+ int ret;
+
+ state = talloc_zero(ctdb, struct ctdb_call_state);
+ CTDB_NO_MEMORY(ctdb, state);
+
+ state->state = CTDB_CALL_DONE;
+ state->node = ctdb->nodes[ctdb->vnn];
+
+ ret = ctdb_call_local(ctdb, key, call_id, call_data, &state->reply_data);
+ return state;
+}
+
+
+/*
+ make a remote ctdb call - async send
+*/
+struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
+ TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+ uint32_t dest;
+ uint32_t len;
+ struct ctdb_call_state *state;
dest = ctdb_hash(&key) % ctdb->num_nodes;
- if (dest == ctdb->vnn) {
- return ctdb_call_local(ctdb, key, call_id, call_data, reply_data);
+ if (dest == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+ return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data);
}
- len = sizeof(*c) + key.dsize + (call_data?call_data->dsize:0);
- c = talloc_size(ctdb, len);
- CTDB_NO_MEMORY(ctdb, c);
+ state = talloc_zero(ctdb, struct ctdb_call_state);
+ CTDB_NO_MEMORY(ctdb, state);
- c->hdr.operation = CTDB_OP_CALL;
- c->hdr.destnode = dest;
- c->hdr.srcnode = ctdb->vnn;
+ len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
+ state->c = talloc_size(ctdb, len);
+ CTDB_NO_MEMORY(ctdb, state->c);
+
+ state->c->hdr.length = len;
+ state->c->hdr.operation = CTDB_REQ_CALL;
+ state->c->hdr.destnode = dest;
+ state->c->hdr.srcnode = ctdb->vnn;
/* this limits us to 16k outstanding messages - not unreasonable */
- c->hdr.reqid = idr_get_new(ctdb->idr, c, 0xFFFF);
- c->callid = call_id;
- c->keylen = key.dsize;
- c->calldatalen = call_data?call_data->dsize:0;
- memcpy(&c->data[0], key.dptr, key.dsize);
+ state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
+ state->c->callid = call_id;
+ state->c->keylen = key.dsize;
+ state->c->calldatalen = call_data?call_data->dsize:0;
+ memcpy(&state->c->data[0], key.dptr, key.dsize);
if (call_data) {
- memcpy(&c->data[key.dsize], call_data->dptr, call_data->dsize);
+ memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
}
- node = ctdb->nodes[dest];
+ state->node = ctdb->nodes[dest];
+ state->state = CTDB_CALL_WAIT;
- if (ctdb->methods->queue_pkt(node, (uint8_t *)c, len) != 0) {
- talloc_free(c);
- return -1;
+ talloc_set_destructor(state, ctdb_call_destructor);
+
+ if (ctdb->methods->queue_pkt(state->node, (uint8_t *)state->c, len) != 0) {
+ talloc_free(state);
+ return NULL;
}
- /*
- event_add_timed(ctdb->ev, c, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
- ctdb_call_timeout, c);
- */
- return -1;
+ event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
+ ctdb_call_timeout, state);
+ return state;
+}
+
+
+/*
+ make a remote ctdb call - async recv
+*/
+int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
+{
+ while (state->state < CTDB_CALL_DONE) {
+ event_loop_once(state->node->ctdb->ev);
+ }
+ if (state->state != CTDB_CALL_DONE) {
+ talloc_free(state);
+ return -1;
+ }
+ if (reply_data) {
+ reply_data->dptr = talloc_memdup(state->node->ctdb,
+ state->reply_data.dptr,
+ state->reply_data.dsize);
+ reply_data->dsize = state->reply_data.dsize;
+ }
+ talloc_free(state);
+ return 0;
}
+/*
+ full ctdb_call
+*/
+int ctdb_call(struct ctdb_context *ctdb,
+ TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+ struct ctdb_call_state *state;
+ state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
+ return ctdb_call_recv(state, reply_data);
+}