- added ctdb_set_flags() call
authorAndrew Tridgell <tridge@samba.org>
Fri, 1 Dec 2006 04:45:24 +0000 (15:45 +1100)
committerAndrew Tridgell <tridge@samba.org>
Fri, 1 Dec 2006 04:45:24 +0000 (15:45 +1100)
- added --self-connect option to ctdb_test, allowing testing when a
  node connects to itself. not as efficient as local bypass, but very
  useful for testing purposes (easier to work with 1 task in gdb than
  2)

- split the ctdb_call() into an async triple, in the style of Samba4
  async functions. So we now have ctdb_call_send(), ctdb_call_recv()
  and ctdb_call().

- added the main ctdb_call protocol logic. No error checking yet, but
  seems to work for simple cases

- ensure we initialise the length argument to getsockopt()

common/ctdb.c
common/ctdb_call.c
ctdb_test.c
include/ctdb.h
include/ctdb_private.h
tcp/tcp_connect.c
tcp/tcp_init.c
tcp/tcp_io.c

index 1ab1f8ab86b5653729d7bb63a9fcf0062d601303..f09f6029b1b2a6b1fe7fda7e87e6443455532cc2 100644 (file)
@@ -38,6 +38,14 @@ int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
        return -1;
 }
 
+/*
+  set some ctdb flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
+{
+       ctdb->flags |= flags;
+}
+
 
 /*
   add a node to the list of active nodes
@@ -149,7 +157,31 @@ int ctdb_start(struct ctdb_context *ctdb)
 */
 static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
 {
-       printf("received pkt of length %d\n", length);
+       struct ctdb_req_header *hdr;
+       if (length < sizeof(*hdr)) {
+               ctdb_set_error(ctdb, "Bad packet length %d\n", length);
+               return;
+       }
+       hdr = (struct ctdb_req_header *)data;
+       if (length != hdr->length) {
+               ctdb_set_error(ctdb, "Bad header length %d expected %d\n", 
+                              hdr->length, length);
+               return;
+       }
+       switch (hdr->operation) {
+       case CTDB_REQ_CALL:
+               ctdb_request_call(ctdb, hdr);
+               break;
+
+       case CTDB_REPLY_CALL:
+               ctdb_reply_call(ctdb, hdr);
+               break;
+
+       default:
+               printf("Packet with unknown operation %d\n", hdr->operation);
+               talloc_free(hdr);
+               break;
+       }
 }
 
 /*
@@ -177,7 +209,25 @@ static void ctdb_node_connected(struct ctdb_node *node)
 */
 void ctdb_connect_wait(struct ctdb_context *ctdb)
 {
-       while (ctdb->num_connected != ctdb->num_nodes - 1) {
+       int expected = ctdb->num_nodes - 1;
+       if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+               expected++;
+       }
+       while (ctdb->num_connected != expected) {
+               event_loop_once(ctdb->ev);
+       }
+}
+
+/*
+  wait until we're the only node left
+*/
+void ctdb_wait_loop(struct ctdb_context *ctdb)
+{
+       int expected = 0;
+       if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+               expected++;
+       }
+       while (ctdb->num_connected > expected) {
                event_loop_once(ctdb->ev);
        }
 }
index d0e9790ec8aaf083621f2c3e81ce5e89eb9e8189..c6dff1e5c0da382109ffe1232857b20c7ee00c62 100644 (file)
@@ -86,49 +86,199 @@ static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
 }
 
 /*
-  make a remote ctdb call
+  called when a CTDB_REQ_CALL packet comes in
 */
