ib: added external send queue to workaround downtime
authorPeter Somogyi <psomogyi@gamax.hu>
Mon, 26 Feb 2007 10:59:20 +0000 (11:59 +0100)
committerPeter Somogyi <psomogyi@gamax.hu>
Mon, 26 Feb 2007 10:59:20 +0000 (11:59 +0100)
Workaround is because I couldn't find a correct way in ib to reconnect cleanly (with queue kept) when destination is unreachable.
When connection is broken, all internal queue contents are being destroyed and reconnects automatically.
An "external" send queue is kept until the connection is up again for a dest node.

common/ctdb_message.c
ib/ibw_ctdb.c
ib/ibw_ctdb.h
ib/ibw_ctdb_init.c
ib/ibwrapper.c
ib/ibwrapper.h
ib/ibwrapper_test.c

index c8830b2cac5f8febdd1e273220655b5aee713f99..6e79042b85559cb7aee85f2367071e7b039b751e 100644 (file)
@@ -60,8 +60,6 @@ int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
        struct ctdb_req_message *r;
        int len;
 
-       ctdb_connect_wait(ctdb); /* recursion */
-
        len = offsetof(struct ctdb_req_message, data) + data.dsize;
        r = ctdb->methods->allocate_pkt(ctdb, len);
        CTDB_NO_MEMORY(ctdb, r);
index 53293810ca8311c77329416a4e98674eb1f1d1d9..50dd8d68d4310348c98509a73624914438cb9f2c 100644 (file)
 #include "ibwrapper.h"
 #include "ibw_ctdb.h"
 
-int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node)
+int ctdb_ibw_node_connect(struct ctdb_node *node)
 {
+       struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+       int     rc;
+
+       assert(cn!=NULL);
+       assert(cn->conn!=NULL);
        struct sockaddr_in sock_out;
 
        memset(&sock_out, 0, sizeof(struct sockaddr_in));
@@ -38,12 +43,12 @@ int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node)
        sock_out.sin_port = htons(node->address.port);
        sock_out.sin_family = PF_INET;
 
-       if (ibw_connect(ictx, &sock_out, node)) {
-               DEBUG(0, ("ctdb_ibw_node_connect: ibw_connect failed - retrying in 1 sec...\n"));
+       rc = ibw_connect(cn->conn, &sock_out, node);
+       if (rc) {
+               DEBUG(0, ("ctdb_ibw_node_connect/ibw_connect failed - retrying...\n"));
                /* try again once a second */
                event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), 
                        ctdb_ibw_node_connect_event, node);
-               return -1;
        }
 
        /* continues at ibw_ctdb.c/IBWC_CONNECTED in good case */
@@ -54,9 +59,8 @@ void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *t
        struct timeval t, void *private)
 {
        struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
-       struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx);
 
-       ctdb_ibw_node_connect(ictx, node);
+       ctdb_ibw_node_connect(node);
 }
 
 int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
@@ -94,14 +98,15 @@ int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
                case IBWC_CONNECTED: { /* after ibw_accept or ibw_connect */
                        struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
                        if (node!=NULL) { /* after ibw_connect */
-                               node->private = (void *)conn;
+                               struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+
                                node->ctdb->upcalls->node_connected(node);
+                               ctdb_flush_cn_queue(cn);
                        } else { /* after ibw_accept */
                                /* NOP in CTDB case */
                        }
                } break;
                case IBWC_DISCONNECTED: { /* after ibw_disconnect */
-                       /* TODO: have a CTDB upcall */
                        struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
                        if (node!=NULL)
                                node->ctdb->upcalls->node_dead(node);
@@ -110,13 +115,16 @@ int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
                } break;
                case IBWC_ERROR: {
                        struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
-                       if (node!=NULL)
-                               node->private = NULL; /* not to use again */
-
-                       DEBUG(10, ("IBWC_ERROR, reconnecting immediately...\n"));
-                       talloc_free(conn);
-                       event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0),
-                               ctdb_ibw_node_connect_event, node);
+                       if (node!=NULL) {
+                               struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+                               struct ibw_ctx *ictx = cn->conn->ctx;
+
+                               DEBUG(10, ("IBWC_ERROR, reconnecting...\n"));
+                               talloc_free(cn->conn); /* internal queue content is destroyed */
+                               cn->conn = (void *)ibw_conn_new(ictx, node);
+                               event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0),
+                                       ctdb_ibw_node_connect_event, node);
+                       }
                } break;
                default:
                        assert(0);
