Made receiver handle partial packets.
author Peter Somogyi <psomogyi@gamax.hu>
Wed, 20 Dec 2006 16:42:58 +0000 (17:42 +0100)
committer Peter Somogyi <psomogyi@gamax.hu>
Wed, 20 Dec 2006 16:42:58 +0000 (17:42 +0100)
(This used to be ctdb commit 808fd658552e489825fb22453755e225549ebfcc)

1  2 
ctdb/ib/ibwrapper.c
ctdb/ib/ibwrapper_internal.h

index c04505bc474254effef32d5c3708e9afae885c80,b70b6caad6b8727d214fd99bd0de3c927083bbec..db6e303638bc025934927a7f1930e033c266f1fe
@@@ -49,38 -49,7 +49,40 @@@ static char ibw_lasterr[IBW_LASTERR_BUF
  static void ibw_event_handler_verbs(struct event_context *ev,
        struct fd_event *fde, uint16_t flags, void *private_data);
  static int ibw_fill_cq(struct ibw_conn *conn);
++static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
++static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
  
 +static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
 +      int n, struct ibv_mr **ppmr)
 +{
 +      void *buf;
 +      buf = memalign(pctx->pagesize, n);
 +      if (!buf) {
 +              sprintf(ibw_lasterr, "couldn't allocate memory\n");
 +              return NULL;
 +      }
 +
 +      *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
 +      if (!*ppmr) {
 +              sprintf(ibw_lasterr, "couldn't allocate mr\n");
 +              free(buf);
 +              return NULL;
 +      }
 +
 +      return buf;
 +}
 +
 +static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
 +{
 +      if (*ppmr!=NULL) {
 +              ibv_dereg_mr(*ppmr);
 +              *ppmr = NULL;
 +      }
 +      if (*ppbuf) {
 +              free(*ppbuf);
 +              *ppbuf = NULL;
 +      }
 +}
  
  static int ibw_init_memory(struct ibw_conn *conn)
  {
@@@ -503,67 -489,62 +505,61 @@@ static void ibw_event_handler_verbs(str
  
        struct ibv_wc wc;
        int rc;
++      struct ibv_cq *ev_cq;
++      void          *ev_ctx;
  
--      rc = ibv_poll_cq(pconn->cq, 1, &wc);
--      if (rc!=1) {
--              sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
++      /* TODO: check whether if it's good to have more channels here... */
++      rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
++      if (rc) {
++              sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
                goto error;
        }
--      if (wc.status) {
--              sprintf(ibw_lasterr, "cq completion failed status %d\n",
--                      wc.status);
++      if (ev_cq != pconn->cq) {
++              sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
++                      (unsigned int)ev_cq, (unsigned int)pconn->cq);
++              goto error;
++      }
++      rc = ibv_req_notify_cq(pconn->cq, 0);
++      if (rc) {
++              sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
                goto error;
        }
  
--      switch(wc.opcode) {
--      case IBV_WC_SEND:
--              {
--                      struct ibw_wr   *p;
--      
--                      DEBUG(10, ("send completion\n"));
--                      assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-                       assert(wc.wr_id < pctx->opts.max_send_wr);
 -                      assert(wc.wr_id < pctx->qsize);
--                      p = pconn->wr_index[wc.wr_id];
-                       if (p->msg_large) {
-                               ibw_free_mr(&p->msg_large, &p->mr_large);
-                       }
--                      DLIST_REMOVE(pconn->wr_list_used, p);
--                      DLIST_ADD(pconn->wr_list_avail, p);
++      while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
++              if (wc.status) {
++                      sprintf(ibw_lasterr, "cq completion failed status %d\n",
++                              wc.status);
++                      goto error;
                }
--              break;
--
--      case IBV_WC_RDMA_WRITE:
--              DEBUG(10, ("rdma write completion\n"));
--              break;
  
--      case IBV_WC_RDMA_READ:
--              DEBUG(10, ("rdma read completion\n"));
--              break;
++              switch(wc.opcode) {
++              case IBV_WC_SEND:
++                      DEBUG(10, ("send completion\n"));
++                      if (ibw_wc_send(conn, &wc))
++                              goto error;
++                      break;
  
--      case IBV_WC_RECV:
--              {
-                       int     recv_index;
 -                      struct ibw_wr   *p;
 -      
 -                      assert(pconn->cm_id->qp->qp_num==wc.qp_num);
 -                      assert(wc.wr_id < pctx->qsize);
 -                      p = pconn->wr_index[wc.wr_id];
 -      
 -                      DLIST_REMOVE(pconn->wr_list_used, p);
 -                      DLIST_ADD(pconn->wr_list_avail, p);
++              case IBV_WC_RDMA_WRITE:
++                      DEBUG(10, ("rdma write completion\n"));
++                      break;
+       
++              case IBV_WC_RDMA_READ:
++                      DEBUG(10, ("rdma read completion\n"));
++                      break;
 +
++              case IBV_WC_RECV:
                        DEBUG(10, ("recv completion\n"));
-                       assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-                       assert((int)wc.wr_id > pctx->opts.max_send_wr);
-                       recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
-                       assert(recv_index < pctx->opts.max_recv_wr);
-                       assert(wc.byte_len <= pctx->opts.recv_bufsize);
- /* TODO: take care of fragmented messages !!! */
-                       pctx->receive_func(conn,
-                               pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
-                               wc.byte_len);
 -                      assert(wc.byte_len <= pctx->max_msg_size);
 -      
 -                      pctx->receive_func(conn, p->msg, wc.byte_len);
--                      if (ibw_refill_cq_recv(conn))
++                      if (ibw_wc_recv(conn, &wc))
                                goto error;
--              }
--              break;
++                      break;
  
--      default:
--              sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
++              default:
++                      sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
++                      goto error;
++              }
++      }
++      if (rc!=0) {
++              sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
                goto error;
        }
  
@@@ -574,6 -555,6 +570,163 @@@ error
        pctx->connstate_func(NULL, conn);
  }
  
++/*
++ * Handle one send work completion.
++ *
++ * The work request is looked up through wc->wr_id in pconn->wr_index. If it
++ * carried a separately registered large message (msg_large), that buffer and
++ * its memory region are released. The request is then moved from the "used"
++ * list back to the "avail" list.
++ *
++ * Always returns 0.
++ */
++static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
++{
++      struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
++      struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
++      struct ibw_wr   *done;
++
++      /* the completion must belong to this connection's queue pair ... */
++      assert(pconn->cm_id->qp->qp_num==wc->qp_num);
++      /* ... and must name a send work request */
++      assert(wc->wr_id < pctx->opts.max_send_wr);
++
++      done = pconn->wr_index[wc->wr_id];
++
++      if (done->msg_large != NULL) {
++              ibw_free_mr(&done->msg_large, &done->mr_large);
++      }
++
++      /* recycle the work request */
++      DLIST_REMOVE(pconn->wr_list_used, done);
++      DLIST_ADD(pconn->wr_list_avail, done);
++
++      return 0;
++}
++
++/*
++ * Append add_len bytes from *pp to the partial-message buffer of part.
++ *
++ * The buffer is allocated (or enlarged) under memctx when it cannot hold
++ * part->len + add_len bytes. On success *pp is advanced past the consumed
++ * bytes, part->len grows by add_len and part->to_read shrinks by add_len;
++ * the caller must make sure add_len <= part->to_read.
++ *
++ * info is only used to tag the error message with the call site.
++ *
++ * Returns 0 on success. Returns -1 (with ibw_lasterr set) on size overflow
++ * or allocation failure; in that case part is left unchanged, so its
++ * existing buffer and counters stay consistent.
++ */
++static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
++      char **pp, uint32_t add_len, int info)
++{
++      uint32_t needed = part->len + add_len;
++
++      /* the message length comes from the wire: refuse a wrapped total */
++      if (needed < part->len) {
++              sprintf(ibw_lasterr, "recv buffer size overflow (%u + %u) #%d\n",
++                      part->len, add_len, info);
++              return -1;
++      }
++
++      /* allocate more if necessary - it's an "evergrowing" buffer... */
++      if (needed > part->bufsize) {
++              char *grown;
++
++              if (part->buf==NULL) {
++                      assert(part->len==0);
++                      grown = talloc_size(memctx, needed);
++                      if (grown==NULL) {
++                              sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
++                                      add_len, info);
++                              return -1;
++                      }
++              } else {
++                      /*
++                       * Keep the old pointer until the realloc succeeded:
++                       * on failure the original block is still valid and
++                       * must stay reachable through part->buf.
++                       */
++                      grown = talloc_realloc_size(memctx, part->buf, needed);
++                      if (grown==NULL) {
++                              sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
++                                      part->len, add_len, info);
++                              return -1;
++                      }
++              }
++              part->buf = grown;
++              part->bufsize = needed;
++      }
++
++      /* consume pp */
++      memcpy(part->buf + part->len, *pp, add_len);
++      *pp += add_len;
++      part->len += add_len;
++      part->to_read -= add_len;
++
++      return 0;
++}
++
++/*
++ * Cut the partial-message buffer back to threshold bytes if it has grown
++ * beyond that. The buffer contents are discarded, so this must only be
++ * called when no partial message is pending (part->len == 0).
++ *
++ * Returns 0 on success (including when nothing had to be done). Returns -1
++ * with ibw_lasterr set if the replacement buffer could not be allocated; in
++ * that case part->buf is NULL and part->bufsize is 0, so a later append
++ * allocates a fresh buffer instead of writing through a NULL pointer.
++ */
++static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
++{
++      if (part->bufsize > threshold) {
++              char *shrunk;
++
++              talloc_free(part->buf);
++              /* keep buf and bufsize in step even if the allocation below fails */
++              part->buf = NULL;
++              part->bufsize = 0;
++
++              shrunk = talloc_size(memctx, threshold);
++              if (shrunk==NULL) {
++                      sprintf(ibw_lasterr, "talloc_size failed\n");
++                      return -1;
++              }
++              part->buf = shrunk;
++              part->bufsize = threshold;
++      }
++      return 0;
++}
++
++/*
++ * Decode the 32-bit message length stored at p.
++ *
++ * ibw_send writes the prefix with htonl(), so it is converted back with
++ * ntohl() here. memcpy is used because p can point anywhere inside a
++ * receive slot and need not be aligned for uint32_t.
++ */
++static inline uint32_t ibw_read_msglen(const char *p)
++{
++      uint32_t netlen;
++
++      memcpy(&netlen, p, sizeof(netlen));
++      return ntohl(netlen);
++}
++
++/*
++ * Handle one receive work completion.
++ *
++ * The wc->byte_len bytes in the completed receive slot are treated as a
++ * stream of messages, each starting with a uint32_t length that counts the
++ * prefix itself (a length below sizeof(uint32_t) is rejected).
++ * NOTE(review): confirm that the sender's length value includes the prefix.
++ *
++ * Messages that lie completely inside the slot are passed to
++ * pctx->receive_func directly from the slot. A message (or even its length
++ * prefix) that is cut off at the end of the slot is collected in
++ * pconn->part and delivered once the remaining bytes have arrived with
++ * later completions. After such a delivery the reassembly buffer is cut
++ * back to opts.recv_threshold.
++ *
++ * Finally the receive queue is refilled via ibw_refill_cq_recv.
++ *
++ * Returns 0 on success. On error, logs ibw_lasterr, sets conn->state to
++ * IBWC_ERROR and returns -1.
++ */
++static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
++{
++      struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
++      struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
++      uint32_t        recv_index;
++      char    *p;
++      uint32_t        remain;
++      struct ibw_part *part;
++
++      assert(pconn->cm_id->qp->qp_num==wc->qp_num);
++      /* wr_id == max_send_wr maps to slot 0 of buf_recv, which is valid */
++      assert(wc->wr_id >= pctx->opts.max_send_wr);
++      recv_index = (uint32_t)(wc->wr_id - pctx->opts.max_send_wr);
++      assert(recv_index < pctx->opts.max_recv_wr);
++      assert(wc->byte_len <= pctx->opts.recv_bufsize);
++
++      p = pconn->buf_recv + ((size_t)recv_index * pctx->opts.recv_bufsize);
++      part = &pconn->part;
++
++      remain = wc->byte_len;
++      while(remain) {
++              /* here always true: (part->len!=0 && part->to_read!=0) ||
++                      (part->len==0 && part->to_read==0) */
++              if (part->len) { /* is there a partial msg to be continued? */
++                      uint32_t read_len = (part->to_read<=remain) ? part->to_read : remain;
++                      if (ibw_append_to_part(pconn, part, &p, read_len, 421))
++                              goto error;
++                      remain -= read_len;
++
++                      /* only the length prefix was pending and it is complete now */
++                      if (part->len<=sizeof(uint32_t) && part->to_read==0) {
++                              assert(part->len==sizeof(uint32_t));
++                              /* set it again now... */
++                              part->to_read = ibw_read_msglen(part->buf);
++                              if (part->to_read<sizeof(uint32_t)) {
++                                      sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
++                                      goto error;
++                              }
++                              part->to_read -= sizeof(uint32_t); /* it's already read */
++                      }
++
++                      if (part->to_read==0) {
++                              pctx->receive_func(conn, part->buf, part->len);
++                              part->len = 0; /* tells not having partial data (any more) */
++                              if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
++                                      goto error;
++                      }
++              } else {
++                      if (remain>=sizeof(uint32_t)) {
++                              uint32_t msglen = ibw_read_msglen(p);
++                              if (msglen<sizeof(uint32_t)) {
++                                      sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
++                                      goto error;
++                              }
++
++                              /* mostly awaited case: */
++                              if (msglen<=remain) {
++                                      pctx->receive_func(conn, p, msglen);
++                                      p += msglen;
++                                      remain -= msglen;
++                              } else {
++                                      part->to_read = msglen;
++                                      /* part->len is already 0 */
++                                      if (ibw_append_to_part(pconn, part, &p, remain, 422))
++                                              goto error;
++                                      remain = 0; /* to be continued ... */
++                                      /* part->to_read > 0 here */
++                              }
++                      } else { /* edge case: even the length prefix is cut off */
++                              part->to_read = sizeof(uint32_t);
++                              /* part->len is already 0 */
++                              if (ibw_append_to_part(pconn, part, &p, remain, 423))
++                                      goto error;
++                              remain = 0;
++                              /* part->to_read > 0 here */
++                      }
++              }
++      } /* <remain> is always decreased at least by 1 */
++
++      if (ibw_refill_cq_recv(conn))
++              goto error;
++
++      return 0;
++
++error:
++      DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
++      conn->state = IBWC_ERROR;
++      return -1;
++}
++
  static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
  {
        int     i;
  
        opts->max_send_wr = 256;
        opts->max_recv_wr = 1024;
 +      opts->avg_send_size = 1024;
 +      opts->recv_bufsize = 256;
++      opts->recv_threshold = 1 * 1024 * 1024;
  
        for(i=0; i<nattr; i++) {
                name = attr[i].name;
                        opts->max_send_wr = atoi(value);
                else if (strcmp(name, "max_recv_wr")==0)
                        opts->max_recv_wr = atoi(value);
 +              else if (strcmp(name, "avg_send_size")==0)
 +                      opts->avg_send_size = atoi(value);
 +              else if (strcmp(name, "recv_bufsize")==0)
 +                      opts->recv_bufsize = atoi(value);
++              else if (strcmp(name, "recv_threshold")==0)
++                      opts->recv_threshold = atoi(value);
                else {
                        sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
                        return -1;
@@@ -843,20 -810,8 +999,20 @@@ int ibw_send(struct ibw_conn *conn, voi
        };
        struct ibv_send_wr *bad_wr;
  
-       if (n + sizeof(long)<=pctx->opts.avg_send_size) {
 -      assert(p->msg==(char *)buf);
 -      assert(n<=pctx->max_msg_size);
++      if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
 +              assert((p->msg + sizeof(long))==(char *)buf);
 +              list.lkey = pconn->mr_send->lkey;
 +              list.addr = (uintptr_t) p->msg;
 +
 +              *((uint32_t *)p->msg) = htonl(n);
 +      } else {
 +              assert((p->msg_large + sizeof(long))==(char *)buf);
 +              assert(p->mr_large!=NULL);
 +              list.lkey = p->mr_large->lkey;
 +              list.addr = (uintptr_t) p->msg_large;
 +
 +              *((uint32_t *)p->msg_large) = htonl(n);
 +      }
  
        return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
  }
index b819c483d3db82e4ca9964434f2814674ad1dd4d,04d82f9565de970303196c47828f6e63f7e01606..6e34917755b45680287b712e39c6c87646bf3bd8
   */
  
  struct ibw_opts {
--      int     max_send_wr;
--      int     max_recv_wr;
-       int     avg_send_size;
-       int     recv_bufsize;
++      uint32_t        max_send_wr; /* default 256; send completions carry a wr_id below this value */
++      uint32_t        max_recv_wr; /* default 1024; receive slot indices must stay below this value */
++      uint32_t        avg_send_size; /* default 1024; ibw_send uses the regular send buffer when n plus the length prefix fits, msg_large otherwise */
++      uint32_t        recv_bufsize; /* default 256; size in bytes of one receive slot in buf_recv */
++      uint32_t        recv_threshold; /* default 1 MiB; a reassembly buffer larger than this is cut back after a message was delivered */
  };
  
  struct ibw_wr {
@@@ -54,8 -48,10 +55,15 @@@ struct ibw_ctx_priv 
        ibw_receive_fn_t receive_func; /* see ibw_init */
  
        long    pagesize; /* sysconf result for memalign */
 -      int     qsize; /* opts.max_send_wr + opts.max_recv_wr */
 -      int     max_msg_size; /* see ibw_init */
 +};
 +
++struct ibw_part { /* reassembly state for a message that is split across receive buffers */
++      char *buf; /* talloc'ed buffer holding the bytes collected so far (NULL until first needed) */
++      uint32_t bufsize; /* allocated size of buf; grows on demand, cut back by ibw_wc_mem_threshold */
++      uint32_t len; /* number of valid bytes in buf; 0 means no partial message is pending */
++      uint32_t to_read; /* bytes still missing: up to the end of the 4-byte length prefix while it is incomplete, then up to the end of the message */
+ };
  struct ibw_conn_priv {
        struct ibv_comp_channel *verbs_channel;
        struct fd_event *verbs_channel_event;
        struct ibw_wr *wr_list_avail;
        struct ibw_wr *wr_list_used;
        struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */
-       int recv_index; /* index of the next recv buffer */
 +
 +      /* buf_recv is a ring buffer */
 +      char *buf_recv; /* max_recv_wr * avg_recv_size */
 +      struct ibv_mr *mr_recv;
++      int recv_index; /* index of the next recv buffer when refilling */
++      struct ibw_part part;
  };