/*
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

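/* Return the number of zero pad bytes needed to round @len up to the
 * next XDR quad (4-byte) boundary.
 */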
static u32 xdr_padsize(u32 len)
{
        return (len & 3) ? (4 - (len & 3)) : 0;
}

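/* Map the head, page list, and tail of the outgoing xdr_buf into the
 * req_map vector. sge[0] is left free for the RPC-over-RDMA reply
 * header. When a Write chunk is present, the XDR pad bytes at the
 * start of the tail are skipped so they are not also sent inline.
 */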
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
                     struct xdr_buf *xdr,
                     struct svc_rdma_req_map *vec,
                     bool write_chunk_present)
{
        int sge_no;
        u32 sge_bytes;
        u32 page_bytes;
        u32 page_off;
        int page_no;

        if (xdr->len !=
            (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
                pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
                return -EIO;
        }

        /* Skip the first sge, this is for the RPCRDMA header */
        sge_no = 1;

        /* Head SGE */
        vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
        vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
        sge_no++;

        /* pages SGE */
        page_no = 0;
        page_bytes = xdr->page_len;
        page_off = xdr->page_base;
        while (page_bytes) {
                vec->sge[sge_no].iov_base =
                        page_address(xdr->pages[page_no]) + page_off;
                sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
                page_bytes -= sge_bytes;
                vec->sge[sge_no].iov_len = sge_bytes;

                sge_no++;
                page_no++;
                page_off = 0; /* reset for next time through loop */
        }

        /* Tail SGE */
        if (xdr->tail[0].iov_len) {
                unsigned char *base = xdr->tail[0].iov_base;
                size_t len = xdr->tail[0].iov_len;
                u32 xdr_pad = xdr_padsize(xdr->page_len);

                if (write_chunk_present && xdr_pad) {
                        base += xdr_pad;
                        len -= xdr_pad;
                }

                if (len) {
                        vec->sge[sge_no].iov_base = base;
                        vec->sge[sge_no].iov_len = len;
                        sge_no++;
                }
        }

        dprintk("svcrdma: %s: sge_no %d page_no %d "
                "page_base %u page_len %u head_len %zu tail_len %zu\n",
                __func__, sge_no, page_no, xdr->page_base, xdr->page_len,
                xdr->head[0].iov_len, xdr->tail[0].iov_len);

        vec->count = sge_no;
        return 0;
}

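/* DMA map the part of rq_res that contains @xdr_off, which may fall in
 * the head, the page list, or the tail. At most one page is mapped per
 * call.
 */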
static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
                              struct xdr_buf *xdr,
                              u32 xdr_off, size_t len, int dir)
{
        struct page *page;
        dma_addr_t dma_addr;

        if (xdr_off < xdr->head[0].iov_len) {
                /* This offset is in the head */
                xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
                page = virt_to_page(xdr->head[0].iov_base);
        } else {
                xdr_off -= xdr->head[0].iov_len;
                if (xdr_off < xdr->page_len) {
                        /* This offset is in the page list */
                        xdr_off += xdr->page_base;
                        page = xdr->pages[xdr_off >> PAGE_SHIFT];
                        xdr_off &= ~PAGE_MASK;
                } else {
                        /* This offset is in the tail */
                        xdr_off -= xdr->page_len;
                        xdr_off += (unsigned long)
                                xdr->tail[0].iov_base & ~PAGE_MASK;
                        page = virt_to_page(xdr->tail[0].iov_base);
                }
        }
        dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
                                   min_t(size_t, PAGE_SIZE, len), dir);
        return dma_addr;
}

/* Parse the RPC Call's transport header.
 */
static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
                                      struct rpcrdma_write_array **write,
                                      struct rpcrdma_write_array **reply)
{
        __be32 *p;

        p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];

        /* Read list */
        while (*p++ != xdr_zero)
                p += 5;

        /* Write list */
        if (*p != xdr_zero) {
                *write = (struct rpcrdma_write_array *)p;
                while (*p++ != xdr_zero)
                        p += 1 + be32_to_cpu(*p) * 4;
        } else {
                *write = NULL;
                p++;
        }

        /* Reply chunk */
        if (*p != xdr_zero)
                *reply = (struct rpcrdma_write_array *)p;
        else
                *reply = NULL;
}

/* RPC-over-RDMA Version One private extension: Remote Invalidation.
 * Responder's choice: requester signals it can handle Send With
 * Invalidate, and responder chooses one rkey to invalidate.
 *
 * Find a candidate rkey to invalidate when sending a reply.  Picks the
 * first rkey it finds in the chunk lists.
 *
 * Returns zero if RPC's chunk lists are empty.
 */
static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp,
                                 struct rpcrdma_write_array *wr_ary,
                                 struct rpcrdma_write_array *rp_ary)
{
        struct rpcrdma_read_chunk *rd_ary;
        struct rpcrdma_segment *arg_ch;

        rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0];
        if (rd_ary->rc_discrim != xdr_zero)
                return be32_to_cpu(rd_ary->rc_target.rs_handle);

        if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) {
                arg_ch = &wr_ary->wc_array[0].wc_target;
                return be32_to_cpu(arg_ch->rs_handle);
        }

        if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) {
                arg_ch = &rp_ary->wc_array[0].wc_target;
                return be32_to_cpu(arg_ch->rs_handle);
        }

        return 0;
}

/* Assumptions:
 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
 */
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
                      u32 rmr, u64 to,
                      u32 xdr_off, int write_len,
                      struct svc_rdma_req_map *vec)
{
        struct ib_rdma_wr write_wr;
        struct ib_sge *sge;
        int xdr_sge_no;
        int sge_no;
        int sge_bytes;
        int sge_off;
        int bc;
        struct svc_rdma_op_ctxt *ctxt;

        if (vec->count > RPCSVC_MAXPAGES) {
                pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
                return -EIO;
        }

        dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
                "write_len=%d, vec->sge=%p, vec->count=%lu\n",
                rmr, (unsigned long long)to, xdr_off,
                write_len, vec->sge, vec->count);

        ctxt = svc_rdma_get_context(xprt);
        ctxt->direction = DMA_TO_DEVICE;
        sge = ctxt->sge;

        /* Find the SGE associated with xdr_off */
        for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
             xdr_sge_no++) {
                if (vec->sge[xdr_sge_no].iov_len > bc)
                        break;
                bc -= vec->sge[xdr_sge_no].iov_len;
        }

        sge_off = bc;
        bc = write_len;
        sge_no = 0;

        /* Copy the remaining SGE */
        while (bc != 0) {
                sge_bytes = min_t(size_t,
                          bc, vec->sge[xdr_sge_no].iov_len - sge_off);
                sge[sge_no].length = sge_bytes;
                sge[sge_no].addr =
                        dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(xprt->sc_cm_id->device,
                                         sge[sge_no].addr))
                        goto err;
                svc_rdma_count_mappings(xprt, ctxt);
                sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
                ctxt->count++;
                sge_off = 0;
                sge_no++;
                xdr_sge_no++;
                if (xdr_sge_no > vec->count) {
                        pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
                        goto err;
                }
                bc -= sge_bytes;
                if (sge_no == xprt->sc_max_sge)
                        break;
        }

        /* Prepare WRITE WR */
        memset(&write_wr, 0, sizeof write_wr);
        ctxt->cqe.done = svc_rdma_wc_write;
        write_wr.wr.wr_cqe = &ctxt->cqe;
        write_wr.wr.sg_list = &sge[0];
        write_wr.wr.num_sge = sge_no;
        write_wr.wr.opcode = IB_WR_RDMA_WRITE;
        write_wr.wr.send_flags = IB_SEND_SIGNALED;
        write_wr.rkey = rmr;
        write_wr.remote_addr = to;

        /* Post It */
        atomic_inc(&rdma_stat_write);
        if (svc_rdma_send(xprt, &write_wr.wr))
                goto err;
        return write_len - bc;
 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 0);
        return -EIO;
}

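/* Send the page-list portion of the reply via RDMA Write, using the
 * Write chunks the client provided, and encode the response's Write
 * list to reflect the lengths actually written. Returns the number of
 * bytes in the reply's page list on success, or a negative errno.
 */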
noinline
static int send_write_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *wr_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.page_len;
        int write_len;
        u32 xdr_off;
        int chunk_off;
        int chunk_no;
        int nchunks;
        struct rpcrdma_write_array *res_ary;
        int ret;

        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[1];

        /* Write chunks start at the pagelist */
        nchunks = be32_to_cpu(wr_ary->wc_nchunks);
        for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                struct rpcrdma_segment *arg_ch;
                u64 rs_offset;

                arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));

                /* Prepare the response chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                arg_ch->rs_handle,
                                                arg_ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(arg_ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the req with the number of chunks actually used */
        svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);

        return rqstp->rq_res.page_len;

out_err:
        pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
        return -EIO;
}

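/* Send the whole RPC Reply via RDMA Write into the client's Reply
 * chunk, and encode the response's Reply chunk to reflect the lengths
 * actually written. Returns rqstp->rq_res.len on success, or a
 * negative errno.
 */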
noinline
static int send_reply_chunks(struct svcxprt_rdma *xprt,
                             struct rpcrdma_write_array *rp_ary,
                             struct rpcrdma_msg *rdma_resp,
                             struct svc_rqst *rqstp,
                             struct svc_rdma_req_map *vec)
{
        u32 xfer_len = rqstp->rq_res.len;
        int write_len;
        u32 xdr_off;
        int chunk_no;
        int chunk_off;
        int nchunks;
        struct rpcrdma_segment *ch;
        struct rpcrdma_write_array *res_ary;
        int ret;

        /* XXX: need to fix when reply lists occur with read-list and/or
         * write-list */
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[2];

        /* xdr offset starts at RPC message */
        nchunks = be32_to_cpu(rp_ary->wc_nchunks);
        for (xdr_off = 0, chunk_no = 0;
             xfer_len && chunk_no < nchunks;
             chunk_no++) {
                u64 rs_offset;

                ch = &rp_ary->wc_array[chunk_no].wc_target;
                write_len = min(xfer_len, be32_to_cpu(ch->rs_length));

                /* Prepare the reply chunk given the length actually
                 * written */
                xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
                svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
                                                ch->rs_handle, ch->rs_offset,
                                                write_len);
                chunk_off = 0;
                while (write_len) {
                        ret = send_write(xprt, rqstp,
                                         be32_to_cpu(ch->rs_handle),
                                         rs_offset + chunk_off,
                                         xdr_off,
                                         write_len,
                                         vec);
                        if (ret <= 0)
                                goto out_err;
                        chunk_off += ret;
                        xdr_off += ret;
                        xfer_len -= ret;
                        write_len -= ret;
                }
        }
        /* Update the req with the number of chunks actually used */
        svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);

        return rqstp->rq_res.len;

out_err:
        pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
        return -EIO;
}

/* This function prepares the portion of the RPCRDMA message to be
 * sent in the RDMA_SEND. This function is called after data sent via
 * RDMA has already been transmitted. There are three cases:
 * - The RPCRDMA header, RPC header, and payload are all sent in a
 *   single RDMA_SEND. This is the "inline" case.
 * - The RPCRDMA header and some portion of the RPC header and data
 *   are sent via this RDMA_SEND and another portion of the data is
 *   sent via RDMA.
 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
 *   header and data are all transmitted via RDMA.
 * In all three cases, this function maps the RPCRDMA header, already
 * constructed by the caller in 'rdma_resp', into sge[0], and the
 * 'byte_count' argument indicates how much of the XDR reply to include
 * in this RDMA_SEND. NB: The offset of the payload to send is zero in
 * the XDR.
 */
static int send_reply(struct svcxprt_rdma *rdma,
                      struct svc_rqst *rqstp,
                      struct page *page,
                      struct rpcrdma_msg *rdma_resp,
                      struct svc_rdma_req_map *vec,
                      int byte_count,
                      u32 inv_rkey)
{
        struct svc_rdma_op_ctxt *ctxt;
        struct ib_send_wr send_wr;
        u32 xdr_off;
        int sge_no;
        int sge_bytes;
        int page_no;
        int pages;
        int ret = -EIO;

        /* Prepare the context */
        ctxt = svc_rdma_get_context(rdma);
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->pages[0] = page;
        ctxt->count = 1;

        /* Prepare the SGE for the RPCRDMA Header */
        ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->sge[0].length =
            svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
        ctxt->sge[0].addr =
            ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
                            ctxt->sge[0].length, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
                goto err;
        svc_rdma_count_mappings(rdma, ctxt);

        ctxt->direction = DMA_TO_DEVICE;

        /* Map the payload indicated by 'byte_count' */
        xdr_off = 0;
        for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
                sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
                byte_count -= sge_bytes;
                ctxt->sge[sge_no].addr =
                        dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
                                    sge_bytes, DMA_TO_DEVICE);
                xdr_off += sge_bytes;
                if (ib_dma_mapping_error(rdma->sc_cm_id->device,
                                         ctxt->sge[sge_no].addr))
                        goto err;
                svc_rdma_count_mappings(rdma, ctxt);
                ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
                ctxt->sge[sge_no].length = sge_bytes;
        }
        if (byte_count != 0) {
                pr_err("svcrdma: Could not map %d bytes\n", byte_count);
                goto err;
        }

        /* Save all respages in the ctxt and remove them from the
         * respages array. They are our pages until the I/O
         * completes.
         */
        pages = rqstp->rq_next_page - rqstp->rq_respages;
        for (page_no = 0; page_no < pages; page_no++) {
                ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
                ctxt->count++;
                rqstp->rq_respages[page_no] = NULL;
        }
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        if (sge_no > rdma->sc_max_sge) {
                pr_err("svcrdma: Too many sges (%d)\n", sge_no);
                goto err;
        }
        memset(&send_wr, 0, sizeof send_wr);
        ctxt->cqe.done = svc_rdma_wc_send;
        send_wr.wr_cqe = &ctxt->cqe;
        send_wr.sg_list = ctxt->sge;
        send_wr.num_sge = sge_no;
        if (inv_rkey) {
                send_wr.opcode = IB_WR_SEND_WITH_INV;
                send_wr.ex.invalidate_rkey = inv_rkey;
        } else
                send_wr.opcode = IB_WR_SEND;
        send_wr.send_flags = IB_SEND_SIGNALED;

        ret = svc_rdma_send(rdma, &send_wr);
        if (ret)
                goto err;

        return 0;

 err:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        return ret;
}

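/* The RPC-over-RDMA reply header lives in its own page and is built in
 * svc_rdma_sendto(), so there is nothing to prepare in rq_res here.
 */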
void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

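/* Transport-specific sendto: map the reply, build the RPC-over-RDMA
 * reply header, push Write chunk and Reply chunk payloads with RDMA
 * Write, then transmit the remaining inline bytes with an RDMA Send.
 * On error the transport is marked for close and -ENOTCONN is returned.
 */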
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct rpcrdma_msg *rdma_argp;
        struct rpcrdma_msg *rdma_resp;
        struct rpcrdma_write_array *wr_ary, *rp_ary;
        int ret;
        int inline_bytes;
        struct page *res_page;
        struct svc_rdma_req_map *vec;
        u32 inv_rkey;
        __be32 *p;

        dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);

        /* Get the RDMA request header. The receive logic always
         * places this at the start of page 0.
         */
        rdma_argp = page_address(rqstp->rq_pages[0]);
        svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);

        inv_rkey = 0;
        if (rdma->sc_snd_w_inv)
                inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary);

        /* Build a req vec for the XDR */
        vec = svc_rdma_get_req_map(rdma);
        ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
        if (ret)
                goto err0;
        inline_bytes = rqstp->rq_res.len;

        /* Create the RDMA response header. xprt->xpt_mutex,
         * acquired in svc_send(), serializes RPC replies. The
         * code path below that inserts the credit grant value
         * into each transport header runs only inside this
         * critical section.
         */
        ret = -ENOMEM;
        res_page = alloc_page(GFP_KERNEL);
        if (!res_page)
                goto err0;
        rdma_resp = page_address(res_page);

        p = &rdma_resp->rm_xid;
        *p++ = rdma_argp->rm_xid;
        *p++ = rdma_argp->rm_vers;
        *p++ = rdma->sc_fc_credits;
        *p++ = rp_ary ? rdma_nomsg : rdma_msg;

        /* Start with empty chunks */
        *p++ = xdr_zero;
        *p++ = xdr_zero;
        *p   = xdr_zero;

        /* Send any write-chunk data and build resp write-list */
        if (wr_ary) {
                ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret + xdr_padsize(ret);
        }

        /* Send any reply-list data and update resp reply-list */
        if (rp_ary) {
                ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
                if (ret < 0)
                        goto err1;
                inline_bytes -= ret;
        }

        /* Post a fresh Receive buffer _before_ sending the reply */
        ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
        if (ret)
                goto err1;

        ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
                         inline_bytes, inv_rkey);
        if (ret < 0)
                goto err0;

        svc_rdma_put_req_map(rdma, vec);
        dprintk("svcrdma: send_reply returns %d\n", ret);
        return ret;

 err1:
        put_page(res_page);
 err0:
        svc_rdma_put_req_map(rdma, vec);
        pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
               ret);
        set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
        return -ENOTCONN;
}

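/* Send a transport-level error reply (ERR_VERS or ERR_CHUNK) when the
 * incoming Call's transport header cannot be processed. A Receive
 * buffer is reposted before the error reply is sent.
 */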
void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
                         int status)
{
        struct ib_send_wr err_wr;
        struct page *p;
        struct svc_rdma_op_ctxt *ctxt;
        enum rpcrdma_errcode err;
        __be32 *va;
        int length;
        int ret;

        ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
        if (ret)
                return;

        p = alloc_page(GFP_KERNEL);
        if (!p)
                return;
        va = page_address(p);

        /* XDR encode an error reply */
        err = ERR_CHUNK;
        if (status == -EPROTONOSUPPORT)
                err = ERR_VERS;
        length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);

        ctxt = svc_rdma_get_context(xprt);
        ctxt->direction = DMA_TO_DEVICE;
        ctxt->count = 1;
        ctxt->pages[0] = p;

        /* Prepare SGE for local address */
        ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
        ctxt->sge[0].length = length;
        ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
                                            p, 0, length, DMA_TO_DEVICE);
        if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
                dprintk("svcrdma: Error mapping buffer for protocol error\n");
                svc_rdma_put_context(ctxt, 1);
                return;
        }
        svc_rdma_count_mappings(xprt, ctxt);

        /* Prepare SEND WR */
        memset(&err_wr, 0, sizeof(err_wr));
        ctxt->cqe.done = svc_rdma_wc_send;
        err_wr.wr_cqe = &ctxt->cqe;
        err_wr.sg_list = ctxt->sge;
        err_wr.num_sge = 1;
        err_wr.opcode = IB_WR_SEND;
        err_wr.send_flags = IB_SEND_SIGNALED;

        /* Post It */
        ret = svc_rdma_send(xprt, &err_wr);
        if (ret) {
                dprintk("svcrdma: Error %d posting send for protocol error\n",
                        ret);
                svc_rdma_unmap_dma(ctxt);
                svc_rdma_put_context(ctxt, 1);
        }
}