-int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id, 
-             TDB_DATA *call_data, TDB_DATA *reply_data)
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       uint32_t dest;
+       struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+       TDB_DATA key, call_data, reply_data;
+       struct ctdb_reply_call *r;
+       struct ctdb_node *node;
+
+       key.dptr = c->data;
+       key.dsize = c->keylen;
+       call_data.dptr = c->data + c->keylen;
+       call_data.dsize = c->calldatalen;
+
+       ctdb_call_local(ctdb, key, c->callid, 
+                       call_data.dsize?&call_data:NULL,
+                       &reply_data);
+
+       r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
+       r->hdr.length = sizeof(*r) + reply_data.dsize;
+       r->hdr.operation = CTDB_REPLY_CALL;
+       r->hdr.destnode  = hdr->srcnode;
+       r->hdr.srcnode   = hdr->destnode;
+       r->hdr.reqid     = hdr->reqid;
+       r->datalen       = reply_data.dsize;
+       memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
+
+       node = ctdb->nodes[hdr->srcnode];
+
+       ctdb->methods->queue_pkt(node, (uint8_t *)r, r->hdr.length);
+
+       talloc_free(reply_data.dptr);
+       talloc_free(r);
+}
+
+enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
+
+/*
+  state of a in-progress ctdb call
+*/
+struct ctdb_call_state {
+       enum call_state state;
        struct ctdb_req_call *c;
-       uint32_t len;
        struct ctdb_node *node;
+       TDB_DATA reply_data;
+};
+
+
+/*
+  called when a CTDB_REPLY_CALL packet comes in
+*/
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+       struct ctdb_call_state *state;
+       TDB_DATA reply_data;
+
+       state = idr_find(ctdb->idr, hdr->reqid);
+
+       reply_data.dptr = c->data;
+       reply_data.dsize = c->datalen;
+
+       state->reply_data = reply_data;
+
+       talloc_steal(state, c);
+
+       state->state = CTDB_CALL_DONE;
+}
+
+/*
+  destroy a ctdb_call
+*/
+static int ctdb_call_destructor(struct ctdb_call_state *state)
+{
+       idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
+       return 0;
+}
+
+/*
+  called when a call times out
+*/
+void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, 
+                      struct timeval t, void *private)
+{
+       struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+       state->state = CTDB_CALL_ERROR;
+       ctdb_set_error(state->node->ctdb, "ctdb_call timed out");
+}
+
+/*
+  fake an event driven local ctdb_call
+*/
+struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb, 
+                                            TDB_DATA key, int call_id, 
+                                            TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       struct ctdb_call_state *state;
+       int ret;
+
+       state = talloc_zero(ctdb, struct ctdb_call_state);
+       CTDB_NO_MEMORY(ctdb, state);
+
+       state->state = CTDB_CALL_DONE;
+       state->node = ctdb->nodes[ctdb->vnn];
+
+       ret = ctdb_call_local(ctdb, key, call_id, call_data, &state->reply_data);
+       return state;
+}
+
+
+/*
+  make a remote ctdb call - async send
+*/
+struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb, 
+                                      TDB_DATA key, int call_id, 
+                                      TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       uint32_t dest;
+       uint32_t len;
+       struct ctdb_call_state *state;
 
        dest = ctdb_hash(&key) % ctdb->num_nodes;
-       if (dest == ctdb->vnn) {
-               return ctdb_call_local(ctdb, key, call_id, call_data, reply_data);
+       if (dest == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+               return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data);
        }
 
-       len = sizeof(*c) + key.dsize + (call_data?call_data->dsize:0);
-       c = talloc_size(ctdb, len);
-       CTDB_NO_MEMORY(ctdb, c);
+       state = talloc_zero(ctdb, struct ctdb_call_state);
+       CTDB_NO_MEMORY(ctdb, state);
 
-       c->hdr.operation = CTDB_OP_CALL;
-       c->hdr.destnode  = dest;
-       c->hdr.srcnode   = ctdb->vnn;
+       len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
+       state->c = talloc_size(ctdb, len);
+       CTDB_NO_MEMORY(ctdb, state->c);
+
+       state->c->hdr.length    = len;
+       state->c->hdr.operation = CTDB_REQ_CALL;
+       state->c->hdr.destnode  = dest;
+       state->c->hdr.srcnode   = ctdb->vnn;
        /* this limits us to 16k outstanding messages - not unreasonable */
