fa447e90346292e71110db8f718f467895f7581f
[vlendec/samba-autobuild/.git] / ctdb / ib / ibwrapper.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Wrap Infiniband calls.
4  *
5  * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006
6  *
7  * Major code contributions by Peter Somogyi <psomogyi@gamax.hu>
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include "ibwrapper.h"
25
26 #include <stdlib.h>
27 #include <string.h>
28 #include <stdio.h>
29 #include <errno.h>
30 #include <sys/types.h>
31 #include <netinet/in.h>
32 #include <sys/socket.h>
33 #include <netdb.h>
34 #include <arpa/inet.h>
35
36 #include <rdma/rdma_cma.h>
37 #include "lib/events/events.h"
38
39 #include "ibwrapper_internal.h"
40 #include "lib/util/dlinklist.h"
41
42 #define IBW_LASTERR_BUFSIZE 512
43 static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
44
45 static int ibw_init_memory(ibw_conn *conn)
46 {
47         ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
48         ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
49
50         int     i, num_msg;
51         ibw_wr  *p;
52
53         /* didn't find any reason to split send & recv buffer handling */
54         num_msg = pctx->opts.max_recv_wr + pctx->opts.max_send_wr;
55
56         pconn->buf = memalign(pctx->page_size, pctx->opts.max_msg_size);
57         if (!pconn->buf) {
58                 sprintf(ibw_lasterr, "couldn't allocate work buf\n");
59                 return -1;
60         }
61         pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf, pctx->opts.bufsize, IBV_ACCESS_LOCAL_WRITE);
62         if (!pconn->mr) {
63                 sprintf(ibw_lasterr, "Couldn't allocate mr\n");
64                 return -1;
65         }
66
67         pconn->wr_index = talloc_size(pconn, num_msg * sizeof(ibw_wr *));
68
69         for(i=0; i<num_msg; i++) {
70                 p = pconn->wr_index[i] = talloc_zero(pconn, ibw_wr);
71                 p->msg = pconn->buf + (i * pconn->opts.max_msg_size);
72                 p->wr_id = i;
73
74                 DLIST_ADD(pconn->mr_list_avail, p);
75         }
76
77         return 0;
78 }
79
80 static int ibw_ctx_priv_destruct(void *ptr)
81 {
82         ibw_ctx *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
83         assert(pctx!=NULL);
84
85         if (pctx->verbs_channel) {
86                 ibv_destroy_comp_channel(pctx->verbs_channel);
87                 pctx->verbs_channel = NULL;
88         }
89
90         if (pctx->verbs_channel_event) {
91                 /* TODO: do we have to do this here? */
92                 talloc_free(pctx->verbs_channel_event);
93                 pctx->verbs_channel_event = NULL;
94         }
95
96         if (pctx->pd) {
97                 ibv_dealloc_pd(pctx->pd);
98                 pctx->pd = NULL;
99         }
100
101         /* destroy cm */
102         if (pctx->cm_channel) {
103                 rdma_destroy_event_channel(pctx->cm_channel);
104                 pctx->cm_channel = NULL;
105         }
106         if (pctx->cm_channel_event) {
107                 /* TODO: do we have to do this here? */
108                 talloc_free(pctx->cm_channel_event);
109                 pctx->cm_channel_event = NULL;
110         }
111         if (pctx->cm_id) {
112                 rdma_destroy_id(pctx->cm_id);
113                 pctx->cm_id = NULL;
114         }
115 }
116
117 static int ibw_ctx_destruct(void *ptr)
118 {
119         ibw_ctx *ctx = talloc_get_type(ptr, ibw_ctx);
120         assert(ctx!=NULL);
121
122         return 0;
123 }
124
125 static int ibw_conn_priv_destruct(void *ptr)
126 {
127         ibw_conn *pconn = talloc_get_type(ptr, ibw_conn_priv);
128         assert(pconn!=NULL);
129
130         /* free memory regions */
131         if (pconn->mr) {
132                 ibv_dereg_mr(pconn->mr);
133                 pconn->mr = NULL;
134         }
135         if (pconn->buf) {
136                 free(pconn->buf); /* memalign-ed */
137                 pconn->buf = NULL;
138         }
139
140         /* pconn->wr_index is freed by talloc */
141         /* pconn->wr_index[i] are freed by talloc */
142
143         /* destroy verbs */
144         if (pconn->cm_id->qp) {
145                 ibv_destroy_qp(pconn->qp);
146                 pconn->qp = NULL;
147         }
148         if (pconn->cq) {
149                 ibv_destroy_cq(pconn->cq);
150                 pconn->cq = NULL;
151         }
152         if (pconn->cm_id) {
153                 rdma_destroy_id(pctx->cm_id);
154                 pctx->cm_id = NULL;
155         }
156 }
157
158 static int ibw_conn_destruct(void *ptr)
159 {
160         ibw_conn *conn = talloc_get_type(ptr, ibw_conn);
161         ibw_ctx *ctx;
162
163         assert(conn!=NULL);
164         ctx = ibw_conn->ctx;
165         assert(ctx!=NULL);
166
167         DLIST_REMOVE(ctx->conn_list, conn);
168         return 0;
169 }
170
171 static ibw_conn *ibw_conn_new(ibw_ctx *ctx)
172 {
173         ibw_conn *conn;
174         ibw_conn_priv *pconn;
175
176         conn = talloc_zero(ctx, ibw_conn);
177         assert(conn!=NULL);
178         talloc_set_destructor(conn, ibw_conn_destruct);
179
180         pconn = talloc_zero(ctx, ibw_conn_priv);
181         assert(pconn!=NULL);
182         talloc_set_destructor(pconn, ibw_conn_priv_destruct);
183
184         conn->ctx = ctx;
185
186         DLIST_ADD(ctx->conn_list, conn);
187
188         return conn;
189 }
190
191 static int ibw_setup_cq_qp(ibw_conn *conn)
192 {
193         ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
194         ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
195         struct ibv_qp_init_attr init_attr;
196         int rc;
197
198         if (ibw_init_memory(conn))
199                 return -1;
200
201         pctx->cq = ibv_create_cq(conn->cm_id->verbs, pctx->opts.max_send_wr + pctx->opts.max_recv_wr,
202                 ctx, ctx->verbs_channel, 0);
203         if (cq==NULL) {
204                 sprintf(ibw_lasterr, "ibv_create_cq failed\n");
205                 return -1;
206         }
207
208         rc = ibv_req_notify_cq(pctx->cq, 0);
209         if (rc) {
210                 sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
211                 return rc;
212         }
213
214         memset(&init_attr, 0, sizeof(init_attr));
215         init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
216         init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
217         init_attr.cap.max_recv_sge = 1;
218         init_attr.cap.max_send_sge = 1;
219         init_attr.qp_type = IBV_QPT_RC;
220         init_attr.send_cq = ctx->cq;
221         init_attr.recv_cq = ctx->cq;
222
223         rc = rdma_create_qp(conn->cm_id, pctx->pd, &init_attr);
224         if (rc) {
225                 sprintf(ibw_lasterr, "rdma_create_qp (%d) failed with %d\n", is_server, rc);
226                 return rc;
227         }
228         /* elase result is in pconn->cm_id->qp */
229
230         return rc;
231 }
232
233 static void ibw_refill_cq(ibw_conn *conn)
234 {
235 }
236
237 static int ibw_manage_connect(ibw_conn *conn, struct rdma_cm_id *cma_id)
238 {
239         struct rdma_conn_param conn_param;
240         int     rc;
241
242         rc = ibw_setup_cq_qp(conn);
243         if (rc)
244                 return -1;
245
246         /* cm connect */
247         memset(&conn_param, 0, sizeof conn_param);
248         conn_param.responder_resources = 1;
249         conn_param.initiator_depth = 1;
250         conn_param.retry_count = 10;
251
252         rc = rdma_connect(cma_id, &conn_param);
253         if (rc)
254                 sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
255
256         return rc;
257 }
258
259 static void ibw_event_handler_cm(struct event_context *ev,
260         struct fd_event *fde, uint16_t flags, void *private_data)
261 {
262         int     rc;
263         ibw_ctx *ctx = talloc_get_type(private_data, ibw_ctx);
264         ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
265         ibw_conn *conn = NULL;
266         ibw_conn_priv *pconn = NULL;
267         struct rdma_cm_id *cma_id = NULL;
268         struct rdma_cm_event *event = NULL;
269         int     error = 0;
270
271         assert(ctx!=NULL);
272
273         rc = rdma_get_cm_event(pctx->cm_channel, &event);
274         if (rc) {
275                 ctx->state = IBWS_ERROR;
276                 sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
277                 DEBUG(0, ibw_lasterr);
278                 return;
279         }
280         cma_id = event->id;
281
282         DEBUG(10, "cma_event type %d cma_id %p (%s)\n", event->event, id,
283                   (cma_id == ctx->cm_id) ? "parent" : "child");
284
285         switch (event->event) {
286         case RDMA_CM_EVENT_ADDR_RESOLVED:
287                 /* continuing from ibw_connect ... */
288                 assert(pctx->state==IWINT_INIT);
289                 pctx->state = IWINT_ADDR_RESOLVED;
290                 rc = rdma_resolve_route(cma_id, 2000);
291                 if (rc) {
292                         ctx->state = ERROR;
293                         sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
294                         DEBUG(0, ibw_lasterr);
295                 }
296                 /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
297                 break;
298
299         case RDMA_CM_EVENT_ROUTE_RESOLVED:
300                 /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
301                 assert(pctx->state==IWINT_ADDR_RESOLVED);
302                 pctx->state = IWINT_ROUTE_RESOLVED;
303                 conn = talloc_get_type(cma_id->context, ibw_conn);
304                 pconn = talloc_get_type(conn->internal, ibw_conn_priv);
305
306                 rc = ibw_manage_connect(conn, cma_id);
307                 if (rc)
308                         error = 1;
309
310                 break;
311
312         case RDMA_CM_EVENT_CONNECT_REQUEST:
313                 ctx->state = IBWS_CONNECT_REQUEST;
314                 conn = ibw_conn_new(ctx);
315                 pconn = talloc_get_type(conn->internal, ibw_conn_priv);
316                 pconn->cm_id = cma_id; /* !!! event will be freed but id not */
317                 cma_id->context = (void *)conn;
318                 DEBUG(10, "pconn->cm_id %p\n", pconn->cm_id);
319
320                 conn->state = IBWC_INIT;
321
322                 pctx->connstate_func(ctx, conn);
323
324                 /* continued at ibw_accept when invoked by the func above */
325                 if (!pconn->is_accepted) {
326                         talloc_free(conn);
327                         DEBUG(10, "pconn->cm_id %p wasn't accepted\n", pconn->cm_id);
328                 } else {
329                         if (ibw_setup_cq_qp(ctx, conn))
330                                 error = 1;
331                 }
332
333                 /* TODO: clarify whether if it's needed by upper layer: */
334                 ctx->state = IBWS_READY;
335                 pctx->connstate_func(ctx, NULL);
336
337                 /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
338                 break;
339
340         case RDMA_CM_EVENT_ESTABLISHED:
341                 /* expected after ibw_accept and ibw_connect[not directly] */
342                 DEBUG(0, "ESTABLISHED (conn: %u)\n", cma_id->context);
343                 conn = talloc_get_type(cma_id->context, ibw_conn);
344                 assert(conn!=NULL); /* important assumption */
345                 pconn = talloc_get_type(conn->internal, ibw_conn_priv);
346
347                 /* client conn is up */
348                 conn->state = IBWC_CONNECTED;
349
350                 /* both ctx and conn have changed */
351                 pctx->connstate_func(ctx, conn);
352                 break;
353
354         case RDMA_CM_EVENT_ADDR_ERROR:
355         case RDMA_CM_EVENT_ROUTE_ERROR:
356         case RDMA_CM_EVENT_CONNECT_ERROR:
357         case RDMA_CM_EVENT_UNREACHABLE:
358         case RDMA_CM_EVENT_REJECTED:
359                 DEBUG(0, "cma event %d, error %d\n", event->event, event->status);
360                 error = 1;
361                 break;
362
363         case RDMA_CM_EVENT_DISCONNECTED:
364                 if (cma_id!=ctx->cm_id) {
365                         DEBUG(0, "client DISCONNECT event\n");
366                         conn = talloc_get_type(cma_id->context, ibw_conn);
367                         conn->state = IBWC_DISCONNECTED;
368                         pctx->connstate_func(NULL, conn);
369
370                         talloc_free(conn);
371
372                         if (ctx->conn_list==NULL)
373                                 rdma_disconnect(ctx->cm_id);
374                 } else {
375                         DEBUG(0, "server DISCONNECT event\n");
376                         ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
377                         /* talloc_free(ctx) should be called within or after this func */
378                         pctx->connstate_func(ctx, NULL);
379                 }
380                 break;
381
382         case RDMA_CM_EVENT_DEVICE_REMOVAL:
383                 DEBUG(0, "cma detected device removal!\n");
384                 error = 1;
385                 break;
386
387         default:
388                 DEBUG(0, "unknown event %d\n", event->event);
389                 error = 1;
390                 break;
391         }
392
393         if (error) {
394                 DEBUG(0, ibw_lasterr);
395                 if (cma_id!=ctx->cm_id) {
396                         conn = talloc_get_type(cma_id->context, ibw_conn);
397                         conn->state = IBWC_ERROR;
398                         pctx->connstate_func(NULL, conn);
399                 } else {
400                         ctx->state = IBWS_ERROR;
401                         pctx->connstate_func(ctx, NULL);
402                 }
403         }
404
405         if ((rc=rdma_ack_cm_event(event))) {
406                 sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n");
407                 DEBUG(0, ibw_lasterr, rc);
408         }
409 }
410
411 static void ibw_event_handler_verbs(struct event_context *ev,
412         struct fd_event *fde, uint16_t flags, void *private_data)
413 {
414         ibw_ctx *ctx = talloc_get_type(private_data, ibw_ctx);
415         ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
416
417
418 }
419
420 static int ibw_process_init_attrs(ibw_initattr *attr, int nattr, ibw_opts *opts)
421 {
422         int     i, mtu;
423         char *name, *value;
424
425         opts->max_send_wr = 256;
426         opts->max_recv_wr = 1024;
427         opts->max_msg_size = 1024;
428
429         for(i=0; i<nattr; i++) {
430                 name = attr[i].name;
431                 value = attr[i].value;
432
433                 assert(name!=NULL && value!=NULL);
434                 if (strcmp(name, "max_send_wr")==0)
435                         opts->max_send_wr = atoi(value);
436                 else if (strcmp(name, "max_recv_wr")==0)
437                         opts->max_recv_wr = atoi(value);
438                 else if (strcmp(name, "max_msg_size")==0)
439                         opts->bufsize = atoi(value);
440                 else {
441                         sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
442                         return -1;
443                 }
444         }
445         return 0;
446 }
447
448 ibw_ctx *ibw_init(ibw_initattr *attr, int nattr,
449         void *ctx_userdata,
450         ibw_connstate_fn_t ibw_connstate,
451         ibw_receive_fn_t ibw_receive,
452         event_content *ectx)
453 {
454         ibw_ctx *ctx = talloc_zero(NULL, ibw_ctx);
455         ibw_ctx_priv *pctx;
456         int     rc;
457
458         /* initialize basic data structures */
459         memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
460
461         assert(ctx!=NULL);
462         ibw_lasterr[0] = '\0';
463         talloc_set_destructor(ctx, ibw_ctx_destruct);
464         ctx->userdata = userdata;
465
466         pctx = talloc_zero(ctx, ibw_ctx_priv);
467         talloc_set_destructor(pctx, ibw_ctx_priv_destruct);
468         ctx->internal = (void *)pctx;
469         assert(pctx!=NULL);
470
471         pctx->connstate_func = ibw_connstate;
472         pctx->receive_func = ibw_receive;
473
474         pctx->ectx = ectx;
475
476         /* process attributes */
477         if (ibw_process_init_attrs(attr, nattr, pctx->opts))
478                 goto cleanup;
479
480         /* init cm */
481         pctx->cm_channel = rdma_create_event_channel();
482         if (!pctx->cm_channel) {
483                 sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
484                 goto cleanup;
485         }
486
487         pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,
488                 pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);
489
490         rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, cb, RDMA_PS_TCP);
491         if (rc) {
492                 rc = errno;
493                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
494                 goto cleanup;
495         }
496         DEBUG(10, "created cm_id %p\n", pctx->cm_id);
497
498         /* init verbs */
499         pctx->pd = ibv_alloc_pd(pctx->cmid->verbs);
500         if (!pctx->pd) {
501                 sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
502                 goto cleanup;
503         }
504         DEBUG(10, "created pd %p\n", pctx->pd);
505
506         pctx->verbs_channel = ibv_create_comp_channel(cm_id->verbs);
507         if (!pctx->verbs_channel) {
508                 sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
509                 goto cleanup;
510         }
511         DEBUG(10, "created channel %p\n", pctx->channel);
512
513         pctx->verbs_channel_event = event_add_fd(pctx->ectx, pctx,
514                 pctx->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, ctx);
515
516         pctx->pagesize = sysconf(_SC_PAGESIZE);
517
518         return ctx;
519         /* don't put code here */
520 cleanup:
521         DEBUG(0, ibw_lasterr);
522
523         if (ctx)
524                 talloc_free(ctx);
525
526         return NULL;
527 }
528
529 int ibw_stop(ibw_ctx *ctx)
530 {
531         ibw_ctx_priv *pctx = (ibw_ctx_priv *)ctx->internal;
532
533 }
534
535 int ibw_bind(ibw_ctx *ctx, struct sockaddr_in *my_addr)
536 {
537         ibw_ctx_priv *pctx = (ibw_ctx_priv *)ctx->internal;
538         int     rc;
539
540         rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);
541         if (rc) {
542                 sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);
543                 DEBUG(0, ibw_lasterr);
544                 return rc;
545         }
546         DEBUG(10, "rdma_bind_addr successful\n");
547
548         return 0;
549 }
550
551 int ibw_listen(ibw_ctx *ctx, int backlog)
552 {
553         ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
554         int     rc;
555
556         DEBUG_LOG("rdma_listen...\n");
557         rc = rdma_listen(pctx->cm_id, backlog);
558         if (rc) {
559                 sprintf(ibw_lasterr, "rdma_listen failed: %d\n", ret);
560                 DEBUG(0, ibw_lasterr);
561                 return rc;
562         }       
563
564         return 0;
565 }
566
567 int ibw_accept(ibw_ctx *ctx, ibw_conn *conn, void *conn_userdata)
568 {
569         ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
570         ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
571         struct rdma_conn_param  conn_param;
572
573         conn->conn_userdata = conn_userdata;
574
575         memset(&conn_param, 0, sizeof(struct rdma_conn_param));
576         conn_param.responder_resources = 1;
577         conn_param.initiator_depth = 1;
578         rc = rdma_accept(pconn->cm_id, &conn_param);
579         if (rc) {
580                 sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);
581                 DEBUG(0, ibw_lasterr);
582                 return -1;;
583         }
584
585         pconn->is_accepted = 1;
586
587         /* continued at RDMA_CM_EVENT_ESTABLISHED */
588
589         return 0;
590 }
591
592 int ibw_connect(ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
593 {
594         ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
595         ibw_conn *conn = NULL;
596         int     rc;
597
598         conn = ibw_conn_new(ctx);
599         conn->conn_userdata = conn_userdata;
600         pconn = talloc_get_type(conn->internal, ibw_conn_priv);
601
602         rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
603         if (rc) {
604                 rc = errno;
605                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
606                 return rc;
607         }
608
609         assert(ctx->state==IBWS_READY);
610
611         rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
612         if (rc) {
613                 sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
614                 DEBUG(0, ibw_lasterr);
615                 return -1;
616         }
617
618         /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
619
620         return 0;
621 }
622
623 void ibw_disconnect(ibw_conn *conn)
624 {
625         ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
626         ibw_ctx *ctx = conn->ctx;
627         ibw_ctx_priv *pctx = talloc_get_type(ctx->internal);
628
629         rdma_disconnect(pctx->cm_id);
630
631         /* continued at RDMA_CM_EVENT_DISCONNECTED */
632
633         return 0;
634 }
635
636 int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int *maxsize)
637 {
638         ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
639         ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
640         ibw_wr *p = pctx->wr_list_avail;
641
642         if (p==NULL) {
643                 sprintf(ibw_last_err, "insufficient wr chunks\n");
644                 return -1;
645         }
646
647         *maxsize = pctx->opts.max_msg_size;
648
649         DLIST_REMOVE(pctx->wr_list_avail, p);
650         DLIST_ADD(pctx->wr_list_used, p);
651
652         *buf = (void *)p->msg;
653         *key = (void *)p;
654
655         return pctx->buf;
656 }
657
658 int ibw_send(ibw_conn *conn, void *buf, void *key, int n)
659 {
660         ibw_ctx_priv pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
661         ibw_wr *p = talloc_get_type(key, ibw_wr);
662         struct ibv_sge list = {
663                 .addr   = (uintptr_t) p->msg,
664                 .length = n,
665                 .lkey   = pctx->mr->lkey
666         };
667         struct ibv_send_wr wr = {
668                 .wr_id      = p->wr_id,
669                 .sg_list    = &list,
670                 .num_sge    = 1,
671                 .opcode     = IBV_WR_SEND,
672                 .send_flags = IBV_SEND_SIGNALED,
673         };
674         struct ibv_send_wr *bad_wr;
675
676         assert(p->msg==(char *)buf);
677
678         p->conn = conn; /* set it only now */
679
680         return ibv_post_send(conn->qp, &wr, &bad_wr);
681 }
682
683 const char *ibw_getLastError()
684 {
685         return ibw_lasterr;
686 }