index 14308682b21e5ebacb0fcb38c7e6b2e928d4f87a..24adeb7233e96e2b065b18a2be578bdd6045a2b6 100644 (file)
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
+struct ctdb_ibw_msg {
+       uint8_t *data;
+       uint32_t length;
+       struct ctdb_ibw_msg *prev;
+       struct ctdb_ibw_msg *next;
+};
+
+struct ctdb_ibw_node {
+       struct ibw_conn *conn;
+
+       struct ctdb_ibw_msg *queue;
+       struct ctdb_ibw_msg *queue_last;
+       int     qcnt;
+};
+
 int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn);
 int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n);
 
-int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node);
+int ctdb_ibw_node_connect(struct ctdb_node *node);
 void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te, 
        struct timeval t, void *private);
 
+int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn);
index 7892463229223b1363ad5a91b791d56582b2076a..9c0300c4f4c4a556071a32cfa70b8614244bd1d3 100644 (file)
@@ -58,7 +58,6 @@ static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
  */
 static int ctdb_ibw_start(struct ctdb_context *ctdb)
 {
-       struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
        int i;
 
        /* listen on our own address */
@@ -71,43 +70,87 @@ static int ctdb_ibw_start(struct ctdb_context *ctdb)
                if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
                        ctdb_same_address(&ctdb->address, &node->address))
                        continue;
-               ctdb_ibw_node_connect(ictx, node);
+               ctdb_ibw_node_connect(node);
        }
 
        return 0;
 }
 
-
 /*
  * initialise ibw portion of a ctdb node 
  */
 static int ctdb_ibw_add_node(struct ctdb_node *node)
 {
-       /* TODO: clarify whether is this necessary for us ?
-          - why not enough doing such thing internally at connect time ? */
-       return 0;
+       struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx);
+       struct ctdb_ibw_node *cn = talloc_zero(node, struct ctdb_ibw_node);
+
+       assert(cn!=NULL);
+       cn->conn = ibw_conn_new(ictx, node);
+       node->private = (void *)cn;
+
+       return (cn->conn!=NULL ? 0 : -1);
+}
+
+static int ctdb_ibw_send_pkt(struct ibw_conn *conn, uint8_t *data, uint32_t length)
+{
+       void    *buf, *key;
+
+       if (ibw_alloc_send_buf(conn, &buf, &key, length)) {
+               DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n"));
+               return -1;
+       }
+
+       memcpy(buf, data, length);
+       return ibw_send(conn, buf, key, length);
+}
+
+int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn)
+{
+       struct ctdb_ibw_msg *p;
+       int     rc = 0;
+
+       while(cn->queue) {
+               p = cn->queue;
+               rc = ctdb_ibw_send_pkt(cn->conn, p->data, p->length);
+               if (rc)
+                       return -1; /* will be retried later when conn is up */
+
+               DLIST_REMOVE(cn->queue, p);
+               cn->qcnt--;
+               talloc_free(p); /* it will talloc_free p->data as well */
+       }
+       assert(cn->qcnt==0);
+       /* cn->queue_last = NULL is not needed - see DLIST_ADD_AFTER */
+
+       return rc;
 }
 
 static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
 {
-       struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+       struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
        int     rc;
-       void    *buf, *key;
 
        assert(length>=sizeof(uint32_t));
+       assert(cn!=NULL);
 
-       if (conn==NULL) {
+       if (cn->conn==NULL) {
                DEBUG(0, ("ctdb_ibw_queue_pkt: conn is NULL\n"));
                return -1;
        }
 
-       if (ibw_alloc_send_buf(conn, &buf, &key, length)) {
-               DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n"));
-               return -1;
-       }
+       if (cn->conn->state==IBWC_CONNECTED) {
+               rc = ctdb_ibw_send_pkt(cn->conn, data, length);
+       } else {
+               struct ctdb_ibw_msg *p = talloc_zero(cn, struct ctdb_ibw_msg);
+               p->data = talloc_memdup(p, data, length);
+               p->length = length;
 
-       memcpy(buf, data, length);
-       rc = ibw_send(conn, buf, key, length);
+               DLIST_ADD_AFTER(cn->queue, p, cn->queue_last);
+               cn->queue_last = p;
+               cn->qcnt++;
+
+               rc = 0;
+       }
 
        return rc;
 }
index f3ef0c4c5cd750f5b0286fbc7c7e9f1568bc7514..f7b233954d229ce6faf73747e6b711e3ddf28cdb 100644 (file)
@@ -161,42 +161,57 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx)
 
 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
 {
-       DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %p)\n",
-               (uint32_t)pconn, pconn->cm_id));
-
-       /* free memory regions */
-       ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
-       ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
+       DEBUG(10, ("ibw_conn_priv_destruct(%p, cmid: %p)\n",
+               pconn, pconn->cm_id));
 
        /* pconn->wr_index is freed by talloc */
        /* pconn->wr_index[i] are freed by talloc */
 
        /* destroy verbs */
