ibw: modified tridge's code - in my point of view
authorPeter Somogyi <psomogyi@gamax.hu>
Fri, 5 Jan 2007 17:13:35 +0000 (18:13 +0100)
committerPeter Somogyi <psomogyi@gamax.hu>
Fri, 5 Jan 2007 17:13:35 +0000 (18:13 +0100)
ibw_alloc_send and node-centric params are the basics of these important changes.
Also tried to avoid memcpy/memdup where it was possible.

(This used to be ctdb commit 9e8cb9b96c685288c04ee8b69a972f582cd3c904)

ctdb/common/ctdb.c
ctdb/common/ctdb_call.c
ctdb/ib/ibw_ctdb_init.c
ctdb/include/ctdb_private.h
ctdb/tcp/tcp_init.c

index bd1e150d439e2d6427001fcf6c4fd2a6ff7eb6da..1f53074399fd350581eb5223a48283585b52512e 100644 (file)
 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
 {
        int ctdb_tcp_init(struct ctdb_context *ctdb);
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
        int ctdb_ibw_init(struct ctdb_context *ctdb);
 #endif /*HAVE_INFINIBAND*/
 
        if (strcmp(transport, "tcp") == 0) {
                return ctdb_tcp_init(ctdb);
        }
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
        if (strcmp(transport, "ib") == 0) {
                return ctdb_ibw_init(ctdb);
        }
@@ -256,10 +256,15 @@ void ctdb_wait_loop(struct ctdb_context *ctdb)
        }
 }
 
+void ctdb_stopped(struct ctdb_context *ctdb)
+{
+}
+
 static const struct ctdb_upcalls ctdb_upcalls = {
        .recv_pkt       = ctdb_recv_pkt,
        .node_dead      = ctdb_node_dead,
-       .node_connected = ctdb_node_connected
+       .node_connected = ctdb_node_connected,
+       .stopped        = ctdb_stopped
 };
 
 /*
index 81f3cbdea10212c27bb6d24f71e87925a8be6567..a4e5b35c85fa0efe21cddf5d6ae463a14c45481f 100644 (file)
 /*
   queue a packet or die
 */
-static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+static inline void ctdb_queue_packet(struct ctdb_node *node, struct ctdb_req_header *hdr)
 {
-       struct ctdb_node *node;
-       node = ctdb->nodes[hdr->destnode];
-       if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
-               ctdb_fatal(ctdb, "Unable to queue packet\n");
+       if (node->ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+               ctdb_fatal(node->ctdb, "Unable to queue packet\n");
        }
 }
 
@@ -121,6 +119,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
        struct ctdb_reply_error *r;
        char *msg;
        int len;
+       struct ctdb_node *node;
+
+       node = ctdb->nodes[hdr->srcnode];
 
        va_start(ap, fmt);
        msg = talloc_vasprintf(ctdb, fmt, ap);
@@ -130,7 +131,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
        va_end(ap);
 
        len = strlen(msg)+1;
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r) + len);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + len;
        r->hdr.operation = CTDB_REPLY_ERROR;
@@ -143,9 +144,8 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
 
        talloc_free(msg);
 
-       ctdb_queue_packet(ctdb, &r->hdr);
-
-       talloc_free(r);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -157,8 +157,11 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
                                    struct ctdb_ltdb_header *header)
 {
        struct ctdb_reply_redirect *r;
+       struct ctdb_node *node;
 
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
+       node = ctdb->nodes[c->hdr.srcnode];
+
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r));
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r);
        r->hdr.operation = CTDB_REPLY_REDIRECT;
@@ -167,9 +170,8 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
        r->hdr.reqid     = c->hdr.reqid;
        r->dmaster       = header->dmaster;
 
-       ctdb_queue_packet(ctdb, &r->hdr);
-
-       talloc_free(r);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 /*
@@ -186,13 +188,18 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
 {
        struct ctdb_req_dmaster *r;
        int len;
+       struct ctdb_node *node;
+       uint32_t destnode;
+
+       destnode = ctdb_lmaster(ctdb, key);
+       node = ctdb->nodes[destnode];
        
        len = sizeof(*r) + key->dsize + data->dsize;
-       r = ctdb->methods->allocate_pkt(ctdb, len);
+       r = ctdb->methods->allocate_pkt(node, len);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length    = len;
        r->hdr.operation = CTDB_REQ_DMASTER;
-       r->hdr.destnode  = ctdb_lmaster(ctdb, key);
+       r->hdr.destnode  = destnode;
        r->hdr.srcnode   = ctdb->vnn;
        r->hdr.reqid     = c->hdr.reqid;
        r->dmaster       = header->laccessor;
@@ -205,14 +212,14 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
                /* we are the lmaster - don't send to ourselves */
                ctdb_request_dmaster(ctdb, &r->hdr);
        } else {
-               ctdb_queue_packet(ctdb, &r->hdr);
+               ctdb_queue_packet(node, &r->hdr);
 
                /* update the ltdb to record the new dmaster */
                header->dmaster = r->hdr.destnode;
                ctdb_ltdb_store(ctdb, *key, header, *data);
        }
 
