Implemented cm usage.
author Peter Somogyi <psomogyi@gamax.hu>
Wed, 6 Dec 2006 17:49:46 +0000 (18:49 +0100)
committer Peter Somogyi <psomogyi@gamax.hu>
Wed, 6 Dec 2006 17:49:46 +0000 (18:49 +0100)
TODO: implement verbs user logic.

(This used to be ctdb commit afa33107137698500d0aabaf244ea4276c415929)

ctdb/ib/ibwrapper.c
ctdb/ib/ibwrapper.h
ctdb/ib/ibwrapper_internal.h

index 9cc7c9c293b5a22c35e02dc8d0b79ba4fc18dec2..23396f7639be5f8345325512bb6718f4e765034f 100644 (file)
@@ -51,16 +51,44 @@ static int ibw_ctx_priv_destruct(void *ptr)
        ibw_ctx *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
        assert(pctx!=NULL);
 
-       if (pctx->cm_id) {
-               rdma_destroy_id(pctx->cm_id);
-               pctx->cm_id = NULL;
+       /* free memory regions */
+       
+       /* destroy verbs */
+       if (pctx->cq) {
+               ibv_destroy_cq(pctx->cq);
+               pctx->cq = NULL;
+       }
+
+       if (pctx->verbs_channel) {
+               ibv_destroy_comp_channel(pctx->verbs_channel);
+               pctx->verbs_channel = NULL;
        }
+
+       if (pctx->verbs_channel_event) {
+               /* TODO: do we have to do this here? */
+               talloc_free(pctx->verbs_channel_event);
+               pctx->verbs_channel_event = NULL;
+       }
+
+       if (pctx->pd) {
+               ibv_dealloc_pd(pctx->pd);
+               pctx->pd = NULL;
+       }
+
+       /* destroy cm */
        if (pctx->cm_channel) {
                rdma_destroy_event_channel(pctx->cm_channel);
                pctx->cm_channel = NULL;
        }
-
-       /* free memory regions */
+       if (pctx->cm_channel_event) {
+               /* TODO: do we have to do this here? */
+               talloc_free(pctx->cm_channel_event);
+               pctx->cm_channel_event = NULL;
+       }
+       if (pctx->cm_id) {
+               rdma_destroy_id(pctx->cm_id);
+               pctx->cm_id = NULL;
+       }
 }
 
 static int ibw_ctx_destruct(void *ptr)
@@ -97,7 +125,7 @@ static int ibw_conn_destruct(void *ptr)
        return 0;
 }
 
-static ibw_conn *ibw_new_conn(ibw_ctx *ctx)
+static ibw_conn *ibw_conn_new(ibw_ctx *ctx)
 {
        ibw_conn *conn;
        ibw_conn_priv *pconn;
@@ -117,7 +145,27 @@ static ibw_conn *ibw_new_conn(ibw_ctx *ctx)
        return conn;
 }
 
