- added ctdb_set_flags() call
[vlendec/samba-autobuild/.git] / ctdb / common / ctdb_call.c
index d0e9790ec8aaf083621f2c3e81ce5e89eb9e8189..c6dff1e5c0da382109ffe1232857b20c7ee00c62 100644 (file)
@@ -86,49 +86,199 @@ static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
 }
 
 /*
-  make a remote ctdb call
+  called when a CTDB_REQ_CALL packet comes in
 */
-int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id, 
-             TDB_DATA *call_data, TDB_DATA *reply_data)
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       uint32_t dest;
+       struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+       TDB_DATA key, call_data, reply_data;
+       struct ctdb_reply_call *r;
+       struct ctdb_node *node;
+
+       key.dptr = c->data;
+       key.dsize = c->keylen;
+       call_data.dptr = c->data + c->keylen;
+       call_data.dsize = c->calldatalen;
+
+       ctdb_call_local(ctdb, key, c->callid, 
+                       call_data.dsize?&call_data:NULL,
+                       &reply_data);
+
+       r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
+       r->hdr.length = sizeof(*r) + reply_data.dsize;
+       r->hdr.operation = CTDB_REPLY_CALL;
+       r->hdr.destnode  = hdr->srcnode;
+       r->hdr.srcnode   = hdr->destnode;
+       r->hdr.reqid     = hdr->reqid;
+       r->datalen       = reply_data.dsize;
+       memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
+
+       node = ctdb->nodes[hdr->srcnode];
+
+       ctdb->methods->queue_pkt(node, (uint8_t *)r, r->hdr.length);
+
+       talloc_free(reply_data.dptr);
+       talloc_free(r);
+}
+
+enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
+
+/*
+  state of a in-progress ctdb call
+*/
+struct ctdb_call_state {
+       enum call_state state;
        struct ctdb_req_call *c;
-       uint32_t len;
        struct ctdb_node *node;
+       TDB_DATA reply_data;
+};
+
+
+/*
+  called when a CTDB_REPLY_CALL packet comes in
+*/
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+       struct ctdb_call_state *state;
+       TDB_DATA reply_data;
+
+       state = idr_find(ctdb->idr, hdr->reqid);
+
+       reply_data.dptr = c->data;
+       reply_data.dsize = c->datalen;
+
+       state->reply_data = reply_data;
+
+       talloc_steal(state, c);
+
+       state->state = CTDB_CALL_DONE;
+}
+
+/*
+  destroy a ctdb_call
+*/
+static int ctdb_call_destructor(struct ctdb_call_state *state)
+{
+       idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
+       return 0;
+}
+
+/*
+  called when a call times out
+*/
+void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, 
+                      struct timeval t, void *private)
+{
+       struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+       state->state = CTDB_CALL_ERROR;
+       ctdb_set_error(state->node->ctdb, "ctdb_call timed out");
+}
+
+/*
+  fake an event driven local ctdb_call
+*/
+struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb, 
+                                            TDB_DATA key, int call_id, 
+                                            TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       struct ctdb_call_state *state;
+       int ret;
+
+       state = talloc_zero(ctdb, struct ctdb_call_state);
+       CTDB_NO_MEMORY(ctdb, state);
+
+       state->state = CTDB_CALL_DONE;
+       state->node = ctdb->nodes[ctdb->vnn];
+
+       ret = ctdb_call_local(ctdb, key, call_id, call_data, &state->reply_data);
+       return state;
+}
+
+
+/*
+  make a remote ctdb call - async send
+*/
+struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb, 
+                                      TDB_DATA key, int call_id, 
+                                      TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       uint32_t dest;
+       uint32_t len;
+       struct ctdb_call_state *state;
 
        dest = ctdb_hash(&key) % ctdb->num_nodes;
-       if (dest == ctdb->vnn) {
-               return ctdb_call_local(ctdb, key, call_id, call_data, reply_data);
+       if (dest == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+               return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data);
        }
 
-       len = sizeof(*c) + key.dsize + (call_data?call_data->dsize:0);
-       c = talloc_size(ctdb, len);
-       CTDB_NO_MEMORY(ctdb, c);
+       state = talloc_zero(ctdb, struct ctdb_call_state);
+       CTDB_NO_MEMORY(ctdb, state);
 
-       c->hdr.operation = CTDB_OP_CALL;
-       c->hdr.destnode  = dest;
-       c->hdr.srcnode   = ctdb->vnn;
+       len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
+       state->c = talloc_size(ctdb, len);
+       CTDB_NO_MEMORY(ctdb, state->c);
+
+       state->c->hdr.length    = len;
+       state->c->hdr.operation = CTDB_REQ_CALL;
+       state->c->hdr.destnode  = dest;
+       state->c->hdr.srcnode   = ctdb->vnn;
        /* this limits us to 16k outstanding messages - not unreasonable */
-       c->hdr.reqid     = idr_get_new(ctdb->idr, c, 0xFFFF);
-       c->callid        = call_id;
-       c->keylen        = key.dsize;
-       c->calldatalen   = call_data?call_data->dsize:0;
-       memcpy(&c->data[0], key.dptr, key.dsize);
+       state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
+       state->c->callid        = call_id;
+       state->c->keylen        = key.dsize;
+       state->c->calldatalen   = call_data?call_data->dsize:0;
+       memcpy(&state->c->data[0], key.dptr, key.dsize);
        if (call_data) {
-               memcpy(&c->data[key.dsize], call_data->dptr, call_data->dsize);
+               memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
        }
 
-       node = ctdb->nodes[dest];
+       state->node = ctdb->nodes[dest];
+       state->state = CTDB_CALL_WAIT;
 
-       if (ctdb->methods->queue_pkt(node, (uint8_t *)c, len) != 0) {
-               talloc_free(c);
-               return -1;
+       talloc_set_destructor(state, ctdb_call_destructor);
+
+       if (ctdb->methods->queue_pkt(state->node, (uint8_t *)state->c, len) != 0) {
+               talloc_free(state);
+               return NULL;
        }
 
-       /*
-       event_add_timed(ctdb->ev, c, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
-                       ctdb_call_timeout, c);
-       */
-       return -1;
+       event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
+                       ctdb_call_timeout, state);
+       return state;
+}
+
+
+/*
+  make a remote ctdb call - async recv
+*/
+int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
+{
+       while (state->state < CTDB_CALL_DONE) {
+               event_loop_once(state->node->ctdb->ev);
+       }
+       if (state->state != CTDB_CALL_DONE) {
+               talloc_free(state);
+               return -1;
+       }
+       if (reply_data) {
+               reply_data->dptr = talloc_memdup(state->node->ctdb,
+                                                state->reply_data.dptr,
+                                                state->reply_data.dsize);
+               reply_data->dsize = state->reply_data.dsize;
+       }
+       talloc_free(state);
+       return 0;
 }
 
+/*
+  full ctdb_call
+*/
+int ctdb_call(struct ctdb_context *ctdb, 
+             TDB_DATA key, int call_id, 
+             TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       struct ctdb_call_state *state;
+       state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
+       return ctdb_call_recv(state, reply_data);
+}