-       talloc_free(r);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -229,7 +236,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        TDB_DATA key, data;
        struct ctdb_ltdb_header header;
        int ret;
+       struct ctdb_node *node;
 
+       node = ctdb->nodes[c->dmaster];
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = c->data + c->keylen;
@@ -255,7 +264,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        }
 
        /* send the CTDB_REPLY_DMASTER */
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r) + data.dsize);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + data.dsize;
        r->hdr.operation = CTDB_REPLY_DMASTER;
@@ -265,9 +274,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        r->datalen       = data.dsize;
        memcpy(&r->data[0], data.dptr, data.dsize);
 
-       ctdb_queue_packet(ctdb, &r->hdr);
-
-       talloc_free(r);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -281,7 +289,9 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_reply_call *r;
        int ret;
        struct ctdb_ltdb_header header;
+       struct ctdb_node *node;
 
+       node = ctdb->nodes[hdr->srcnode];
        key.dptr = c->data;
        key.dsize = c->keylen;
        call_data.dptr = c->data + c->keylen;
@@ -317,7 +327,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                        call_data.dsize?&call_data:NULL,
                        &reply_data, c->hdr.srcnode);
 
-       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+       r = ctdb->methods->allocate_pkt(node, sizeof(*r) + reply_data.dsize);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + reply_data.dsize;
        r->hdr.operation = CTDB_REPLY_CALL;
@@ -327,10 +337,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        r->datalen       = reply_data.dsize;
        memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
 
-       ctdb_queue_packet(ctdb, &r->hdr);
+       ctdb_queue_packet(node, &r->hdr);
+       ctdb->methods->dealloc_pkt(node, r);
 
        talloc_free(reply_data.dptr);
-       talloc_free(r);
 }
 
 enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
@@ -440,7 +450,10 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
        struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
        struct ctdb_call_state *state;
-
+       struct ctdb_node *node;
+#ifdef USE_INFINIBAND
+       uint8_t *r;
+#endif /* USE_INFINIBAND */
        state = idr_find(ctdb->idr, hdr->reqid);
 
        talloc_steal(state, c);
@@ -453,7 +466,18 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        /* send it off again */
        state->node = ctdb->nodes[c->dmaster];
 
-       ctdb_queue_packet(ctdb, &state->c->hdr);
+       node = ctdb->nodes[state->c->hdr.destnode];
+
+#ifdef USE_INFINIBAND
+       r = ctdb->methods->allocate_pkt(node, state->c->hdr.length);
+       memcpy(r, &state->c->hdr, state->c->hdr.length);
+#endif /* USE_INFINIBAND */
+
+       ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+       ctdb->methods->dealloc_pkt(node, r);
+#endif /* USE_INFINIBAND */
 }
 
 /*
@@ -520,6 +544,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
        int ret;
        struct ctdb_ltdb_header header;
        TDB_DATA data;
+       struct ctdb_node *node;
 
        /*
          if we are the dmaster for this key then we don't need to
@@ -538,8 +563,9 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
        state = talloc_zero(ctdb, struct ctdb_call_state);
        CTDB_NO_MEMORY_NULL(ctdb, state);
 
+       node = ctdb->nodes[header.dmaster];
        len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
-       state->c = ctdb->methods->allocate_pkt(ctdb, len);
+       state->c = ctdb->methods->allocate_pkt(node, len);
        CTDB_NO_MEMORY_NULL(ctdb, state->c);
 
        state->c->hdr.length    = len;
@@ -566,7 +592,12 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
 
        talloc_set_destructor(state, ctdb_call_destructor);
 
-       ctdb_queue_packet(ctdb, &state->c->hdr);
+       ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+       ctdb->methods->dealloc_pkt(node, state->c);
+       state->c = NULL;
+#endif /* USE_INFINIBAND */
 
        event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
                        ctdb_call_timeout, state);
index 1e8b62878d2fb6e7b6110990b557abeb0331d4f5..672dec6d329c77d3a3694ff1aa621416f5d8d8a1 100644 (file)
@@ -29,6 +29,9 @@
 #include "ibwrapper.h"
 #include "ibw_ctdb.h"
 