-static void ibw_process_cm_event(struct event_context *ev,
+static int ibw_manage_connect(struct rdma_cm_id *cma_id)
+{
+       struct rdma_conn_param conn_param;
+       int     rc;
+
+       /* TODO: setup verbs... */
+
+       /* cm connect */
+       memset(&conn_param, 0, sizeof conn_param);
+       conn_param.responder_resources = 1;
+       conn_param.initiator_depth = 1;
+       conn_param.retry_count = 10;
+
+       rc = rdma_connect(cma_id, &conn_param);
+       if (rc)
+               sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
+
+       return rc;
+}
+
+static void ibw_event_handler_cm(struct event_context *ev,
        struct fd_event *fde, uint16_t flags, void *private_data)
 {
        int     rc;
@@ -125,8 +173,9 @@ static void ibw_process_cm_event(struct event_context *ev,
        ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
        ibw_conn *conn = NULL;
        ibw_conn_priv *pconn = NULL;
-       struct rdma_cm_id *id = NULL;
+       struct rdma_cm_id *cma_id = NULL;
        struct rdma_cm_event *event = NULL;
+       int     error = 0;
 
        assert(ctx!=NULL);
 
@@ -137,42 +186,75 @@ static void ibw_process_cm_event(struct event_context *ev,
                DEBUG(0, ibw_lasterr);
                return;
        }
-       id = event->id;
-
-       /* find whose cm_id do we have */
+       cma_id = event->id;
 
-//     DEBUG(10, "cma_event type %d cma_id %p (%s)\n", event->event, event->id,
-//               (event->id == ctx->cm_id) ? "parent" : "child");
+       DEBUG(10, "cma_event type %d cma_id %p (%s)\n", event->event, id,
+                 (cma_id == ctx->cm_id) ? "parent" : "child");
 
        switch (event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
+               /* continuing from ibw_connect ... */
                assert(pctx->state==IWINT_INIT);
                pctx->state = IWINT_ADDR_RESOLVED;
-               rc = rdma_resolve_route(event->id, 2000);
+               rc = rdma_resolve_route(cma_id, 2000);
                if (rc) {
                        cb->state = ERROR;
                        sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
                        DEBUG(0, ibw_lasterr);
                }
+               /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
                break;
 
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
+               /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
                assert(pctx->state==IWINT_ADDR_RESOLVED);
                pctx->state = IWINT_ROUTE_RESOLVED;
+               conn = talloc_get_type(cma_id->context, ibw_conn);
+               pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+
+               rc = ibw_manage_connect(cma_id);
+               if (rc)
+                       error = 1;
+
                break;
 
        case RDMA_CM_EVENT_CONNECT_REQUEST:
                ctx->state = IBWS_CONNECT_REQUEST;
-               conn = ibw_new_conn(ctx);
-               pconn = talloc_get_type(conn, ibw_conn_priv);
-               pconn->cm_id = event->id; /* !!! event will be freed but not id */
-               DEBUG(10, "conn->cm_id %p\n", pconn->cm_id);
+               conn = ibw_conn_new(ctx);
+               pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+               pconn->cm_id = cma_id; /* !!! event will be freed but id not */
+               cma_id->context = (void *)conn;
+               DEBUG(10, "pconn->cm_id %p\n", pconn->cm_id);
+
+               conn->state = IBWC_INIT;
+
+               pctx->connstate_func(ctx, conn);
+
+               /* continued at ibw_accept when invoked by the func above */
+               if (!pconn->is_accepted) {
+                       talloc_free(conn);
+                       DEBUG(10, "pconn->cm_id %p wasn't accepted\n", pconn->cm_id);
+               }
+
+               /* TODO: clarify whether if it's needed by upper layer: */
+               ctx->state = IBWS_READY;
+               pctx->connstate_func(ctx, NULL);
+
+               /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
                break;
 
        case RDMA_CM_EVENT_ESTABLISHED:
+               /* expected after ibw_accept and ibw_connect[not directly] */
                DEBUG(0, "ESTABLISHED\n");
                ctx->state = IBWS_READY;
-               /* TODO */
+               conn = talloc_get_type(cma_id->context, ibw_conn);
+               assert(conn!=NULL); /* important assumption */
+               pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+
+               conn->state = IBWC_CONNECTED;
+
+               /* both ctx and conn have changed */
+               pctx->connstate_func(ctx, conn);
                break;
 
        case RDMA_CM_EVENT_ADDR_ERROR:
@@ -180,30 +262,63 @@ static void ibw_process_cm_event(struct event_context *ev,
        case RDMA_CM_EVENT_CONNECT_ERROR:
        case RDMA_CM_EVENT_UNREACHABLE:
        case RDMA_CM_EVENT_REJECTED:
-               DEBUG(0, "cma event %d, error %d\n", event->event,
-                      event->status);
-               ctx->state = IBWS_ERROR;
+               DEBUG(0, "cma event %d, error %d\n", event->event, event->status);
+               error = 1;
                break;
 
        case RDMA_CM_EVENT_DISCONNECTED:
-               DEBUG(0, "%s DISCONNECT EVENT...\n", cb->server ? "server" : "client");
-               /* TODO */
+               if (cma_id!=ctx->cm_id) {
+                       DEBUG(0, "client DISCONNECT event\n");
+                       conn = talloc_get_type(cma_id->context, ibw_conn);
+                       conn->state = IBWC_DISCONNECTED;
+                       pctx->connstate_func(NULL, conn);
+
+                       talloc_free(conn);
+               } else {
+                       DEBUG(0, "server DISCONNECT event\n");
+                       ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
+                       pctx->connstate_func(ctx, NULL);
+               }
                break;
 
        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                DEBUG(0, "cma detected device removal!\n");
+               error = 1;
                break;
 
        default:
-               DEBUG(0, "oof bad type!\n");
+               DEBUG(0, "unknown event %d\n", event->event);
+               error = 1;
                break;
        }
 
+       if (error) {
+               DEBUG(0, ibw_lasterr);
+               if (cma_id!=ctx->cm_id) {
+                       conn = talloc_get_type(cma_id->context, ibw_conn);
+                       conn->state = IBWC_ERROR;
+                       pctx->connstate_func(NULL, conn);
+               } else {
+                       ctx->state = IBWS_ERROR;
+                       pctx->connstate_func(ctx, NULL);
+               }
+       }
+
        if ((rc=rdma_ack_cm_event(event))) {
-               DEBUG(0, "rdma_ack_cm_event failed with %d\n", rc);
+               sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n");
+               DEBUG(0, ibw_lasterr, rc);
        }
 }
 
+static void ibw_event_handler_verbs(struct event_context *ev,
+       struct fd_event *fde, uint16_t flags, void *private_data)
+{
+       int     rc;
+       ibw_ctx *ctx = talloc_get_type(private_data, ibw_ctx);
+       ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
+
+}
+
 static int ibw_process_init_attrs(ibw_initattr *attr, int nattr, ibw_opts *opts)
 {
        int     i;
@@ -260,7 +375,7 @@ ibw_ctx *ibw_init(ibw_initattr *attr, int nattr,
        if (ibw_process_init_attrs(attr, nattr, pctx->opts))
                goto cleanup;
 
-       /* initialize CM stuff */
+       /* init cm */
        pctx->cm_channel = rdma_create_event_channel();
        if (!pctx->cm_channel) {
                ret = errno;
@@ -269,7 +384,7 @@ ibw_ctx *ibw_init(ibw_initattr *attr, int nattr,
        }
 
        pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,
-               pctx->cm_channel->fd, EVENT_FD_READ, ibw_process_cm_event, ctx);
+               pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);
 
        rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, cb, RDMA_PS_TCP);
        if (rc) {
@@ -279,11 +394,33 @@ ibw_ctx *ibw_init(ibw_initattr *attr, int nattr,
        }
        DEBUG(10, "created cm_id %p\n", pctx->cm_id);
 
+       /* init verbs */
+       pctx->pd = ibv_alloc_pd(pctx->cmid->verbs);
+       if (!pctx->pd) {
+               sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
+               goto cleanup;
+       }
+       DEBUG(10, "created pd %p\n", pctx->pd);
+
+       pctx->verbs_channel = ibv_create_comp_channel(cm_id->verbs);
+       if (!pctx->verbs_channel) {
+               sprintf(stderr, "ibv_create_comp_channel failed %d\n", errno);
+               goto cleanup;
+       }
+       DEBUG_LOG("created channel %p\n", pctx->channel);
+
+       pctx->verbs_channel_event = event_add_fd(pctx->ectx, pctx,
+               pctx->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, ctx);
+
+       pctx->cq = ibv_create_cq(cm_id->verbs, pctx->opts.rx_depth, ctx,
+               ctx->verbs_channel, 0);
+
        /* allocate ib memory regions */
 
        return ctx;
 
 cleanup:
+       DEBUG(0, ibw_lasterr);
        if (ctx)
                talloc_free(ctx);
 
@@ -301,46 +438,97 @@ int ibw_bind(ibw_ctx *ctx, struct sockaddr_in *my_addr)
        ibw_ctx_priv *pctx = (ibw_ctx_priv *)ctx->internal;
        int     rc;
 
-       rc = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &my_addr);
+       rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);
        if (rc) {
                sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);
+               DEBUG(0, ibw_lasterr);
                return rc;
        }
+       DEBUG(10, "rdma_bind_addr successful\n");
 
        return 0;
 }
 
 int ibw_listen(ibw_ctx *ctx, int backlog)
 {
-       ibw_ctx_priv *pctx = (ibw_ctx_priv *)ctx->internal;
-       
+       ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
+       int     rc;
+
+       DEBUG_LOG("rdma_listen...\n");
+       rc = rdma_listen(cb->cm_id, backlog);
+       if (rc) {
+               sprintf(ibw_lasterr, "rdma_listen failed: %d\n", ret);
+               DEBUG(0, ibw_lasterr);
+               return rc;
+       }       
+
        return 0;
 }
 