-       if (pconn->cm_id->qp) {
-               ibv_destroy_qp(pconn->cm_id->qp);
+       if (pconn->cm_id!=NULL && pconn->cm_id->qp!=NULL) {
+               rdma_destroy_qp(pconn->cm_id);
                pconn->cm_id->qp = NULL;
        }
-       if (pconn->cq) {
+
+       if (pconn->cq!=NULL) {
                ibv_destroy_cq(pconn->cq);
                pconn->cq = NULL;
        }
-       if (pconn->verbs_channel) {
+
+       if (pconn->verbs_channel!=NULL) {
                ibv_destroy_comp_channel(pconn->verbs_channel);
                pconn->verbs_channel = NULL;
        }
+
+       /* must be freed here because its order is important */
        if (pconn->verbs_channel_event) {
-               /* TODO: do we have to do this here? */
                talloc_free(pconn->verbs_channel_event);
                pconn->verbs_channel_event = NULL;
        }
+
+       /* free memory regions */
+       ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
+       ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
+
        if (pconn->pd) {
                ibv_dealloc_pd(pconn->pd);
                pconn->pd = NULL;
+               DEBUG(10, ("pconn=%p pd deallocated\n", pconn));
        }
+
        if (pconn->cm_id) {
                rdma_destroy_id(pconn->cm_id);
                pconn->cm_id = NULL;
+               DEBUG(10, ("pconn=%p cm_id destroyed\n", pconn));
        }
+
+       return 0;
+}
+
+static int ibw_wr_destruct(struct ibw_wr *wr)
+{
+       if (wr->buf_large!=NULL)
+               ibw_free_mr(&wr->buf_large, &wr->mr_large);
        return 0;
 }
 
@@ -209,16 +224,18 @@ static int ibw_conn_destruct(struct ibw_conn *conn)
        return 0;
 }
 
-static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
+struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx)
 {
        struct ibw_conn *conn;
        struct ibw_conn_priv *pconn;
 
-       conn = talloc_zero(ctx, struct ibw_conn);
+       assert(ctx!=NULL);
+
+       conn = talloc_zero(mem_ctx, struct ibw_conn);
        assert(conn!=NULL);
        talloc_set_destructor(conn, ibw_conn_destruct);
 
-       pconn = talloc_zero(ctx, struct ibw_conn_priv);
+       pconn = talloc_zero(conn, struct ibw_conn_priv);
        assert(pconn!=NULL);
        talloc_set_destructor(pconn, ibw_conn_priv_destruct);
 
@@ -248,7 +265,7 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
        }
        DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
 
-       pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
+       pconn->verbs_channel_event = event_add_fd(pctx->ectx, NULL, /* not pconn or conn */
                pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
 
        pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs);
@@ -371,14 +388,15 @@ static int ibw_fill_cq(struct ibw_conn *conn)
        return 0;
 }
 