-       c->hdr.reqid     = idr_get_new(ctdb->idr, c, 0xFFFF);
-       c->callid        = call_id;
-       c->keylen        = key.dsize;
-       c->calldatalen   = call_data?call_data->dsize:0;
-       memcpy(&c->data[0], key.dptr, key.dsize);
+       state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
+       state->c->callid        = call_id;
+       state->c->keylen        = key.dsize;
+       state->c->calldatalen   = call_data?call_data->dsize:0;
+       memcpy(&state->c->data[0], key.dptr, key.dsize);
        if (call_data) {
-               memcpy(&c->data[key.dsize], call_data->dptr, call_data->dsize);
+               memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
        }
 
-       node = ctdb->nodes[dest];
+       state->node = ctdb->nodes[dest];
+       state->state = CTDB_CALL_WAIT;
 
-       if (ctdb->methods->queue_pkt(node, (uint8_t *)c, len) != 0) {
-               talloc_free(c);
-               return -1;
+       talloc_set_destructor(state, ctdb_call_destructor);
+
+       if (ctdb->methods->queue_pkt(state->node, (uint8_t *)state->c, len) != 0) {
+               talloc_free(state);
+               return NULL;
        }
 
-       /*
-       event_add_timed(ctdb->ev, c, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
-                       ctdb_call_timeout, c);
-       */
-       return -1;
+       event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
+                       ctdb_call_timeout, state);
+       return state;
+}
+
+
+/*
+  make a remote ctdb call - async recv
+*/
+int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
+{
+       while (state->state < CTDB_CALL_DONE) {
+               event_loop_once(state->node->ctdb->ev);
+       }
+       if (state->state != CTDB_CALL_DONE) {
+               talloc_free(state);
+               return -1;
+       }
+       if (reply_data) {
+               reply_data->dptr = talloc_memdup(state->node->ctdb,
+                                                state->reply_data.dptr,
+                                                state->reply_data.dsize);
+               reply_data->dsize = state->reply_data.dsize;
+       }
+       talloc_free(state);
+       return 0;
 }
 
+/*
+  full ctdb_call
+*/
+int ctdb_call(struct ctdb_context *ctdb, 
+             TDB_DATA key, int call_id, 
+             TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       struct ctdb_call_state *state;
+       state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
+       return ctdb_call_recv(state, reply_data);
+}
index e6ccf339a08f409b274f49fe06d4a94d0bfdeade..5bc35ad332228ebbcbe727f1c5c5454992328073 100644 (file)
@@ -79,12 +79,14 @@ int main(int argc, const char *argv[])
        const char *nlist = NULL;
        const char *transport = "tcp";
        const char *myaddress = NULL;