-int ibw_accept(ibw_ctx *ctx, void *conn_userdata)
+int ibw_accept(ibw_ctx *ctx, ibw_conn *conn, void *conn_userdata)
 {
-       ibw_ctx_priv *pctx = (ibw_ctx_priv *)ctx->internal;
-       
+       ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
+       ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+       struct rdma_conn_param  conn_param;
+
+       memset(&conn_param, 0, sizeof(struct rdma_conn_param));
+       conn_param.responder_resources = 1;
+       conn_param.initiator_depth = 1;
+       rc = rdma_accept(pconn->cm_id, &conn_param);
+       if (rc) {
+               sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);
+               DEBUG(0, ibw_lasterr);
+               return -1;;
+       }
+
+       pconn->is_accepted = 1;
+
+       /* continued at RDMA_CM_EVENT_ESTABLISHED */
+
        return 0;
 }
 
 int ibw_connect(ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
 {
-       ibw_ctx_priv *pctx = (ibw_ctx_priv *)ctx->internal;
-               
+       ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
+       ibw_conn *conn = NULL;
+       int     rc;
+
+       conn = ibw_conn_new(ctx);
+       conn->conn_userdata = conn_userdata;
+       pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+
+       rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
+       if (rc) {
+               rc = errno;
+               sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
+               return rc;
+       }
+
+       assert(ctx->state==IBWS_READY);
+
+       rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
+       if (rc) {
+               sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
+               DEBUG(0, ibw_lasterr);
+               return -1;
+       }
+
+       /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
+
        return 0;
 }
 
 void ibw_disconnect(ibw_conn *conn)
 {
-       ibw_ctx_priv *pctx = (ibw_ctx_priv *)ctx->internal;
+       ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
        
        return 0;
 }
 
 int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int n)
 {
-       ibw_conn_priv *pconn = (ibw_ctx_priv *)ctx->internal;
+       ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
 
        return 0;
 }