-static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
+static int ibw_manage_connect(struct ibw_conn *conn)
 {
        struct rdma_conn_param conn_param;
+       struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
        int     rc;
 
-       DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", cma_id));
-       rc = ibw_setup_cq_qp(conn);
-       if (rc)
+       DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", pconn->cm_id));
+
+       if (ibw_setup_cq_qp(conn))
                return -1;
 
        /* cm connect */
@@ -387,7 +405,7 @@ static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
        conn_param.initiator_depth = 1;
        conn_param.retry_count = 10;
 
-       rc = rdma_connect(cma_id, &conn_param);
+       rc = rdma_connect(pconn->cm_id, &conn_param);
        if (rc)
                sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
 
@@ -436,7 +454,7 @@ static void ibw_event_handler_cm(struct event_context *ev,
                assert(cma_id->context!=NULL);
                conn = talloc_get_type(cma_id->context, struct ibw_conn);
 
-               rc = ibw_manage_connect(conn, cma_id);
+               rc = ibw_manage_connect(conn);
                if (rc)
                        goto error;
 
@@ -445,7 +463,7 @@ static void ibw_event_handler_cm(struct event_context *ev,
        case RDMA_CM_EVENT_CONNECT_REQUEST:
                DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n"));
                ctx->state = IBWS_CONNECT_REQUEST;
-               conn = ibw_conn_new(ctx);
+               conn = ibw_conn_new(ctx, ctx);
                pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
                pconn->cm_id = cma_id; /* !!! event will be freed but id not */
                cma_id->context = (void *)conn;
@@ -459,6 +477,9 @@ static void ibw_event_handler_cm(struct event_context *ev,
 
                /* continued at ibw_accept when invoked by the func above */
                if (!pconn->is_accepted) {
+                       rc = rdma_reject(cma_id, NULL, 0);
+                       if (rc)
+                               DEBUG(0, ("rdma_reject failed with rc=%d\n", rc));
                        talloc_free(conn);
                        DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
                }
@@ -476,6 +497,8 @@ static void ibw_event_handler_cm(struct event_context *ev,
                conn = talloc_get_type(cma_id->context, struct ibw_conn);
                assert(conn!=NULL); /* important assumption */
 
+               DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id));
+
                /* client conn is up */
                conn->state = IBWC_CONNECTED;
 
@@ -485,22 +508,30 @@ static void ibw_event_handler_cm(struct event_context *ev,
 
        case RDMA_CM_EVENT_ADDR_ERROR:
                sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status);
-               goto error;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status);
-               goto error;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status);
-               goto error;
        case RDMA_CM_EVENT_UNREACHABLE:
                sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status);
-               goto error;
        case RDMA_CM_EVENT_REJECTED:
                sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status);
+               conn = talloc_get_type(cma_id->context, struct ibw_conn);
+               if (conn) {
+                       if ((rc=rdma_ack_cm_event(event)))
+                               DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc));
+                       event = NULL;
+                       pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+                       ibw_conn_priv_destruct(pconn);
+               }
                goto error;
 
        case RDMA_CM_EVENT_DISCONNECTED:
                DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n"));
+               if ((rc=rdma_ack_cm_event(event)))
+                       DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc));
+               event = NULL; /* don't ack more */
+
                if (cma_id!=pctx->cm_id) {
                        DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id));
                        conn = talloc_get_type(cma_id->context, struct ibw_conn);
@@ -518,14 +549,20 @@ static void ibw_event_handler_cm(struct event_context *ev,
                goto error;
        }
 
-       if ((rc=rdma_ack_cm_event(event))) {
+       if (event!=NULL && (rc=rdma_ack_cm_event(event))) {
                sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
                goto error;
        }
 
        return;
 error:
+       if (event!=NULL && (rc=rdma_ack_cm_event(event))) {
+               sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
+               goto error;
+       }
+
        DEBUG(0, ("cm event handler: %s", ibw_lasterr));
+
        if (cma_id!=pctx->cm_id) {
                conn = talloc_get_type(cma_id->context, struct ibw_conn);
                if (conn)
@@ -569,8 +606,8 @@ static void ibw_event_handler_verbs(struct event_context *ev,
 
        while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
                if (wc.status) {
-                       sprintf(ibw_lasterr, "cq completion failed status %d rc %d\n",
-                               wc.status, rc);
+                       sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n",
+                               wc.status, wc.opcode, rc);
                        goto error;
                }
 
@@ -605,11 +642,57 @@ static void ibw_event_handler_verbs(struct event_context *ev,
                goto error;
        }
 
+       ibv_ack_cq_events(pconn->cq, 1);
+
        return;
 error:
+       ibv_ack_cq_events(pconn->cq, 1);
+
        DEBUG(0, (ibw_lasterr));
-       conn->state = IBWC_ERROR;
-       pctx->connstate_func(NULL, conn);
+       
+       if (conn->state!=IBWC_ERROR) {
+               conn->state = IBWC_ERROR;
+               pctx->connstate_func(NULL, conn);
+       }
+}
+
+static int ibw_process_queue(struct ibw_conn *conn)
+{
+       struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+       struct ibw_ctx_priv *pctx;
+       struct ibw_wr   *p;
+       int     rc;
+       uint32_t        msg_size;
+
+       if (pconn->queue==NULL)
+               return 0; /* NOP */
+
+       p = pconn->queue;
+
+       /* we must have at least 1 fragment to send */
+       assert(p->queued_ref_cnt>0);
+       p->queued_ref_cnt--;
+
+       pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+       msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
+
+       assert(p->queued_msg!=NULL);
+       assert(msg_size!=0);
+
+       DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n",
+               p->queued_ref_cnt, msg_size));
+
+       rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
+
+       /* was this the last fragment? */
+       if (p->queued_ref_cnt) {
+               p->queued_msg += pctx->opts.recv_bufsize;
+       } else {
+               DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
+               p->queued_msg = NULL;
+       }
+
+       return rc;
 }
 
 static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