+       int self_connect=0;
 
        struct poptOption popt_options[] = {
                POPT_AUTOHELP
                { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
                { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
                { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
+               { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
                POPT_TABLEEND
        };
        int opt;
@@ -127,6 +129,10 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
+       if (self_connect) {
+               ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
+       }
+
        ret = ctdb_set_transport(ctdb, transport);
        if (ret == -1) {
                printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
@@ -192,6 +198,9 @@ int main(int argc, const char *argv[])
        }
        talloc_free(data.dptr);
 
+       /* go into a wait loop to allow other nodes to complete */
+       ctdb_wait_loop(ctdb);
+
        /* shut it down */
        talloc_free(ctdb);
        return 0;
index 624a93975f4b6d18e949367f9874dce83497a4e8..ed00dc8a84323c689c9e2da336bc1f532e0239e5 100644 (file)
@@ -33,6 +33,12 @@ struct ctdb_call {
 #define CTDB_ERR_INVALID 1
 #define CTDB_ERR_NOMEM 2
 
+/*
+  ctdb flags
+*/
+#define CTDB_FLAG_SELF_CONNECT (1<<0)
+
+
 struct event_context;
 
 /*
@@ -45,6 +51,11 @@ struct ctdb_context *ctdb_init(struct event_context *ev);
 */
 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
 
+/*
+  set some flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags);
+
 /*
   tell ctdb what address to listen on, in transport specific format
 */
@@ -93,3 +104,8 @@ int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
 */
 void ctdb_connect_wait(struct ctdb_context *ctdb);
 
+/*
+  wait until we're the only node left
+*/
+void ctdb_wait_loop(struct ctdb_context *ctdb);
+
index 653f74ffdcd2e077ab0380e005328ef8789712e2..e2f851749de1ec54445bc133d9f817fea05c4165 100644 (file)
@@ -79,6 +79,7 @@ struct ctdb_context {
        uint32_t vnn; /* our own vnn */
        uint32_t num_nodes;
        uint32_t num_connected;
+       unsigned flags;
        struct idr_context *idr;
        struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */
        struct ctdb_registered_call *calls; /* list of registered calls */
@@ -101,23 +102,15 @@ struct ctdb_context {
   operation IDs
 */
 enum ctdb_operation {
-       CTDB_OP_CALL = 0
+       CTDB_REQ_CALL   = 0,
+       CTDB_REPLY_CALL = 1
 };
 
 /*
   packet structures
 */
 struct ctdb_req_header {
-       uint32_t _length; /* ignored by datagram transports */
-       uint32_t operation;
-       uint32_t destnode;
-       uint32_t srcnode;
-       uint32_t reqid;
-       uint32_t reqtimeout;
-};
-
-struct ctdb_reply_header {
-       uint32_t _length; /* ignored by datagram transports */
+       uint32_t length;
        uint32_t operation;
        uint32_t destnode;
        uint32_t srcnode;
@@ -133,7 +126,7 @@ struct ctdb_req_call {
 };
 
 struct ctdb_reply_call {
-       struct ctdb_reply_header hdr;
+       struct ctdb_req_header hdr;
        uint32_t datalen;
        uint8_t  data[0];
 };
@@ -145,4 +138,6 @@ int ctdb_parse_address(struct ctdb_context *ctdb,
                       TALLOC_CTX *mem_ctx, const char *str,
                       struct ctdb_address *address);
 uint32_t ctdb_hash(TDB_DATA *key);
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
 
index c7e361f9ca9d95a96d03dce50e03659a9f104388..9fc322fde5bf7a91f8d2caf1cad2d5724da6e320 100644 (file)
@@ -44,7 +44,7 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
                                                      struct ctdb_tcp_node);
        struct ctdb_context *ctdb = node->ctdb;
        int error = 0;
-       socklen_t len;
+       socklen_t len = sizeof(error);
 
        if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
            error != 0) {
index 39ecec4dbdd3b0efe376bde03ab3c3a9ae184d05..b3378677035fd4e10b9ccf6433aeeb6cd3869ea4 100644 (file)
@@ -39,7 +39,8 @@ int ctdb_tcp_start(struct ctdb_context *ctdb)
           next event loop */
        for (i=0;i<ctdb->num_nodes;i++) {
                struct ctdb_node *node = *(ctdb->nodes + i);
-               if (ctdb_same_address(&ctdb->address, &node->address)) continue;
+               if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
+                   ctdb_same_address(&ctdb->address, &node->address)) continue;
                event_add_timed(ctdb->ev, node, timeval_zero(), 
                                ctdb_tcp_node_connect, node);
        }
index d6bc2db83e44aeb038c8ecc8df1f13c6eefe0668..67a6bc4b3b456b4b518f6b01800530c40d11f867 100644 (file)
@@ -140,8 +140,6 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
 
        /* tell the ctdb layer above that we have a packet */
        in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
-
-       talloc_free(data);
 }
 
 /*