index f21956cdba92a1add12d44a819a4b1771a41bbfa..9b9666a60c48ef61a909f9974369d0a60bf45f81 100644 (file)
@@ -148,7 +148,7 @@ int ibw_listen(ibw_ctx *ctx, int backlog);
  *
  * Important: you won't get remote IP address (only internal conn info)
  */
-int ibw_accept(ibw_ctx *ctx, void *conn_userdata);
+int ibw_accept(ibw_ctx *ctx, ibw_conn *conn, void *conn_userdata);
 
 /*
  * Needs a normal internet address here
index df2f14a2c1459c335af1c2e7846bd1c97e977030..8a1a445abb3764fcca40e1614bd1ef261ccda6d2 100644 (file)
@@ -50,13 +50,18 @@ typedef struct _ibw_ctx_priv {
 
        ibw_opts opts;
 
-       struct ibv_context     *context;
-       struct ibv_pd          *pd;
        struct rdma_cm_id       *cm_id; /* server cm id */
 
        struct rdma_event_channel *cm_channel;
        struct fd_event *cm_channel_event;
 
+       struct rdma_event_channel *cm_channel;
+       struct fd_event *cm_channel_event;
+       struct ibv_comp_channel *verbs_channel;
+       struct fd_event *verbs_channel_event;
+
+       struct ibv_pd          *pd;
+
        ibw_connstate_fn_t connstate_func;
        ibw_receive_fn_t receive_func;
 } ibw_ctx_priv;
@@ -66,6 +71,7 @@ typedef struct _ibw_conn_priv {
        struct ibv_qp   *qp;
 
        struct rdma_cm_id *cm_id; /* client's cm id */
+       int     is_accepted;
 } ibw_conn_priv;
 
 /*