@@ -618,7 +701,6 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
        struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
        struct ibw_wr   *p;
        int     send_index;
-       int     rc = 0;
 
        DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n",
                pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
@@ -662,30 +744,7 @@ static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
                }
        }
 
-       if (pconn->queue) {
-               uint32_t        msg_size;
-               
-               DEBUG(10, ("ibw_wc_send#queue %u\n", (int)wc->wr_id));
-               
-               p = pconn->queue;
-
-               assert(p->queued_ref_cnt>0);
-               p->queued_ref_cnt--;
-
-               msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
-               assert(p->queued_msg!=NULL);
-               assert(msg_size!=0);
-               rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
-               if (p->queued_ref_cnt) {
-                       p->queued_msg += pctx->opts.recv_bufsize;
-               } else {
-                       DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
-                       p->queued_msg = NULL;
-               }
-       }
-
-       return rc;
+       return ibw_process_queue(conn);
 }
 
 static int ibw_append_to_part(struct ibw_conn_priv *pconn,
@@ -874,8 +933,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
        struct ibw_ctx_priv *pctx;
        int     rc;
 
-       DEBUG(10, ("ibw_init(ctx_userdata: %u, ectx: %u)\n",
-               (uint32_t)ctx_userdata, (uint32_t)ectx));
+       DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx));
 
        /* initialize basic data structures */
        memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
@@ -1010,19 +1068,25 @@ int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata)
        return 0;
 }
 
-int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
+int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata)
 {
-       struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
-       struct ibw_conn *conn = NULL;
+       struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
        struct ibw_conn_priv *pconn = NULL;
        int     rc;
 
-       conn = ibw_conn_new(ctx);
+       assert(conn!=NULL);
+
        conn->conn_userdata = conn_userdata;
        pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
        DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr),
                ntohs(serv_addr->sin_port)));
 
+       /* clean previous - probably half - initialization */
+       if (ibw_conn_priv_destruct(pconn)) {
+               DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id));
+               return -1;
+       }
+
        /* init cm */
        rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
        if (rc) {
@@ -1053,11 +1117,23 @@ int ibw_disconnect(struct ibw_conn *conn)
 
        DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id));
 
-       rc = rdma_disconnect(pconn->cm_id);
-       if (rc) {
-               sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
-               DEBUG(0, (ibw_lasterr));
-               return rc;
+       assert(pconn!=NULL);
+
+       switch(conn->state) {
+       case IBWC_ERROR:
+               ibw_conn_priv_destruct(pconn); /* do this here right now */
+               break;
+       case IBWC_CONNECTED:
+               rc = rdma_disconnect(pconn->cm_id);
+               if (rc) {
+                       sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
+                       DEBUG(0, (ibw_lasterr));
+                       return rc;
+               }
+               break;
+       default:
+               DEBUG(9, ("invalid state for disconnect: %d\n", conn->state));
+               break;
        }
 
        return 0;
@@ -1092,6 +1168,7 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
                p = pconn->extra_avail;
                if (!p) {
                        p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr);
+                       talloc_set_destructor(p, ibw_wr_destruct);
                        if (p==NULL) {
                                sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max);
                                goto error;
@@ -1174,6 +1251,8 @@ static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, u
 
        DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len));
 
+       /* TODO: clarify how to continue when state==IBWC_STOPPED */
+
        /* to be sent by ibw_wc_send */
        /* regardless "normal" or [a part of] "large" packet */
        if (!p->queued_ref_cnt) {
index 52018cb7cb561ea74babf4301e84c4d335d333ec..36385d6f46b5df8e3d40422d8a7e204877c05f8d 100644 (file)
@@ -153,6 +153,14 @@ int ibw_listen(struct ibw_ctx *ctx, int backlog);
  */
 int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata);
 