+/* not nice; temporary workaround for the current implementation... */
+static void *last_key = NULL;
+
 static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
 {
        struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
@@ -108,14 +111,12 @@ static int ctdb_ibw_add_node(struct ctdb_node *node)
 /*
  * transport packet allocator - allows transport to control memory for packets
  */
-static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+static void *ctdb_ibw_allocate_pkt(struct ctdb_node *node, size_t size)
 {
        struct ibw_conn *conn = NULL;
        void *buf = NULL;
-       void *key; /* TODO: expand the param list with this */
 
-       /* TODO2: !!! I need "node" or ibw_conn here */
-       if (ibw_alloc_send_buf(conn, &buf, &key, (int)size))
+       if (ibw_alloc_send_buf(conn, &buf, &last_key, size))
                return NULL;
 
        return buf;
@@ -124,20 +125,40 @@ static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
 static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
 {
        struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
-       void *key = NULL; /* TODO: expand the param list with this */
+       int     rc;
+
+       rc = ibw_send(conn, data, last_key, length);
+       last_key = NULL;
+
+       return rc;
+}
 
-       assert(conn!=NULL);
-       return ibw_send(conn, data, key, length);
+static void ctdb_ibw_dealloc_pkt(struct ctdb_node *node, void *data)
+{
+       if (last_key) {
+               struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+       
+               assert(conn!=NULL);
+               ibw_cancel_send_buf(conn, data, last_key);
+       } /* else ibw_send is already using it and will free it after completion */
+}
+
+static int ctdb_ibw_stop(struct ctdb_context *cctx)
+{
+       struct ibw_ctx *ictx = talloc_get_type(cctx->private, struct ibw_ctx);
+
+       assert(ictx!=NULL);
+       return ibw_stop(ictx);
 }
 
 static const struct ctdb_methods ctdb_ibw_methods = {
        .start     = ctdb_ibw_start,
        .add_node  = ctdb_ibw_add_node,
        .queue_pkt = ctdb_ibw_queue_pkt,
-       .allocate_pkt = ctdb_ibw_allocate_pkt
-       
-//     .dealloc_pkt = ctdb_ibw_dealloc_pkt
-//     .stop = ctdb_ibw_stop
+       .allocate_pkt = ctdb_ibw_allocate_pkt,
+
+       .dealloc_pkt = ctdb_ibw_dealloc_pkt,
+       .stop = ctdb_ibw_stop
 };
 
 /*
@@ -146,7 +167,7 @@ static const struct ctdb_methods ctdb_ibw_methods = {
 int ctdb_ibw_init(struct ctdb_context *ctdb)
 {
        struct ibw_ctx *ictx;
-       
+
        ictx = ibw_init(
                NULL, //struct ibw_initattr *attr, /* TODO */
                0, //int nattr, /* TODO */
index a024147c63e39002d044b06a0dff0977e6b3760d..c99bb1a55422c428d870ad96cb15d43141e3c80d 100644 (file)
@@ -55,7 +55,9 @@ struct ctdb_methods {
        int (*start)(struct ctdb_context *); /* start protocol processing */    
        int (*add_node)(struct ctdb_node *); /* setup a new node */     
        int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
-       void *(*allocate_pkt)(struct ctdb_context *, size_t );
+       void *(*allocate_pkt)(struct ctdb_node *, size_t);
+       void (*dealloc_pkt)(struct ctdb_node *, void *data);
+       int (*stop)(struct ctdb_context *); /* initiate stopping the protocol */
 };
 
 /*
@@ -70,6 +72,9 @@ struct ctdb_upcalls {
 
        /* node_connected is called when a connection to a node is established */
        void (*node_connected)(struct ctdb_node *);
+
+       /* protocol has been stopped */
+       void (*stopped)(struct ctdb_context *);
 };
 
 /* main state of the ctdb daemon */
index f261d0c7dac763ede92687770f60a1f02a827fad..be0499e079ced30ec6ec24c4ebcd2302b464c8b3 100644 (file)
@@ -67,21 +67,31 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
 /*
   transport packet allocator - allows transport to control memory for packets
 */
-void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+void *ctdb_tcp_allocate_pkt(struct ctdb_node *node, size_t size)
 {
        /* tcp transport needs to round to 8 byte alignment to ensure
           that we can use a length header and 64 bit elements in
           structures */
        size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
-       return talloc_size(ctdb, size);
+       return talloc_size(node, size);
 }
 
+void ctdb_tcp_dealloc_pkt(struct ctdb_node *node, void *buf)
+{
+       talloc_free(buf);
+}
+
+int ctdb_tcp_stop(struct ctdb_context *ctdb)
+{
+       return 0;
+}
 
 static const struct ctdb_methods ctdb_tcp_methods = {
        .start     = ctdb_tcp_start,
        .add_node  = ctdb_tcp_add_node,
        .queue_pkt = ctdb_tcp_queue_pkt,
-       .allocate_pkt = ctdb_tcp_allocate_pkt
+       .allocate_pkt = ctdb_tcp_allocate_pkt,
+       .dealloc_pkt = ctdb_tcp_dealloc_pkt
 };
 
 /*