Added trace messages + several fixes
[metze/ctdb/wip.git] / ib / ibwrapper.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Wrap Infiniband calls.
4  *
5  * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006
6  *
7  * Major code contributions by Peter Somogyi <psomogyi@gamax.hu>
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include <stdlib.h>
25 #include <string.h>
26 #include <stdio.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <malloc.h>
34 #include <assert.h>
35 #include <unistd.h>
36
37 #include "includes.h"
38 #include "lib/events/events.h"
39 #include "ibwrapper.h"
40
41 #include <rdma/rdma_cma.h>
42
43 #include "ibwrapper_internal.h"
44 #include "lib/util/dlinklist.h"
45
46 #define IBW_LASTERR_BUFSIZE 512
47 static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
48
49 static void ibw_event_handler_verbs(struct event_context *ev,
50         struct fd_event *fde, uint16_t flags, void *private_data);
51 static int ibw_fill_cq(struct ibw_conn *conn);
52 static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
53 static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
54
55 static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
56         uint32_t n, struct ibv_mr **ppmr)
57 {
58         void *buf;
59
60         DEBUG(10, ("ibw_alloc_mr(cmid=%u, n=%u)\n", (uint32_t)pconn->cm_id, n));
61         buf = memalign(pctx->pagesize, n);
62         if (!buf) {
63                 sprintf(ibw_lasterr, "couldn't allocate memory\n");
64                 return NULL;
65         }
66
67         *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
68         if (!*ppmr) {
69                 sprintf(ibw_lasterr, "couldn't allocate mr\n");
70                 free(buf);
71                 return NULL;
72         }
73
74         return buf;
75 }
76
77 static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
78 {
79         DEBUG(10, ("ibw_free_mr(%u %u)\n", (uint32_t)*ppbuf, (uint32_t)*ppmr));
80         if (*ppmr!=NULL) {
81                 ibv_dereg_mr(*ppmr);
82                 *ppmr = NULL;
83         }
84         if (*ppbuf) {
85                 free(*ppbuf);
86                 *ppbuf = NULL;
87         }
88 }
89
90 static int ibw_init_memory(struct ibw_conn *conn)
91 {
92         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
93         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
94         struct ibw_opts *opts = &pctx->opts;
95         int     i;
96         struct ibw_wr   *p;
97
98         DEBUG(10, ("ibw_init_memory(cmid: %u)\n", (uint32_t)pconn->cm_id));
99         pconn->buf_send = ibw_alloc_mr(pctx, pconn,
100                 opts->max_send_wr * opts->avg_send_size, &pconn->mr_send);
101         if (!pconn->buf_send) {
102                 sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
103                 return -1;
104         }
105
106         pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
107                 opts->max_recv_wr * opts->recv_bufsize, &pconn->mr_recv);
108         if (!pconn->buf_recv) {
109                 sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
110                 return -1;
111         }
112
113         pconn->wr_index = talloc_size(pconn, opts->max_send_wr * sizeof(struct ibw_wr *));
114         assert(pconn->wr_index!=NULL);
115
116         for(i=0; i<opts->max_send_wr; i++) {
117                 p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
118                 p->msg = pconn->buf_send + (i * opts->avg_send_size);
119                 p->wr_id = i + opts->max_recv_wr;
120
121                 DLIST_ADD(pconn->wr_list_avail, p);
122         }
123
124         return 0;
125 }
126
127 static int ibw_ctx_priv_destruct(struct ibw_ctx_priv *pctx)
128 {
129         DEBUG(10, ("ibw_ctx_priv_destruct(%u)\n", (uint32_t)pctx));
130
131         if (pctx->pd) {
132                 ibv_dealloc_pd(pctx->pd);
133                 pctx->pd = NULL;
134         }
135
136         /* destroy cm */
137         if (pctx->cm_channel) {
138                 rdma_destroy_event_channel(pctx->cm_channel);
139                 pctx->cm_channel = NULL;
140         }
141         if (pctx->cm_channel_event) {
142                 /* TODO: do we have to do this here? */
143                 talloc_free(pctx->cm_channel_event);
144                 pctx->cm_channel_event = NULL;
145         }
146         if (pctx->cm_id) {
147                 rdma_destroy_id(pctx->cm_id);
148                 pctx->cm_id = NULL;
149         }
150
151         return 0;
152 }
153
154 static int ibw_ctx_destruct(struct ibw_ctx *ctx)
155 {
156         DEBUG(10, ("ibw_ctx_destruct(%u)\n", (uint32_t)ctx));
157         return 0;
158 }
159
160 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
161 {
162         DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %u)\n",
163                 (uint32_t)pconn, (uint32_t)pconn->cm_id));
164
165         /* free memory regions */
166         ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
167         ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
168
169         /* pconn->wr_index is freed by talloc */
170         /* pconn->wr_index[i] are freed by talloc */
171
172         /* destroy verbs */
173         if (pconn->cm_id->qp) {
174                 ibv_destroy_qp(pconn->cm_id->qp);
175                 pconn->cm_id->qp = NULL;
176         }
177         if (pconn->cq) {
178                 ibv_destroy_cq(pconn->cq);
179                 pconn->cq = NULL;
180         }
181         if (pconn->verbs_channel) {
182                 ibv_destroy_comp_channel(pconn->verbs_channel);
183                 pconn->verbs_channel = NULL;
184         }
185         if (pconn->verbs_channel_event) {
186                 /* TODO: do we have to do this here? */
187                 talloc_free(pconn->verbs_channel_event);
188                 pconn->verbs_channel_event = NULL;
189         }
190         if (pconn->cm_id) {
191                 rdma_destroy_id(pconn->cm_id);
192                 pconn->cm_id = NULL;
193         }
194         return 0;
195 }
196
197 static int ibw_conn_destruct(struct ibw_conn *conn)
198 {
199         DEBUG(10, ("ibw_conn_destruct(%u)\n", (uint32_t)conn));
200         
201         /* important here: ctx is a talloc _parent_ */
202         DLIST_REMOVE(conn->ctx->conn_list, conn);
203         return 0;
204 }
205
206 static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
207 {
208         struct ibw_conn *conn;
209         struct ibw_conn_priv *pconn;
210
211         conn = talloc_zero(ctx, struct ibw_conn);
212         assert(conn!=NULL);
213         talloc_set_destructor(conn, ibw_conn_destruct);
214
215         pconn = talloc_zero(ctx, struct ibw_conn_priv);
216         assert(pconn!=NULL);
217         talloc_set_destructor(pconn, ibw_conn_priv_destruct);
218
219         conn->ctx = ctx;
220
221         DLIST_ADD(ctx->conn_list, conn);
222
223         return conn;
224 }
225
226 static int ibw_setup_cq_qp(struct ibw_conn *conn)
227 {
228         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
229         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
230         struct ibv_qp_init_attr init_attr;
231         int rc;
232
233         DEBUG(10, ("ibw_setup_cq_qp(cmid: %u)\n", (uint32_t)pconn->cm_id));
234
235         /* init mr */
236         if (ibw_init_memory(conn))
237                 return -1;
238
239         /* init verbs */
240         pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs);
241         if (!pconn->verbs_channel) {
242                 sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
243                 return -1;
244         }
245         DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
246
247         pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
248                 pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
249
250         /* init cq */
251         pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
252                 pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
253                 conn, pconn->verbs_channel, 0);
254         if (pconn->cq==NULL) {
255                 sprintf(ibw_lasterr, "ibv_create_cq failed\n");
256                 return -1;
257         }
258
259         rc = ibv_req_notify_cq(pconn->cq, 0);
260         if (rc) {
261                 sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
262                 return rc;
263         }
264
265         /* init qp */
266         memset(&init_attr, 0, sizeof(init_attr));
267         init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
268         init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
269         init_attr.cap.max_recv_sge = 1;
270         init_attr.cap.max_send_sge = 1;
271         init_attr.qp_type = IBV_QPT_RC;
272         init_attr.send_cq = pconn->cq;
273         init_attr.recv_cq = pconn->cq;
274
275         rc = rdma_create_qp(pconn->cm_id, pctx->pd, &init_attr);
276         if (rc) {
277                 sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc);
278                 return rc;
279         }
280         /* elase result is in pconn->cm_id->qp */
281
282         return ibw_fill_cq(conn);
283 }
284
285 static int ibw_refill_cq_recv(struct ibw_conn *conn)
286 {
287         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
288         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
289         int     rc;
290         struct ibv_sge list = {
291                 .addr   = (uintptr_t) NULL,
292                 .length = pctx->opts.recv_bufsize,
293                 .lkey   = pconn->mr_recv->lkey
294         };
295         struct ibv_recv_wr wr = {
296                 .wr_id      = 0,
297                 .sg_list    = &list,
298                 .num_sge    = 1,
299         };
300         struct ibv_recv_wr *bad_wr;
301
302         DEBUG(10, ("ibw_refill_cq_recv(cmid: %u)\n", (uint32_t)pconn->cm_id));
303
304         list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
305         wr.wr_id = pconn->recv_index;
306         pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
307
308         rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
309         if (rc) {
310                 sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
311                 DEBUG(0, (ibw_lasterr));
312                 return -2;
313         }
314
315         return 0;
316 }
317
318 static int ibw_fill_cq(struct ibw_conn *conn)
319 {
320         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
321         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
322         int     i, rc;
323         struct ibv_sge list = {
324                 .addr   = (uintptr_t) NULL,
325                 .length = pctx->opts.recv_bufsize,
326                 .lkey   = pconn->mr_recv->lkey
327         };
328         struct ibv_recv_wr wr = {
329                 .wr_id      = 0,
330                 .sg_list    = &list,
331                 .num_sge    = 1,
332         };
333         struct ibv_recv_wr *bad_wr;
334
335         DEBUG(10, ("ibw_fill_cq(cmid: %u)\n", (uint32_t)pconn->cm_id));
336
337         for(i = pctx->opts.max_recv_wr; i!=0; i--) {
338                 list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
339                 wr.wr_id = pconn->recv_index;
340                 pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
341
342                 rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
343                 if (rc) {
344                         sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
345                         DEBUG(0, (ibw_lasterr));
346                         return -2;
347                 }
348         }
349
350         return 0;
351 }
352
353 static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
354 {
355         struct rdma_conn_param conn_param;
356         int     rc;
357
358         DEBUG(10, ("ibw_manage_connect(cmid: %u)", (uint32_t)cma_id));
359         rc = ibw_setup_cq_qp(conn);
360         if (rc)
361                 return -1;
362
363         /* cm connect */
364         memset(&conn_param, 0, sizeof conn_param);
365         conn_param.responder_resources = 1;
366         conn_param.initiator_depth = 1;
367         conn_param.retry_count = 10;
368
369         rc = rdma_connect(cma_id, &conn_param);
370         if (rc)
371                 sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
372
373         return rc;
374 }
375
376 static void ibw_event_handler_cm(struct event_context *ev,
377         struct fd_event *fde, uint16_t flags, void *private_data)
378 {
379         int     rc;
380         struct ibw_ctx  *ctx = talloc_get_type(private_data, struct ibw_ctx);
381         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
382         struct ibw_conn *conn = NULL;
383         struct ibw_conn_priv *pconn = NULL;
384         struct rdma_cm_id *cma_id = NULL;
385         struct rdma_cm_event *event = NULL;
386
387         assert(ctx!=NULL);
388
389         rc = rdma_get_cm_event(pctx->cm_channel, &event);
390         if (rc) {
391                 ctx->state = IBWS_ERROR;
392                 sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
393                 goto error;
394         }
395         cma_id = event->id;
396
397         DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event->event, cma_id,
398                   (cma_id == pctx->cm_id) ? "parent" : "child"));
399
400         switch (event->event) {
401         case RDMA_CM_EVENT_ADDR_RESOLVED:
402                 /* continuing from ibw_connect ... */
403                 rc = rdma_resolve_route(cma_id, 2000);
404                 if (rc) {
405                         sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
406                         goto error;
407                 }
408                 /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
409                 break;
410
411         case RDMA_CM_EVENT_ROUTE_RESOLVED:
412                 /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
413                 assert(cma_id->context!=NULL);
414                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
415
416                 rc = ibw_manage_connect(conn, cma_id);
417                 if (rc)
418                         goto error;
419
420                 break;
421
422         case RDMA_CM_EVENT_CONNECT_REQUEST:
423                 ctx->state = IBWS_CONNECT_REQUEST;
424                 conn = ibw_conn_new(ctx);
425                 pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
426                 pconn->cm_id = cma_id; /* !!! event will be freed but id not */
427                 cma_id->context = (void *)conn;
428                 DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id));
429
430                 conn->state = IBWC_INIT;
431                 pctx->connstate_func(ctx, conn);
432
433                 /* continued at ibw_accept when invoked by the func above */
434                 if (!pconn->is_accepted) {
435                         talloc_free(conn);
436                         DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
437                 } else {
438                         if (ibw_setup_cq_qp(conn))
439                                 goto error;
440                 }
441
442                 /* TODO: clarify whether if it's needed by upper layer: */
443                 ctx->state = IBWS_READY;
444                 pctx->connstate_func(ctx, NULL);
445
446                 /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
447                 break;
448
449         case RDMA_CM_EVENT_ESTABLISHED:
450                 /* expected after ibw_accept and ibw_connect[not directly] */
451                 DEBUG(0, ("ESTABLISHED (conn: %u)\n", (unsigned int)cma_id->context));
452                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
453                 assert(conn!=NULL); /* important assumption */
454
455                 /* client conn is up */
456                 conn->state = IBWC_CONNECTED;
457
458                 /* both ctx and conn have changed */
459                 pctx->connstate_func(ctx, conn);
460                 break;
461
462         case RDMA_CM_EVENT_ADDR_ERROR:
463         case RDMA_CM_EVENT_ROUTE_ERROR:
464         case RDMA_CM_EVENT_CONNECT_ERROR:
465         case RDMA_CM_EVENT_UNREACHABLE:
466         case RDMA_CM_EVENT_REJECTED:
467                 sprintf(ibw_lasterr, "cma event %d, error %d\n", event->event, event->status);
468                 goto error;
469
470         case RDMA_CM_EVENT_DISCONNECTED:
471                 if (cma_id!=pctx->cm_id) {
472                         DEBUG(0, ("client DISCONNECT event\n"));
473                         conn = talloc_get_type(cma_id->context, struct ibw_conn);
474                         conn->state = IBWC_DISCONNECTED;
475                         pctx->connstate_func(NULL, conn);
476
477                         talloc_free(conn);
478
479                         /* if we are the last... */
480                         if (ctx->conn_list==NULL)
481                                 rdma_disconnect(pctx->cm_id);
482                 } else {
483                         DEBUG(0, ("server DISCONNECT event\n"));
484                         ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
485                         /* talloc_free(ctx) should be called within or after this func */
486                         pctx->connstate_func(ctx, NULL);
487                 }
488                 break;
489
490         case RDMA_CM_EVENT_DEVICE_REMOVAL:
491                 sprintf(ibw_lasterr, "cma detected device removal!\n");
492                 goto error;
493
494         default:
495                 sprintf(ibw_lasterr, "unknown event %d\n", event->event);
496                 goto error;
497         }
498
499         if ((rc=rdma_ack_cm_event(event))) {
500                 sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
501                 goto error;
502         }
503
504         return;
505 error:
506         DEBUG(0, ("cm event handler: %s", ibw_lasterr));
507         if (cma_id!=pctx->cm_id) {
508                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
509                 if (conn)
510                         conn->state = IBWC_ERROR;
511                 pctx->connstate_func(NULL, conn);
512         } else {
513                 ctx->state = IBWS_ERROR;
514                 pctx->connstate_func(ctx, NULL);
515         }
516 }
517
518 static void ibw_event_handler_verbs(struct event_context *ev,
519         struct fd_event *fde, uint16_t flags, void *private_data)
520 {
521         struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn);
522         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
523         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
524
525         struct ibv_wc wc;
526         int rc;
527         struct ibv_cq *ev_cq;
528         void          *ev_ctx;
529
530         DEBUG(10, ("ibw_event_handler_verbs(%u)\n", (uint32_t)flags));
531
532         /* TODO: check whether if it's good to have more channels here... */
533         rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
534         if (rc) {
535                 sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
536                 goto error;
537         }
538         if (ev_cq != pconn->cq) {
539                 sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
540                         (unsigned int)ev_cq, (unsigned int)pconn->cq);
541                 goto error;
542         }
543         rc = ibv_req_notify_cq(pconn->cq, 0);
544         if (rc) {
545                 sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
546                 goto error;
547         }
548
549         while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
550                 if (wc.status) {
551                         sprintf(ibw_lasterr, "cq completion failed status %d\n",
552                                 wc.status);
553                         goto error;
554                 }
555
556                 switch(wc.opcode) {
557                 case IBV_WC_SEND:
558                         DEBUG(10, ("send completion\n"));
559                         if (ibw_wc_send(conn, &wc))
560                                 goto error;
561                         break;
562
563                 case IBV_WC_RDMA_WRITE:
564                         DEBUG(10, ("rdma write completion\n"));
565                         break;
566         
567                 case IBV_WC_RDMA_READ:
568                         DEBUG(10, ("rdma read completion\n"));
569                         break;
570
571                 case IBV_WC_RECV:
572                         DEBUG(10, ("recv completion\n"));
573                         if (ibw_wc_recv(conn, &wc))
574                                 goto error;
575                         break;
576
577                 default:
578                         sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
579                         goto error;
580                 }
581         }
582         if (rc!=0) {
583                 sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
584                 goto error;
585         }
586
587         return;
588 error:
589         DEBUG(0, (ibw_lasterr));
590         conn->state = IBWC_ERROR;
591         pctx->connstate_func(NULL, conn);
592 }
593
594 static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
595 {
596         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
597         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
598         struct ibw_wr   *p;
599         int     send_index;
600
601         DEBUG(10, ("ibw_wc_send(cmid: %u, wr_id: %u, bl: %u)\n",
602                 (uint32_t)pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
603
604         assert(pconn->cm_id->qp->qp_num==wc->qp_num);
605         assert(wc->wr_id > pctx->opts.max_recv_wr);
606         send_index = wc->wr_id - pctx->opts.max_recv_wr;
607         pconn->wr_sent--;
608
609         if (send_index < pctx->opts.max_send_wr) {
610                 DEBUG(10, ("ibw_wc_send#1 %u", (int)wc->wr_id));
611                 p = pconn->wr_index[send_index];
612                 if (p->msg_large)
613                         ibw_free_mr(&p->msg_large, &p->mr_large);
614                 DLIST_REMOVE(pconn->wr_list_used, p);
615                 DLIST_ADD(pconn->wr_list_avail, p);
616         } else { /* "extra" request - not optimized */
617                 DEBUG(10, ("ibw_wc_send#2 %u", (int)wc->wr_id));
618                 for(p=pconn->extra_sent; p!=NULL; p=p->next)
619                         if (p->wr_id==(int)wc->wr_id)
620                                 break;
621                 if (p==NULL) {
622                         sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id);
623                                 return -1;
624                 }
625                 ibw_free_mr(&p->msg_large, &p->mr_large);
626                 DLIST_REMOVE(pconn->extra_sent, p);
627                 DLIST_ADD(pconn->extra_avail, p);
628         }
629
630         if (pconn->queue) {
631                 char    *buf;
632
633                 DEBUG(10, ("ibw_wc_send#queue %u", (int)wc->wr_id));
634                 p = pconn->queue;
635                 DLIST_REMOVE(pconn->queue, p);
636
637                 buf = (p->msg_large!=NULL) ? p->msg_large : p->msg;
638                 ibw_send(conn, buf, p, ntohl(*(uint32_t *)buf));
639         }
640
641         return 0;
642 }
643
644 static inline int ibw_append_to_part(struct ibw_conn_priv *pconn,
645         struct ibw_part *part, char **pp, uint32_t add_len, int info)
646 {
647         DEBUG(10, ("ibw_append_to_part: cmid=%u, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
648                 (uint32_t)pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info));
649
650         /* allocate more if necessary - it's an "evergrowing" buffer... */
651         if (part->len + add_len > part->bufsize) {
652                 if (part->buf==NULL) {
653                         assert(part->len==0);
654                         part->buf = talloc_size(pconn, add_len);
655                         if (part->buf==NULL) {
656                                 sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
657                                         add_len, info);
658                                 return -1;
659                         }
660                         part->bufsize = add_len;
661                 } else {
662                         part->buf = talloc_realloc_size(pconn,
663                                 part->buf, part->len + add_len);
664                         if (part->buf==NULL) {
665                                 sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
666                                         part->len, add_len, info);
667                                 return -1;
668                         }
669                 }
670                 part->bufsize = part->len + add_len;
671         }
672
673         /* consume pp */
674         memcpy(part->buf + part->len, *pp, add_len);
675         *pp += add_len;
676         part->len += add_len;
677         part->to_read -= add_len;
678
679         return 0;
680 }
681
682 static inline int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
683         struct ibw_part *part, uint32_t threshold)
684 {
685         DEBUG(10, ("ibw_wc_mem_threshold: cmid=%u, (bs=%u, len=%u, tr=%u), thr=%u\n",
686                 (uint32_t)pconn->cm_id, part->bufsize, part->len, part->to_read, threshold));
687
688         if (part->bufsize > threshold) {
689                 DEBUG(3, ("ibw_wc_mem_threshold: cmid=%u, %u > %u\n",
690                         (uint32_t)pconn->cm_id, part->bufsize, threshold));
691                 talloc_free(part->buf);
692                 part->buf = talloc_size(pconn, threshold);
693                 if (part->buf==NULL) {
694                         sprintf(ibw_lasterr, "talloc_size failed\n");
695                         return -1;
696                 }
697                 part->bufsize = threshold;
698         }
699         return 0;
700 }
701
702 static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
703 {
704         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
705         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
706         struct ibw_part *part = &pconn->part;
707         char    *p;
708         uint32_t        remain = wc->byte_len;
709
710         DEBUG(10, ("ibw_wc_recv: cmid=%u, wr_id: %u, bl: %u\n",
711                 (uint32_t)pconn->cm_id, (uint32_t)wc->wr_id, remain));
712
713         assert(pconn->cm_id->qp->qp_num==wc->qp_num);
714         assert((int)wc->wr_id < pctx->opts.max_recv_wr);
715         assert(wc->byte_len <= pctx->opts.recv_bufsize);
716
717         p = pconn->buf_recv + ((int)wc->wr_id * pctx->opts.recv_bufsize);
718
719         while(remain) {
720                 /* here always true: (part->len!=0 && part->to_read!=0) ||
721                         (part->len==0 && part->to_read==0) */
722                 if (part->len) { /* is there a partial msg to be continued? */
723                         int read_len = (part->to_read<=remain) ? part->to_read : remain;
724                         if (ibw_append_to_part(pconn, part, &p, read_len, 421))
725                                 goto error;
726                         remain -= read_len;
727
728                         if (part->len<=sizeof(uint32_t) && part->to_read==0) {
729                                 assert(part->len==sizeof(uint32_t));
730                                 /* set it again now... */
731                                 part->to_read = ntohl(*((uint32_t *)(part->buf)));
732                                 if (part->to_read<sizeof(uint32_t)) {
733                                         sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
734                                         goto error;
735                                 }
736                                 part->to_read -= sizeof(uint32_t); /* it's already read */
737                         }
738
739                         if (part->to_read==0) {
740                                 pctx->receive_func(conn, part->buf, part->len);
741                                 part->len = 0; /* tells not having partial data (any more) */
742                                 if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
743                                         goto error;
744                         }
745                 } else {
746                         if (remain>=sizeof(uint32_t)) {
747                                 uint32_t msglen = ntohl(*(uint32_t *)p);
748                                 if (msglen<sizeof(uint32_t)) {
749                                         sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
750                                         goto error;
751                                 }
752
753                                 /* mostly awaited case: */
754                                 if (msglen<=remain) {
755                                         pctx->receive_func(conn, p, msglen);
756                                         p += msglen;
757                                         remain -= msglen;
758                                 } else {
759                                         part->to_read = msglen;
760                                         /* part->len is already 0 */
761                                         if (ibw_append_to_part(pconn, part, &p, remain, 422))
762                                                 goto error;
763                                         remain = 0; /* to be continued ... */
764                                         /* part->to_read > 0 here */
765                                 }
766                         } else { /* edge case: */
767                                 part->to_read = sizeof(uint32_t);
768                                 /* part->len is already 0 */
769                                 if (ibw_append_to_part(pconn, part, &p, remain, 423))
770                                         goto error;
771                                 remain = 0;
772                                 /* part->to_read > 0 here */
773                         }
774                 }
775         } /* <remain> is always decreased at least by 1 */
776
777         if (ibw_refill_cq_recv(conn))
778                 goto error;
779
780         return 0;
781
782 error:
783         DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
784         return -1;
785 }
786
787 static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
788 {
789         int     i;
790         const char *name, *value;
791
792         DEBUG(10, ("ibw_process_init_attrs: nattr: %d\n", nattr));
793
794         opts->max_send_wr = 256;
795         opts->max_recv_wr = 1024;
796         opts->avg_send_size = 1024;
797         opts->recv_bufsize = 256;
798         opts->recv_threshold = 1 * 1024 * 1024;
799
800         for(i=0; i<nattr; i++) {
801                 name = attr[i].name;
802                 value = attr[i].value;
803
804                 assert(name!=NULL && value!=NULL);
805                 if (strcmp(name, "max_send_wr")==0)
806                         opts->max_send_wr = atoi(value);
807                 else if (strcmp(name, "max_recv_wr")==0)
808                         opts->max_recv_wr = atoi(value);
809                 else if (strcmp(name, "avg_send_size")==0)
810                         opts->avg_send_size = atoi(value);
811                 else if (strcmp(name, "recv_bufsize")==0)
812                         opts->recv_bufsize = atoi(value);
813                 else if (strcmp(name, "recv_threshold")==0)
814                         opts->recv_threshold = atoi(value);
815                 else {
816                         sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
817                         return -1;
818                 }
819         }
820         return 0;
821 }
822
823 struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
824         void *ctx_userdata,
825         ibw_connstate_fn_t ibw_connstate,
826         ibw_receive_fn_t ibw_receive,
827         struct event_context *ectx)
828 {
829         struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
830         struct ibw_ctx_priv *pctx;
831         int     rc;
832
833         DEBUG(10, ("ibw_init(ctx_userdata: %u, ectx: %u)\n",
834                 (uint32_t)ctx_userdata, (uint32_t)ectx));
835
836         /* initialize basic data structures */
837         memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
838
839         assert(ctx!=NULL);
840         ibw_lasterr[0] = '\0';
841         talloc_set_destructor(ctx, ibw_ctx_destruct);
842         ctx->ctx_userdata = ctx_userdata;
843
844         pctx = talloc_zero(ctx, struct ibw_ctx_priv);
845         talloc_set_destructor(pctx, ibw_ctx_priv_destruct);
846         ctx->internal = (void *)pctx;
847         assert(pctx!=NULL);
848
849         pctx->connstate_func = ibw_connstate;
850         pctx->receive_func = ibw_receive;
851
852         pctx->ectx = ectx;
853
854         /* process attributes */
855         if (ibw_process_init_attrs(attr, nattr, &pctx->opts))
856                 goto cleanup;
857
858         /* init cm */
859         pctx->cm_channel = rdma_create_event_channel();
860         if (!pctx->cm_channel) {
861                 sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
862                 goto cleanup;
863         }
864
865         pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,
866                 pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);
867
868         rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP);
869         if (rc) {
870                 rc = errno;
871                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
872                 goto cleanup;
873         }
874         DEBUG(10, ("created cm_id %p\n", pctx->cm_id));
875
876         /* init verbs */
877         pctx->pd = ibv_alloc_pd(pctx->cm_id->verbs);
878         if (!pctx->pd) {
879                 sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
880                 goto cleanup;
881         }
882         DEBUG(10, ("created pd %p\n", pctx->pd));
883
884         pctx->pagesize = sysconf(_SC_PAGESIZE);
885
886         return ctx;
887         /* don't put code here */
888 cleanup:
889         DEBUG(0, (ibw_lasterr));
890
891         if (ctx)
892                 talloc_free(ctx);
893
894         return NULL;
895 }
896
897 int ibw_stop(struct ibw_ctx *ctx)
898 {
899         struct ibw_conn *p;
900
901         DEBUG(10, ("ibw_stop\n"));
902         for(p=ctx->conn_list; p!=NULL; p=p->next) {
903                 if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) {
904                         if (ibw_disconnect(p))
905                                 return -1;
906                 }
907         }
908
909         return 0;
910 }
911
912 int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr)
913 {
914         struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal;
915         int     rc;
916
917         DEBUG(10, ("ibw_bind: addr=%s, port=%u\n",
918                 inet_ntoa(my_addr->sin_addr), my_addr->sin_port));
919         rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);
920         if (rc) {
921                 sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);
922                 DEBUG(0, (ibw_lasterr));
923                 return rc;
924         }
925         DEBUG(10, ("rdma_bind_addr successful\n"));
926
927         return 0;
928 }
929
930 int ibw_listen(struct ibw_ctx *ctx, int backlog)
931 {
932         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
933         int     rc;
934
935         DEBUG(10, ("ibw_listen\n"));
936         rc = rdma_listen(pctx->cm_id, backlog);
937         if (rc) {
938                 sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);
939                 DEBUG(0, (ibw_lasterr));
940                 return rc;
941         }       
942
943         return 0;
944 }
945
946 int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata)
947 {
948         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
949         struct rdma_conn_param  conn_param;
950         int     rc;
951
952         DEBUG(10, ("ibw_accept: cmid=%u\n", (uint32_t)pconn->cm_id));
953         conn->conn_userdata = conn_userdata;
954
955         memset(&conn_param, 0, sizeof(struct rdma_conn_param));
956         conn_param.responder_resources = 1;
957         conn_param.initiator_depth = 1;
958         rc = rdma_accept(pconn->cm_id, &conn_param);
959         if (rc) {
960                 sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);
961                 DEBUG(0, (ibw_lasterr));
962                 return -1;;
963         }
964
965         pconn->is_accepted = 1;
966
967         /* continued at RDMA_CM_EVENT_ESTABLISHED */
968
969         return 0;
970 }
971
972 int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
973 {
974         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
975         struct ibw_conn *conn = NULL;
976         struct ibw_conn_priv *pconn = NULL;
977         int     rc;
978
979         DEBUG(10, ("ibw_connect: cmid=%u, addr=%s, port=%u\n", (uint32_t)pconn->cm_id,
980                 inet_ntoa(serv_addr->sin_addr), serv_addr->sin_port));
981         conn = ibw_conn_new(ctx);
982         conn->conn_userdata = conn_userdata;
983         pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
984
985         rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
986         if (rc) {
987                 rc = errno;
988                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
989                 return rc;
990         }
991
992         rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
993         if (rc) {
994                 sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
995                 DEBUG(0, (ibw_lasterr));
996                 return -1;
997         }
998
999         /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
1000
1001         return 0;
1002 }
1003
1004 int ibw_disconnect(struct ibw_conn *conn)
1005 {
1006         int     rc;
1007         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
1008         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
1009
1010         DEBUG(10, ("ibw_disconnect: cmid=%u\n", (uint32_t)pconn->cm_id));
1011
1012         rc = rdma_disconnect(pctx->cm_id);
1013         if (rc) {
1014                 sprintf(ibw_lasterr, "ibw_disconnect failed with %d", rc);
1015                 DEBUG(0, (ibw_lasterr));
1016                 return rc;
1017         }
1018
1019         /* continued at RDMA_CM_EVENT_DISCONNECTED */
1020
1021         return 0;
1022 }
1023
1024 int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
1025 {
1026         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
1027         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
1028         struct ibw_wr *p = pconn->wr_list_avail;
1029
1030         if (p!=NULL) {
1031                 DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%u, n=%d\n", (uint32_t)pconn->cm_id, n));
1032
1033                 DLIST_REMOVE(pconn->wr_list_avail, p);
1034                 DLIST_ADD(pconn->wr_list_used, p);
1035
1036                 if (n + sizeof(long) <= pctx->opts.avg_send_size) {
1037                         *buf = (void *)(p->msg + sizeof(long));
1038                 } else {
1039                         p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
1040                         if (!p->msg_large) {
1041                                 sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n");
1042                                 goto error;
1043                         }
1044                         *buf = (void *)(p->msg_large + sizeof(long));
1045                 }
1046         } else {
1047                 DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%u, n=%d\n", (uint32_t)pconn->cm_id, n));
1048                 /* not optimized */
1049                 p = pconn->extra_avail;
1050                 if (!p) {
1051                         p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr);
1052                         if (p==NULL) {
1053                                 sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)", pconn->extra_max);
1054                                 goto error;
1055                         }
1056                         p->wr_id = pctx->opts.max_send_wr + pconn->extra_max;
1057                         pconn->extra_max++;
1058                         switch(pconn->extra_max) {
1059                                 case 1: DEBUG(2, ("warning: queue performed\n")); break;
1060                                 case 10: DEBUG(0, ("warning: queue reached 10\n")); break;
1061                                 case 100: DEBUG(0, ("warning: queue reached 100\n")); break;
1062                                 case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break;
1063                                 default: break;
1064                         }
1065                 }
1066                 DLIST_REMOVE(pconn->extra_avail, p);
1067
1068                 p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
1069                 if (!p->msg_large) {
1070                         sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed");
1071                         goto error;
1072                 }
1073                 *buf = (void *)(p->msg_large + sizeof(long));
1074         }
1075
1076         *key = (void *)p;
1077
1078         return 0;
1079 error:
1080         DEBUG(0, ("ibw_alloc_send_buf error: %s\n", ibw_lasterr));
1081         return -1;
1082 }
1083
1084
1085 int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
1086 {
1087         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
1088         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
1089         struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
1090         int     rc;
1091
1092         *((uint32_t *)buf) = htonl(n);
1093
1094         /* can we send it right now? */
1095         if (pconn->wr_sent<=pctx->opts.max_send_wr) {
1096                 struct ibv_sge list = {
1097                         .addr   = (uintptr_t) NULL,
1098                         .length = n,
1099                         .lkey   = 0
1100                 };
1101                 struct ibv_send_wr wr = {
1102                         .wr_id      = p->wr_id + pctx->opts.max_recv_wr,
1103                         .sg_list    = &list,
1104                         .num_sge    = 1,
1105                         .opcode     = IBV_WR_SEND,
1106                         .send_flags = IBV_SEND_SIGNALED,
1107                 };
1108                 struct ibv_send_wr *bad_wr;
1109
1110                 DEBUG(10, ("ibw_wc_send#1(cmid: %u, wrid: %u, n: %d)\n",
1111                         (uint32_t)pconn->cm_id, (uint32_t)wr.wr_id, n));
1112
1113                 if (p->msg_large==NULL) {
1114                         list.lkey = pconn->mr_send->lkey;
1115                         list.addr = (uintptr_t) p->msg;
1116                 } else {
1117                         assert(p->mr_large!=NULL);
1118                         list.lkey = p->mr_large->lkey;
1119                         list.addr = (uintptr_t) p->msg_large;
1120                 }
1121
1122                 rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
1123                 if (rc) {
1124                         sprintf(ibw_lasterr, "ibv_post_send error %d (%d)\n",
1125                                 rc, pconn->wr_sent);
1126                         DEBUG(0, (ibw_lasterr));
1127                 } else
1128                         pconn->wr_sent++;
1129                 return rc;
1130         } /* else put the request into our own queue: */
1131
1132         DEBUG(10, ("ibw_wc_send#2(cmid: %u, n %u)\n", (uint32_t)pconn->cm_id, n));
1133
1134         /* to be sent by ibw_wc_send */
1135         DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */
1136
1137         return 0;
1138 }
1139
1140 const char *ibw_getLastError(void)
1141 {
1142         return ibw_lasterr;
1143 }