+/*
+ * Create a new connection structure
+ * available for queueing ibw_send
+ *
+ * <parent> is needed to be notified by talloc destruct action.
+ */
+struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx);
+
 /*
  * Needs a normal internet address here
  * can be called within IBWS_READY|IBWS_CONNECT_REQUEST
@@ -162,7 +170,7 @@ int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata);
  * You have +1 waiting here: you will get ibw_conn (having the
  * same <conn_userdata> member) structure in ibw_connstate_fn_t.
  */
-int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata);
+int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata);
 
 /*
  * Sends out a disconnect request.
index 2fa590588cedafc39d1c1bbbe802f2f5aa0fde11..2ab4c97158ba6d553be39b4027c9e38ca995e8e4 100644 (file)
@@ -81,11 +81,13 @@ enum testopcode {
 
 int ibwtest_connect_everybody(struct ibwtest_ctx *tcx)
 {
-       struct ibwtest_conn     *pconn = talloc_zero(tcx, struct ibwtest_conn);
+       struct ibw_conn         *conn;
+       struct ibwtest_conn     *tconn = talloc_zero(tcx, struct ibwtest_conn);
        int     i;
 
        for(i=0; i<tcx->naddrs; i++) {
-               if (ibw_connect(tcx->ibwctx, &tcx->addrs[i], pconn)) {
+               conn = ibw_conn_new(tcx->ibwctx, tconn);
+               if (ibw_connect(conn, &tcx->addrs[i], tconn)) {
                        fprintf(stderr, "ibw_connect error at %d\n", i);
                        return -1;
                }
@@ -237,7 +239,7 @@ int ibwtest_do_varsize_scenario_conn(struct ibwtest_ctx *tcx, struct ibw_conn *c
 int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
 {
        struct ibwtest_ctx      *tcx = NULL; /* userdata */
-       struct ibwtest_conn     *pconn = NULL; /* userdata */
+       struct ibwtest_conn     *tconn = NULL; /* userdata */
 
        if (ctx) {
                tcx = talloc_get_type(ctx->ctx_userdata, struct ibwtest_ctx);
@@ -251,8 +253,8 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
                        break;
                case IBWS_CONNECT_REQUEST:
                        DEBUG(10, ("test IBWS_CONNECT_REQUEST\n"));
-                       pconn = talloc_zero(conn, struct ibwtest_conn);
-                       if (ibw_accept(ctx, conn, pconn)) {
+                       tconn = talloc_zero(conn, struct ibwtest_conn);
+                       if (ibw_accept(ctx, conn, tconn)) {
                                DEBUG(0, ("error accepting the connect request\n"));
                        }
                        break;
@@ -271,7 +273,7 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
        }
 
        if (conn) {
-               pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
+               tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
                switch(conn->state) {
                case IBWC_INIT:
                        DEBUG(10, ("test IBWC_INIT\n"));
@@ -300,22 +302,22 @@ int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
 
 int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n)
 {
-       struct ibwtest_conn *pconn;
+       struct ibwtest_conn *tconn;
        enum testopcode op;
        struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
        int     rc = 0;
 
        assert(conn!=NULL);
        assert(n>=sizeof(uint32_t)+1);
-       pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
+       tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
 
        op = (enum testopcode)((char *)buf)[sizeof(uint32_t)];
        if (op==TESTOP_SEND_ID) {
-               pconn->id = talloc_strdup(pconn, ((char *)buf)+sizeof(uint32_t)+1);
+               tconn->id = talloc_strdup(tconn, ((char *)buf)+sizeof(uint32_t)+1);
        }
        if (op==TESTOP_SEND_ID || op==TESTOP_SEND_TEXT) {
                DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op,
-                       pconn->id ? pconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
+                       tconn->id ? tconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
        }
 
        if (tcx->is_server) {
@@ -327,7 +329,7 @@ int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n)
                                op,
                                n - sizeof(uint32_t) - 2,
                                (uint32_t)sum,
-                               pconn->id ? pconn->id : "NULL"));
+                               tconn->id ? tconn->id : "NULL"));
                        if (sum!=((unsigned char *)buf)[n-1]) {
                                DEBUG(0, ("ERROR: checksum mismatch %u!=%u\n",
                                        (uint32_t)sum, (uint32_t)((unsigned char *)buf)[n-1]));