From 131c41f6f3e08097e7e0fab852b2a64183c695ec Mon Sep 17 00:00:00 2001 From: Peter Somogyi Date: Wed, 20 Dec 2006 19:16:30 +0100 Subject: [PATCH] Added send queue. TODO: check again & reduce. --- ib/ibwrapper.c | 197 +++++++++++++++++++++++++++------------- ib/ibwrapper_internal.h | 5 + 2 files changed, 140 insertions(+), 62 deletions(-) diff --git a/ib/ibwrapper.c b/ib/ibwrapper.c index db6e3036..dec183f9 100644 --- a/ib/ibwrapper.c +++ b/ib/ibwrapper.c @@ -88,31 +88,31 @@ static int ibw_init_memory(struct ibw_conn *conn) { struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - + struct ibw_opts *opts = &pctx->opts; int i; struct ibw_wr *p; pconn->buf_send = ibw_alloc_mr(pctx, pconn, - pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send); + opts->max_send_wr * opts->avg_send_size, &pconn->mr_send); if (!pconn->buf_send) { sprintf(ibw_lasterr, "couldn't allocate work send buf\n"); return -1; } pconn->buf_recv = ibw_alloc_mr(pctx, pconn, - pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv); + opts->max_recv_wr * opts->recv_bufsize, &pconn->mr_recv); if (!pconn->buf_recv) { sprintf(ibw_lasterr, "couldn't allocate work recv buf\n"); return -1; } - pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *)); + pconn->wr_index = talloc_size(pconn, opts->max_send_wr * sizeof(struct ibw_wr *)); assert(pconn->wr_index!=NULL); - for(i=0; iopts.max_send_wr; i++) { + for(i=0; imax_send_wr; i++) { p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr); - p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size); - p->wr_id = i; + p->msg = pconn->buf_send + (i * opts->avg_send_size); + p->wr_id = i + opts->max_recv_wr; DLIST_ADD(pconn->wr_list_avail, p); } @@ -286,7 +286,7 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn) struct ibv_recv_wr *bad_wr; list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; - wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index; + wr.wr_id = pconn->recv_index; pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); @@ -318,7 +318,7 @@ static int ibw_fill_cq(struct ibw_conn *conn) for(i = pctx->opts.max_recv_wr; i!=0; i--) { list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index; - wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index; + wr.wr_id = pconn->recv_index; pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr; rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr); @@ -575,17 +575,58 @@ static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p; + int send_index; assert(pconn->cm_id->qp->qp_num==wc->qp_num); - assert(wc->wr_id < pctx->opts.max_send_wr); - - p = pconn->wr_index[wc->wr_id]; - if (p->msg_large) { + assert(wc->wr_id > pctx->opts.max_recv_wr); + send_index = wc->wr_id - pctx->opts.max_recv_wr; + + if (send_index < pctx->opts.max_send_wr) { + DEBUG(10, ("ibw_wc_send#1 %u", (int)wc->wr_id)); + p = pconn->wr_index[send_index]; + if (p->msg_large) + ibw_free_mr(&p->msg_large, &p->mr_large); + DLIST_REMOVE(pconn->wr_list_used, p); + DLIST_ADD(pconn->wr_list_avail, p); + } else { + DEBUG(10, ("ibw_wc_send#2 %u", (int)wc->wr_id)); + for(p=pconn->queue_sent; p!=NULL; p=p->next) + if (p->wr_id==(int)wc->wr_id) + break; + if (p==NULL) { + sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id); + return -1; + } ibw_free_mr(&p->msg_large, &p->mr_large); + DLIST_REMOVE(pconn->queue_sent, p); + DLIST_ADD(pconn->queue_avail, p); } - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); + if (pconn->queue) { + struct ibv_sge list = { + .addr = (uintptr_t) NULL, + .length = *(uint32_t *)(p->msg_large), + .lkey = 0 + }; + struct ibv_send_wr wr = { + .wr_id = p->wr_id + pctx->opts.max_recv_wr, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, + }; + struct ibv_send_wr *bad_wr; + int rc; + + p = pconn->queue; + DLIST_REMOVE(pconn->queue, p); + DLIST_ADD(pconn->queue_sent, p); + rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); + if (rc) { + sprintf(ibw_lasterr, "ibv_post_send failed with %d\n", rc); + return -1; + } + } return 0; } @@ -643,19 +684,15 @@ static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc) { struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); - int recv_index; + struct ibw_part *part = &pconn->part; char *p; uint32_t remain; - struct ibw_part *part; assert(pconn->cm_id->qp->qp_num==wc->qp_num); - assert((int)wc->wr_id > pctx->opts.max_send_wr); - recv_index = (int)wc->wr_id - pctx->opts.max_send_wr; - assert(recv_index < pctx->opts.max_recv_wr); + assert((int)wc->wr_id < pctx->opts.max_recv_wr); assert(wc->byte_len <= pctx->opts.recv_bufsize); - p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize); - part = &pconn->part; + p = pconn->buf_recv + ((int)wc->wr_id * pctx->opts.recv_bufsize); remain = wc->byte_len; while(remain) { @@ -723,7 +760,6 @@ static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc) error: DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr)); - conn->state = IBWC_ERROR; return -1; } @@ -956,65 +992,102 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n) struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p = pconn->wr_list_avail; - if (p==NULL) { - sprintf(ibw_lasterr, "insufficient wr chunks\n"); - return -1; - } - - DLIST_REMOVE(pconn->wr_list_avail, p); - DLIST_ADD(pconn->wr_list_used, p); + if (p) { + DLIST_REMOVE(pconn->wr_list_avail, p); + DLIST_ADD(pconn->wr_list_used, p); - if (n + sizeof(long) <= pctx->opts.avg_send_size) { - *buf = (void *)(p->msg + sizeof(long)); - *key = (void *)p; + if (n + sizeof(long) <= pctx->opts.avg_send_size) { + *buf = (void *)(p->msg + sizeof(long)); + } else { + p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large); + if (!p->msg_large) { + sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n"); + goto error; + } + *buf = (void *)(p->msg_large + sizeof(long)); + } } else { + /* not optimized */ + p = pconn->queue_avail; + if (!p) { + p = pconn->queue_avail = talloc_zero(pconn, struct ibw_wr); + if (p==NULL) { + sprintf(ibw_lasterr, "talloc_zero failed (qmax: %u)", pconn->queue_max); + goto error; + } + p->wr_id = pconn->queue_max + pctx->opts.max_send_wr; + pconn->queue_max++; + switch(pconn->queue_max) { + case 1: DEBUG(2, ("warning: queue performed\n")); break; + case 10: DEBUG(0, ("warning: queue reached 10\n")); break; + case 100: DEBUG(0, ("warning: queue reached 100\n")); break; + case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break; + default: break; + } + } + DLIST_REMOVE(pconn->queue_avail, p); + p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large); if (!p->msg_large) { - sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n"); - DEBUG(0, (ibw_lasterr)); - return -1; + sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed"); + goto error; } *buf = (void *)(p->msg_large + sizeof(long)); } + *key = (void *)p; + return 0; +error: + DEBUG(0, ("ibw_alloc_send_buf error: %s\n", ibw_lasterr)); + return -1; } + int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n) { struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); - struct ibv_sge list = { - .addr = (uintptr_t) NULL, - .length = n, - .lkey = 0 - }; - struct ibv_send_wr wr = { - .wr_id = p->wr_id, - .sg_list = &list, - .num_sge = 1, - .opcode = IBV_WR_SEND, - .send_flags = IBV_SEND_SIGNALED, - }; - struct ibv_send_wr *bad_wr; - if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) { - assert((p->msg + sizeof(long))==(char *)buf); - list.lkey = pconn->mr_send->lkey; - list.addr = (uintptr_t) p->msg; + if (p->msg!=NULL) { + struct ibv_sge list = { + .addr = (uintptr_t) NULL, + .length = n, + .lkey = 0 + }; + struct ibv_send_wr wr = { + .wr_id = p->wr_id + pctx->opts.max_recv_wr, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = IBV_SEND_SIGNALED, + }; + struct ibv_send_wr *bad_wr; + + if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) { + assert((p->msg + sizeof(long))==(char *)buf); + list.lkey = pconn->mr_send->lkey; + list.addr = (uintptr_t) p->msg; + + *((uint32_t *)p->msg) = htonl(n); + } else { + assert((p->msg_large + sizeof(long))==(char *)buf); + assert(p->mr_large!=NULL); + list.lkey = p->mr_large->lkey; + list.addr = (uintptr_t) p->msg_large; + + *((uint32_t *)p->msg_large) = htonl(n); + } + return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); + } /* else: */ - *((uint32_t *)p->msg) = htonl(n); - } else { - assert((p->msg_large + sizeof(long))==(char *)buf); - assert(p->mr_large!=NULL); - list.lkey = p->mr_large->lkey; - list.addr = (uintptr_t) p->msg_large; + *((uint32_t *)p->msg_large) = htonl(n); - *((uint32_t *)p->msg_large) = htonl(n); - } + /* to be sent by ibw_wc_send */ + DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */ - return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); + return 0; } const char *ibw_getLastError(void) diff --git a/ib/ibwrapper_internal.h b/ib/ibwrapper_internal.h index 6e349177..687a5797 100644 --- a/ib/ibwrapper_internal.h +++ b/ib/ibwrapper_internal.h @@ -79,6 +79,11 @@ struct ibw_conn_priv { struct ibw_wr *wr_list_used; struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */ + struct ibw_wr *queue; + struct ibw_wr *queue_sent; + struct ibw_wr *queue_avail; + int queue_max; /* max wr_id in the queue */ + /* buf_recv is a ring buffer */ char *buf_recv; /* max_recv_wr * avg_recv_size */ struct ibv_mr *mr_recv; -- 2.34.1