xprtrdma: Cull dprintk() call sites
net/sunrpc/xprtrdma/backchannel.c
// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

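/* Pre-allocate 2 * @count rpcrdma_reqs for backchannel use. Each
 * embedded rpc_rqst is parked on the transport's bc_pa_list, and each
 * req gets a send buffer sized to the connection's inline threshold
 * (capped at one page for the XDR buffer).
 */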
static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	unsigned int i;

	for (i = 0; i < (count << 1); i++) {
		struct rpcrdma_regbuf *rb;
		size_t size;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req))
			return PTR_ERR(req);
		rqst = &req->rl_slot;

		rqst->rq_xprt = xprt;
		INIT_LIST_HEAD(&rqst->rq_bc_list);
		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
		spin_lock(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock(&xprt->bc_pa_lock);

		size = r_xprt->rx_data.inline_rsize;
		rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
		if (IS_ERR(rb))
			goto out_fail;
		req->rl_sendbuf = rb;
		xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
			     min_t(size_t, size, PAGE_SIZE));
	}
	return 0;

out_fail:
	rpcrdma_req_destroy(req);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
		goto out_err;

	rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
	if (rc)
		goto out_free;

	r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
	trace_xprtrdma_cb_setup(r_xprt, reqs);
	return 0;

out_free:
	xprt_rdma_bc_destroy(xprt, reqs);

out_err:
	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
	int ret;

	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
	if (ret < 0)
		return ret;
	return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	size_t maxmsg;

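	/* A backchannel message must fit inline in a single Send or
	 * Receive: take the smaller of the two inline thresholds, cap
	 * it at one page, and leave room for the fixed-size transport
	 * header.
	 */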
	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
	return maxmsg - RPCRDMA_HDRLEN_MIN;
}

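/* Build the transport header for a backchannel reply: XID, RPC/RDMA
 * version, the credit value granted to the server, and an RDMA_MSG
 * procedure with three empty chunk lists (seven XDR words, 28 bytes).
 * Then prepare the Send SGEs for the header and the reply body.
 */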
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	__be32 *p;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	p = xdr_reserve_space(&req->rl_stream, 28);
	if (unlikely(!p))
		return -EIO;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

	if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
				      &rqst->rq_snd_buf, rpcrdma_noch))
		return -EIO;

	trace_xprtrdma_cb_reply(rqst);
	return 0;
}

/**
 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EIO if a permanent error occurred and the request was not
 *		sent. Do not try to send this message again.
 */
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	int rc;

	if (!xprt_connected(xprt))
		return -ENOTCONN;

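	/* Sending the reply consumes a slot in the transport's
	 * congestion window; bail out if one cannot be reserved
	 * right now.
	 */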
	if (!xprt_request_get_cong(xprt, rqst))
		return -EBADSLT;

	rc = rpcrdma_bc_marshal_reply(rqst);
	if (rc < 0)
		goto failed_marshal;

	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
		goto drop_connection;
	return 0;

failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_rdma_close(xprt);
	return -ENOTCONN;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpc_rqst *rqst, *tmp;

	spin_lock(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock(&xprt->bc_pa_lock);

		rpcrdma_req_destroy(rpcr_to_rdmar(rqst));

		spin_lock(&xprt->bc_pa_lock);
	}
	spin_unlock(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpc_xprt *xprt = rqst->rq_xprt;

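	/* The reply has been sent: return the Receive buffer that
	 * carried the callback request to the buffer pool, then make
	 * this rqst available for the next backchannel call.
	 */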
	rpcrdma_recv_buffer_put(req->rl_reply);
	req->rl_reply = NULL;

	spin_lock(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct svc_serv *bc_serv;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct xdr_buf *buf;
	size_t size;
	__be32 *p;

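	/* rep->rr_stream is already positioned past the transport
	 * header. xdr_inline_decode(.., 0) returns the current decode
	 * position (the call's XID) without consuming any bytes; the
	 * remainder of the stream is the complete RPC call message.
	 */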
	p = xdr_inline_decode(&rep->rr_stream, 0);
	size = xdr_stream_remaining(&rep->rr_stream);

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
	pr_info("RPC:       %s: callback XID %08x, length=%zu\n",
		__func__, be32_to_cpup(p), size);
	pr_info("RPC:       %s: %*ph\n", __func__, (int)size, p);
#endif

	/* Grab a free bc rqst */
	spin_lock(&xprt->bc_pa_lock);
	if (list_empty(&xprt->bc_pa_list)) {
		spin_unlock(&xprt->bc_pa_lock);
		goto out_overflow;
	}
	rqst = list_first_entry(&xprt->bc_pa_list,
				struct rpc_rqst, rq_bc_pa_list);
	list_del(&rqst->rq_bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);

	/* Prepare rqst */
	rqst->rq_reply_bytes_recvd = 0;
	rqst->rq_bytes_sent = 0;
	rqst->rq_xid = *p;

	rqst->rq_private_buf.len = size;

	buf = &rqst->rq_rcv_buf;
	memset(buf, 0, sizeof(*buf));
	buf->head[0].iov_base = p;
	buf->head[0].iov_len = size;
	buf->len = size;

	/* The receive buffer has to be hooked to the rpcrdma_req
	 * so that it is not released while the req is pointing
	 * to its buffer, and so that it can be reposted after
	 * the Upper Layer is done decoding it.
	 */
	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;
	trace_xprtrdma_cb_call(rqst);

	/* Queue rqst for ULP's callback service */
	bc_serv = xprt->bc_serv;
	spin_lock(&bc_serv->sv_cb_lock);
	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
	spin_unlock(&bc_serv->sv_cb_lock);

	wake_up(&bc_serv->sv_cb_waitq);

	r_xprt->rx_stats.bcall_count++;
	return;

out_overflow:
	pr_warn("RPC/RDMA backchannel overflow\n");
	xprt_force_disconnect(xprt);
	/* This receive buffer gets reposted automatically
	 * when the connection is re-established.
	 */
	return;
}