// SPDX-License-Identifier: GPL-2.0-only
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_transmit().
 *  -   xprt_transmit sends the message and installs the caller on the
 *      transport's wait list. At the same time, if a reply is expected,
 *      it installs a timer that is run after the packet's timeout has
 *      expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that transport. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <linux/sched/mm.h>

#include <trace/events/sunrpc.h>

#include "sunrpc.h"

/*
 * Local variables
 */

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void      xprt_init(struct rpc_xprt *xprt, struct net *net);
static __be32   xprt_alloc_xid(struct rpc_xprt *xprt);
static void      xprt_destroy(struct rpc_xprt *xprt);
static void      xprt_request_init(struct rpc_task *task);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
{
        unsigned long timeout = jiffies + req->rq_timeout;

        if (time_before(timeout, req->rq_majortimeo))
                return timeout;
        return req->rq_majortimeo;
}

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:           transport successfully registered
 * -EEXIST:     transport already registered
 * -EINVAL:     transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = -EEXIST;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                /* don't register the same transport class twice */
                if (t->ident == transport->ident)
                        goto out;
        }

        list_add_tail(&transport->list, &xprt_list);
        printk(KERN_INFO "RPC: Registered %s transport module.\n",
               transport->name);
        result = 0;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);
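
/*
 * Usage sketch (illustrative, not part of the original file): a loadable
 * transport typically registers its xprt_class from its module init hook
 * and unregisters it on exit. All names below are hypothetical.
 *
 *      static struct xprt_class example_transport = {
 *              .list   = LIST_HEAD_INIT(example_transport.list),
 *              .name   = "example",
 *              .owner  = THIS_MODULE,
 *              .ident  = XPRT_TRANSPORT_EXAMPLE,
 *              .setup  = example_xprt_setup,
 *              .netid  = { "example", "" },
 *      };
 *
 *      static int __init example_init(void)
 *      {
 *              return xprt_register_transport(&example_transport);
 *      }
 *
 *      static void __exit example_exit(void)
 *      {
 *              xprt_unregister_transport(&example_transport);
 *      }
 */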

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:           transport successfully unregistered
 * -ENOENT:     transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
        struct xprt_class *t;
        int result;

        result = 0;
        spin_lock(&xprt_list_lock);
        list_for_each_entry(t, &xprt_list, list) {
                if (t == transport) {
                        printk(KERN_INFO
                                "RPC: Unregistered %s transport module.\n",
                                transport->name);
                        list_del_init(&transport->list);
                        goto out;
                }
        }
        result = -ENOENT;

out:
        spin_unlock(&xprt_list_lock);
        return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

static void
xprt_class_release(const struct xprt_class *t)
{
        module_put(t->owner);
}

static const struct xprt_class *
xprt_class_find_by_ident_locked(int ident)
{
        const struct xprt_class *t;

        list_for_each_entry(t, &xprt_list, list) {
                if (t->ident != ident)
                        continue;
                if (!try_module_get(t->owner))
                        continue;
                return t;
        }
        return NULL;
}

static const struct xprt_class *
xprt_class_find_by_ident(int ident)
{
        const struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        t = xprt_class_find_by_ident_locked(ident);
        spin_unlock(&xprt_list_lock);
        return t;
}

static const struct xprt_class *
xprt_class_find_by_netid_locked(const char *netid)
{
        const struct xprt_class *t;
        unsigned int i;

        list_for_each_entry(t, &xprt_list, list) {
                for (i = 0; t->netid[i][0] != '\0'; i++) {
                        if (strcmp(t->netid[i], netid) != 0)
                                continue;
                        if (!try_module_get(t->owner))
                                continue;
                        return t;
                }
        }
        return NULL;
}

static const struct xprt_class *
xprt_class_find_by_netid(const char *netid)
{
        const struct xprt_class *t;

        spin_lock(&xprt_list_lock);
        t = xprt_class_find_by_netid_locked(netid);
        if (!t) {
                spin_unlock(&xprt_list_lock);
                request_module("rpc%s", netid);
                spin_lock(&xprt_list_lock);
                t = xprt_class_find_by_netid_locked(netid);
        }
        spin_unlock(&xprt_list_lock);
        return t;
}

/**
 * xprt_find_transport_ident - convert a netid into a transport identifier
 * @netid: transport to load
 *
 * Returns:
 * > 0:         transport identifier
 * -ENOENT:     transport module not available
 */
int xprt_find_transport_ident(const char *netid)
{
        const struct xprt_class *t;
        int ret;

        t = xprt_class_find_by_netid(netid);
        if (!t)
                return -ENOENT;
        ret = t->ident;
        xprt_class_release(t);
        return ret;
}
EXPORT_SYMBOL_GPL(xprt_find_transport_ident);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
        xprt->snd_task = NULL;
        if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
                smp_mb__before_atomic();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_atomic();
        } else
                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 * @xprt: pointer to the target transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        goto out_locked;
                goto out_sleep;
        }
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        xprt->snd_task = task;

out_locked:
        trace_xprt_reserve_xprt(xprt, task);
        return 1;

out_unlock:
        xprt_clear_locked(xprt);
out_sleep:
        task->tk_status = -EAGAIN;
        if (RPC_IS_SOFT(task))
                rpc_sleep_on_timeout(&xprt->sending, task, NULL,
                                xprt_request_timeout(req));
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static bool
xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
{
        return test_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (!list_empty(&xprt->xmit_queue)) {
                /* Peek at head of queue to see if it can make progress */
                if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
                                        rq_xmit)->rq_cong)
                        return;
        }
        set_bit(XPRT_CWND_WAIT, &xprt->state);
}

static void
xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (!RPCXPRT_CONGESTED(xprt))
                clear_bit(XPRT_CWND_WAIT, &xprt->state);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 * Note that the lock is only granted if we know there are free slots.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        goto out_locked;
                goto out_sleep;
        }
        if (req == NULL) {
                xprt->snd_task = task;
                goto out_locked;
        }
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (!xprt_need_congestion_window_wait(xprt)) {
                xprt->snd_task = task;
                goto out_locked;
        }
out_unlock:
        xprt_clear_locked(xprt);
out_sleep:
        task->tk_status = -EAGAIN;
        if (RPC_IS_SOFT(task))
                rpc_sleep_on_timeout(&xprt->sending, task, NULL,
                                xprt_request_timeout(req));
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
        return 0;
out_locked:
        trace_xprt_reserve_cong(xprt, task);
        return 1;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
                return 1;
        spin_lock(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(xprt, task);
        spin_unlock(&xprt->transport_lock);
        return retval;
}

static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
{
        struct rpc_xprt *xprt = data;

        xprt->snd_task = task;
        return true;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
                                __xprt_lock_write_func, xprt))
                return;
out_unlock:
        xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
                goto out_unlock;
        if (xprt_need_congestion_window_wait(xprt))
                goto out_unlock;
        if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
                                __xprt_lock_write_func, xprt))
                return;
out_unlock:
        xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next(xprt);
        }
        trace_xprt_release_xprt(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt_clear_locked(xprt);
                __xprt_lock_write_next_cong(xprt);
        }
        trace_xprt_release_cong(xprt, task);
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task != task)
                return;
        spin_lock(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (req->rq_cong)
                return 1;
        trace_xprt_get_cong(xprt, req->rq_task);
        if (RPCXPRT_CONGESTED(xprt)) {
                xprt_set_congestion_window_wait(xprt);
                return 0;
        }
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        xprt_test_and_clear_congestion_window_wait(xprt);
        trace_xprt_put_cong(xprt, req->rq_task);
        __xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_request_get_cong - Request congestion control credits
 * @xprt: pointer to transport
 * @req: pointer to RPC request
 *
 * Useful for transports that require congestion control.
 */
bool
xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        bool ret = false;

        if (req->rq_cong)
                return true;
        spin_lock(&xprt->transport_lock);
        ret = __xprt_get_cong(xprt, req) != 0;
        spin_unlock(&xprt->transport_lock);
        return ret;
}
EXPORT_SYMBOL_GPL(xprt_request_get_cong);

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        __xprt_put_cong(req->rq_xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
                __xprt_lock_write_next_cong(xprt);
}

/*
 * Clear the congestion window wait flag and wake up the next
 * entry on xprt->sending
 */
static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
                spin_lock(&xprt->transport_lock);
                __xprt_lock_write_next_cong(xprt);
                spin_unlock(&xprt->transport_lock);
        }
}

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *      -       a reply is received and
 *      -       a full number of requests are outstanding and
 *      -       the congestion window hasn't been updated recently.
 */
void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
{
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long cwnd = xprt->cwnd;

        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
        __xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
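
/*
 * Worked example (illustrative, not from the original source): with
 * RPC_CWNDSCALE = 256, a window of cwnd = 512 (two requests' worth of
 * credits) grows on a successful reply by
 * (256 * 256 + 256) / 512 = 128, i.e. half a request, so two such
 * replies enlarge the window by one full request: additive increase of
 * roughly 1/cwnd. A timeout halves cwnd instead, bounded below by
 * RPC_CWNDSCALE and above by RPC_MAXCWND(), which is the classic AIMD
 * shape.
 */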

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @xprt: transport
 *
 * Note that we only set the timer for the case of RPC_IS_SOFT(), since
 * we don't in general want to force a socket disconnection due to
 * an incomplete RPC call transmission.
 */
void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
{
        set_bit(XPRT_WRITE_SPACE, &xprt->state);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

static bool
xprt_clear_write_space_locked(struct rpc_xprt *xprt)
{
        if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
                __xprt_lock_write_next(xprt);
                dprintk("RPC:       write space: waking waiting task on "
                                "xprt %p\n", xprt);
                return true;
        }
        return false;
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
bool xprt_write_space(struct rpc_xprt *xprt)
{
        bool ret;

        if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
                return false;
        spin_lock(&xprt->transport_lock);
        ret = xprt_clear_write_space_locked(xprt);
        spin_unlock(&xprt->transport_lock);
        return ret;
}
EXPORT_SYMBOL_GPL(xprt_write_space);

static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
{
        s64 delta = ktime_to_ns(ktime_get() - abstime);
        return likely(delta >= 0) ?
                jiffies - nsecs_to_jiffies(delta) :
                jiffies + nsecs_to_jiffies(-delta);
}

static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
{
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        unsigned long majortimeo = req->rq_timeout;

        if (to->to_exponential)
                majortimeo <<= to->to_retries;
        else
                majortimeo += to->to_increment * to->to_retries;
        if (majortimeo > to->to_maxval || majortimeo == 0)
                majortimeo = to->to_maxval;
        return majortimeo;
}
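
/*
 * Example (illustrative figures only): starting from rq_timeout equal to
 * to_initval = 5s worth of jiffies with to_retries = 3, an exponential
 * policy yields a major timeout of 5s << 3 = 40s, while a linear policy
 * yields 5s + 3 * to_increment; either result is clamped to to_maxval.
 */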

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        req->rq_majortimeo += xprt_calc_majortimeo(req);
}

static void xprt_reset_minortimeo(struct rpc_rqst *req)
{
        req->rq_minortimeo += req->rq_timeout;
}

static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
{
        unsigned long time_init;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (likely(xprt && xprt_connected(xprt)))
                time_init = jiffies;
        else
                time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
        req->rq_minortimeo = time_init + req->rq_timeout;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                if (time_before(jiffies, req->rq_minortimeo))
                        return status;
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
        } else {
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock(&xprt->transport_lock);
                status = -ETIMEDOUT;
        }
        xprt_reset_minortimeo(req);

        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}
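
/*
 * Example (illustrative): with to_initval = 5s, to_exponential set and
 * to_maxval = 60s, successive minor timeouts run 5s, 10s, 20s, 40s, 60s,
 * 60s, ... until the major timeout is reached, at which point the
 * timeout is reset to to_initval, the RTT estimator restarts, and
 * -ETIMEDOUT is returned so the caller can decide whether to retransmit.
 */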

static void xprt_autoclose(struct work_struct *work)
{
        struct rpc_xprt *xprt =
                container_of(work, struct rpc_xprt, task_cleanup);
        unsigned int pflags = memalloc_nofs_save();

        trace_xprt_disconnect_auto(xprt);
        clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
        xprt->ops->close(xprt);
        xprt_release_write(xprt, NULL);
        wake_up_bit(&xprt->state, XPRT_LOCKED);
        memalloc_nofs_restore(pflags);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
        trace_xprt_disconnect_done(xprt);
        spin_lock(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        xprt_clear_write_space_locked(xprt);
        xprt_clear_congestion_window_wait_locked(xprt);
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
        trace_xprt_disconnect_force(xprt);

        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock(&xprt->transport_lock);
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
        else if (xprt->snd_task)
                rpc_wake_up_queued_task_set_status(&xprt->pending,
                                xprt->snd_task, -ENOTCONN);
        spin_unlock(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_force_disconnect);

static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
        return READ_ONCE(xprt->connect_cookie);
}

static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
                !xprt_connected(xprt);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
        /* Don't race with the test_bit() in xprt_clear_locked() */
        spin_lock(&xprt->transport_lock);
        if (cookie != xprt->connect_cookie)
                goto out;
        if (test_bit(XPRT_CLOSING, &xprt->state))
                goto out;
        set_bit(XPRT_CLOSE_WAIT, &xprt->state);
        /* Try to schedule an autoclose RPC call */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
                queue_work(xprtiod_workqueue, &xprt->task_cleanup);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
out:
        spin_unlock(&xprt->transport_lock);
}

static bool
xprt_has_timer(const struct rpc_xprt *xprt)
{
        return xprt->idle_timeout != 0;
}

static void
xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
        __must_hold(&xprt->transport_lock)
{
        xprt->last_used = jiffies;
        if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
                mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
}

static void
xprt_init_autodisconnect(struct timer_list *t)
{
        struct rpc_xprt *xprt = from_timer(xprt, t, timer);

        if (!RB_EMPTY_ROOT(&xprt->recv_queue))
                return;
        /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
        xprt->last_used = jiffies;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        queue_work(xprtiod_workqueue, &xprt->task_cleanup);
}

bool xprt_lock_connect(struct rpc_xprt *xprt,
                struct rpc_task *task,
                void *cookie)
{
        bool ret = false;

        spin_lock(&xprt->transport_lock);
        if (!test_bit(XPRT_LOCKED, &xprt->state))
                goto out;
        if (xprt->snd_task != task)
                goto out;
        xprt->snd_task = cookie;
        ret = true;
out:
        spin_unlock(&xprt->transport_lock);
        return ret;
}

void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{
        spin_lock(&xprt->transport_lock);
        if (xprt->snd_task != cookie)
                goto out;
        if (!test_bit(XPRT_LOCKED, &xprt->state))
                goto out;
        xprt->snd_task = NULL;
        xprt->ops->release_xprt(xprt, NULL);
        xprt_schedule_autodisconnect(xprt);
out:
        spin_unlock(&xprt->transport_lock);
        wake_up_bit(&xprt->state, XPRT_LOCKED);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

        trace_xprt_connect(xprt);

        if (!xprt_bound(xprt)) {
                task->tk_status = -EAGAIN;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;

        if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
                trace_xprt_disconnect_cleanup(xprt);
                xprt->ops->close(xprt);
        }

        if (!xprt_connected(xprt)) {
                task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
                rpc_sleep_on_timeout(&xprt->pending, task, NULL,
                                xprt_request_timeout(task->tk_rqstp));

                if (test_bit(XPRT_CLOSING, &xprt->state))
                        return;
                if (xprt_test_and_set_connecting(xprt))
                        return;
                /* Race breaker */
                if (!xprt_connected(xprt)) {
                        xprt->stat.connect_start = jiffies;
                        xprt->ops->connect(xprt, task);
                } else {
                        xprt_clear_connecting(xprt);
                        task->tk_status = 0;
                        rpc_wake_up_queued_task(&xprt->pending, task);
                }
        }
        xprt_release_write(xprt, task);
}

/**
 * xprt_reconnect_delay - compute the wait before scheduling a connect
 * @xprt: transport instance
 *
 */
unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
{
        unsigned long start, now = jiffies;

        start = xprt->stat.connect_start + xprt->reestablish_timeout;
        if (time_after(start, now))
                return start - now;
        return 0;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_delay);

/**
 * xprt_reconnect_backoff - compute the new re-establish timeout
 * @xprt: transport instance
 * @init_to: initial reestablish timeout
 *
 */
void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
{
        xprt->reestablish_timeout <<= 1;
        if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
                xprt->reestablish_timeout = xprt->max_reconnect_timeout;
        if (xprt->reestablish_timeout < init_to)
                xprt->reestablish_timeout = init_to;
}
EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
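
/*
 * Example (illustrative): a transport whose reestablish timeout starts
 * at 3s backs off to 6s, 12s, 24s, ... on successive calls, capped at
 * xprt->max_reconnect_timeout and never dropping below @init_to.
 */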

enum xprt_xid_rb_cmp {
        XID_RB_EQUAL,
        XID_RB_LEFT,
        XID_RB_RIGHT,
};
static enum xprt_xid_rb_cmp
xprt_xid_cmp(__be32 xid1, __be32 xid2)
{
        if (xid1 == xid2)
                return XID_RB_EQUAL;
        if ((__force u32)xid1 < (__force u32)xid2)
                return XID_RB_LEFT;
        return XID_RB_RIGHT;
}

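/*
 * The receive queue is kept as an rb-tree ordered on the (unsigned)
 * value of the XID, so that a reply arriving on the transport can be
 * matched to its request in O(log n) time however many requests are
 * outstanding.
 */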
static struct rpc_rqst *
xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
{
        struct rb_node *n = xprt->recv_queue.rb_node;
        struct rpc_rqst *req;

        while (n != NULL) {
                req = rb_entry(n, struct rpc_rqst, rq_recv);
                switch (xprt_xid_cmp(xid, req->rq_xid)) {
                case XID_RB_LEFT:
                        n = n->rb_left;
                        break;
                case XID_RB_RIGHT:
                        n = n->rb_right;
                        break;
                case XID_RB_EQUAL:
                        return req;
                }
        }
        return NULL;
}

static void
xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
{
        struct rb_node **p = &xprt->recv_queue.rb_node;
        struct rb_node *n = NULL;
        struct rpc_rqst *req;

        while (*p != NULL) {
                n = *p;
                req = rb_entry(n, struct rpc_rqst, rq_recv);
                switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
                case XID_RB_LEFT:
                        p = &n->rb_left;
                        break;
                case XID_RB_RIGHT:
                        p = &n->rb_right;
                        break;
                case XID_RB_EQUAL:
                        WARN_ON_ONCE(new != req);
                        return;
                }
        }
        rb_link_node(&new->rq_recv, n, p);
        rb_insert_color(&new->rq_recv, &xprt->recv_queue);
}

static void
xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        rb_erase(&req->rq_recv, &xprt->recv_queue);
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *entry;

        entry = xprt_request_rb_find(xprt, xid);
        if (entry != NULL) {
                trace_xprt_lookup_rqst(xprt, xid, 0);
                entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
                return entry;
        }

        dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
                        ntohl(xid));
        trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
        xprt->stat.bad_xids++;
        return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

static bool
xprt_is_pinned_rqst(struct rpc_rqst *req)
{
        return atomic_read(&req->rq_pin) != 0;
}

/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
void xprt_pin_rqst(struct rpc_rqst *req)
{
        atomic_inc(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_pin_rqst);

/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
void xprt_unpin_rqst(struct rpc_rqst *req)
{
        if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
                atomic_dec(&req->rq_pin);
                return;
        }
        if (atomic_dec_and_test(&req->rq_pin))
                wake_up_var(&req->rq_pin);
}
EXPORT_SYMBOL_GPL(xprt_unpin_rqst);

static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
{
        wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
}

static bool
xprt_request_data_received(struct rpc_task *task)
{
        return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
                READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
}

static bool
xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
{
        return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
                READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
}

/**
 * xprt_request_enqueue_receive - Add a request to the receive queue
 * @task: RPC task
 *
 */
void
xprt_request_enqueue_receive(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (!xprt_request_need_enqueue_receive(task, req))
                return;

        xprt_request_prepare(task->tk_rqstp);
        spin_lock(&xprt->queue_lock);

        /* Update the softirq receive buffer */
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                        sizeof(req->rq_private_buf));

        /* Add request to the receive list */
        xprt_request_rb_insert(xprt, req);
        set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
        spin_unlock(&xprt->queue_lock);

        /* Turn off autodisconnect */
        del_singleshot_timer_sync(&xprt->timer);
}

/**
 * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
 * @task: RPC task
 *
 * Caller must hold xprt->queue_lock.
 */
static void
xprt_request_dequeue_receive_locked(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
                xprt_request_rb_remove(req->rq_xprt, req);
}

/**
 * xprt_update_rtt - Update RPC RTT statistics
 * @task: RPC request that recently completed
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_update_rtt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        unsigned int timer = task->tk_msg.rpc_proc->p_timer;
        long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));

        if (timer) {
                if (req->rq_ntrans == 1)
                        rpc_update_rtt(rtt, timer, m);
                rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
        }
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        xprt->stat.recvs++;

        req->rq_private_buf.len = copied;
        /* Ensure all writes are done before we update */
        /* req->rq_reply_bytes_recvd */
        smp_wmb();
        req->rq_reply_bytes_recvd = copied;
        xprt_request_dequeue_receive_locked(task);
        rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (task->tk_status != -ETIMEDOUT)
                return;

        trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
        if (!req->rq_reply_bytes_recvd) {
                if (xprt->ops->timer)
                        xprt->ops->timer(xprt, task);
        } else
                task->tk_status = 0;
}

/**
 * xprt_wait_for_reply_request_def - wait for reply
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_def(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
                        xprt_request_timeout(req));
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);

/**
 * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
 * @task: pointer to rpc_task
 *
 * Set a request's retransmit timeout using the RTT estimator,
 * and put the task to sleep on the pending queue.
 */
void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rtt *rtt = clnt->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = clnt->cl_timeout->to_maxval;
        unsigned long timeout;

        timeout = rpc_calc_rto(rtt, timer);
        timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (timeout > max_timeout || timeout == 0)
                timeout = max_timeout;
        rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
                        jiffies + timeout);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
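
/*
 * Example (illustrative): if the estimator yields an RTO of 2s for this
 * procedure's timer class, the request has been retransmitted once
 * (rq_retries = 1) and one minor timeout is on record, the task sleeps
 * for 2s << (1 + 1) = 8s, clamped to the client's to_maxval.
 */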

/**
 * xprt_request_wait_receive - wait for the reply to an RPC request
 * @task: RPC task about to send a request
 *
 */
void xprt_request_wait_receive(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
                return;
        /*
         * Sleep on the pending queue if we're expecting a reply.
         * The spinlock ensures atomicity between the test of
         * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
         */
        spin_lock(&xprt->queue_lock);
        if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
                xprt->ops->wait_for_reply_request(task);
                /*
                 * Send an extra queue wakeup call if the
                 * connection was dropped in case the call to
                 * rpc_sleep_on() raced.
                 */
                if (xprt_request_retransmit_after_disconnect(task))
                        rpc_wake_up_queued_task_set_status(&xprt->pending,
                                        task, -ENOTCONN);
        }
        spin_unlock(&xprt->queue_lock);
}

static bool
xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
{
        return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}

/**
 * xprt_request_enqueue_transmit - queue a task for transmission
 * @task: pointer to rpc_task
 *
 * Add a task to the transmission queue.
 */
void
xprt_request_enqueue_transmit(struct rpc_task *task)
{
        struct rpc_rqst *pos, *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (xprt_request_need_enqueue_transmit(task, req)) {
                req->rq_bytes_sent = 0;
                spin_lock(&xprt->queue_lock);
                /*
                 * Requests that carry congestion control credits are added
                 * to the head of the list to avoid starvation issues.
                 */
                if (req->rq_cong) {
                        xprt_clear_congestion_window_wait(xprt);
                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
                                if (pos->rq_cong)
                                        continue;
                                /* Note: req is added _before_ pos */
                                list_add_tail(&req->rq_xmit, &pos->rq_xmit);
                                INIT_LIST_HEAD(&req->rq_xmit2);
                                goto out;
                        }
                } else if (RPC_IS_SWAPPER(task)) {
                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
                                if (pos->rq_cong || pos->rq_bytes_sent)
                                        continue;
                                if (RPC_IS_SWAPPER(pos->rq_task))
                                        continue;
                                /* Note: req is added _before_ pos */
                                list_add_tail(&req->rq_xmit, &pos->rq_xmit);
                                INIT_LIST_HEAD(&req->rq_xmit2);
                                goto out;
                        }
                } else if (!req->rq_seqno) {
                        list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
                                if (pos->rq_task->tk_owner != task->tk_owner)
                                        continue;
                                list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
                                INIT_LIST_HEAD(&req->rq_xmit);
                                goto out;
                        }
                }
                list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
                INIT_LIST_HEAD(&req->rq_xmit2);
out:
                atomic_long_inc(&xprt->xmit_queuelen);
                set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
                spin_unlock(&xprt->queue_lock);
        }
}

/**
 * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 * Caller must hold xprt->queue_lock
 */
static void
xprt_request_dequeue_transmit_locked(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
                return;
        if (!list_empty(&req->rq_xmit)) {
                list_del(&req->rq_xmit);
                if (!list_empty(&req->rq_xmit2)) {
                        struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
                                        struct rpc_rqst, rq_xmit2);
                        list_del(&req->rq_xmit2);
                        list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
                }
        } else
                list_del(&req->rq_xmit2);
        atomic_long_dec(&req->rq_xprt->xmit_queuelen);
}

/**
 * xprt_request_dequeue_transmit - remove a task from the transmission queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmission queue
 */
static void
xprt_request_dequeue_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->queue_lock);
        xprt_request_dequeue_transmit_locked(task);
        spin_unlock(&xprt->queue_lock);
}

/**
 * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
 * @task: pointer to rpc_task
 *
 * Remove a task from the transmit and receive queues, and ensure that
 * it is not pinned by the receive work item.
 */
void
xprt_request_dequeue_xprt(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
            test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
            xprt_is_pinned_rqst(req)) {
                spin_lock(&xprt->queue_lock);
                xprt_request_dequeue_transmit_locked(task);
                xprt_request_dequeue_receive_locked(task);
                while (xprt_is_pinned_rqst(req)) {
                        set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
                        spin_unlock(&xprt->queue_lock);
                        xprt_wait_on_pinned_rqst(req);
                        spin_lock(&xprt->queue_lock);
                        clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
                }
                spin_unlock(&xprt->queue_lock);
        }
}

/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 */
void
xprt_request_prepare(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        if (xprt->ops->prepare_request)
                xprt->ops->prepare_request(req);
}

/**
 * xprt_request_need_retransmit - Test if a task needs retransmission
 * @task: pointer to rpc_task
 *
 * Test for whether a connection breakage requires the task to retransmit
 */
bool
xprt_request_need_retransmit(struct rpc_task *task)
{
        return xprt_request_retransmit_after_disconnect(task);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
bool xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        if (!xprt_lock_write(xprt, task)) {
                /* Race breaker: someone may have transmitted us */
                if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
                        rpc_wake_up_queued_task_set_status(&xprt->sending,
                                        task, 0);
                return false;
        }
        return true;
}

void xprt_end_transmit(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

        xprt_inject_disconnect(xprt);
        xprt_release_write(xprt, task);
}

/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 */
static int
xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct rpc_task *task = req->rq_task;
        unsigned int connect_cookie;
        int is_retrans = RPC_WAS_SENT(task);
        int status;

        if (!req->rq_bytes_sent) {
                if (xprt_request_data_received(task)) {
                        status = 0;
                        goto out_dequeue;
                }
                /* Verify that our message lies in the RPCSEC_GSS window */
                if (rpcauth_xmit_need_reencode(task)) {
                        status = -EBADMSG;
                        goto out_dequeue;
                }
                if (RPC_SIGNALLED(task)) {
                        status = -ERESTARTSYS;
                        goto out_dequeue;
                }
        }

        /*
         * Update req->rq_ntrans before transmitting to avoid races with
         * xprt_update_rtt(), which needs to know that it is recording a
         * reply to the first transmission.
         */
        req->rq_ntrans++;

        trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
        connect_cookie = xprt->connect_cookie;
        status = xprt->ops->send_request(req);
        if (status != 0) {
                req->rq_ntrans--;
                trace_xprt_transmit(req, status);
                return status;
        }

        if (is_retrans) {
                task->tk_client->cl_stats->rpcretrans++;
                trace_xprt_retransmit(req);
        }

        xprt_inject_disconnect(xprt);

        task->tk_flags |= RPC_TASK_SENT;
        spin_lock(&xprt->transport_lock);

        xprt->stat.sends++;
        xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
        xprt->stat.bklog_u += xprt->backlog.qlen;
        xprt->stat.sending_u += xprt->sending.qlen;
        xprt->stat.pending_u += xprt->pending.qlen;
        spin_unlock(&xprt->transport_lock);

        req->rq_connect_cookie = connect_cookie;
out_dequeue:
        trace_xprt_transmit(req, status);
        xprt_request_dequeue_transmit(task);
        rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
        return status;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *next, *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int counter, status;

        spin_lock(&xprt->queue_lock);
1586         counter = 0;
1587         while (!list_empty(&xprt->xmit_queue)) {
1588                 if (++counter == 20)
1589                         break;
1590                 next = list_first_entry(&xprt->xmit_queue,
1591                                 struct rpc_rqst, rq_xmit);
1592                 xprt_pin_rqst(next);
1593                 spin_unlock(&xprt->queue_lock);
1594                 status = xprt_request_transmit(next, task);
1595                 if (status == -EBADMSG && next != req)
1596                         status = 0;
1597                 spin_lock(&xprt->queue_lock);
1598                 xprt_unpin_rqst(next);
1599                 if (status == 0) {
1600                         if (!xprt_request_data_received(task) ||
1601                             test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1602                                 continue;
1603                 } else if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1604                         task->tk_status = status;
1605                 break;
1606         }
1607         spin_unlock(&xprt->queue_lock);
1608 }
1609
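/*
 * rpc_sleep_on() callback for tasks parked on the backlog queue: if
 * __xprt_set_rq() handed the task a recycled slot while it slept,
 * initialize that slot before the task resumes.
 */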
1610 static void xprt_complete_request_init(struct rpc_task *task)
1611 {
1612         if (task->tk_rqstp)
1613                 xprt_request_init(task);
1614 }
1615
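/**
 * xprt_add_backlog - queue a task waiting for a request slot
 * @xprt: transport with no free slots
 * @task: RPC task to put to sleep
 *
 * Marks the transport congested and puts @task to sleep on the backlog
 * queue until xprt_wake_up_backlog() hands it a recycled slot.
 */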
1616 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
1617 {
1618         set_bit(XPRT_CONGESTED, &xprt->state);
1619         rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
1620 }
1621 EXPORT_SYMBOL_GPL(xprt_add_backlog);
1622
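/*
 * rpc_wake_up_first() callback: give the recycled slot in @data to the
 * first backlogged task that does not already own a request. Returning
 * true tells rpc_wake_up_first() to wake that task.
 */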
1623 static bool __xprt_set_rq(struct rpc_task *task, void *data)
1624 {
1625         struct rpc_rqst *req = data;
1626
1627         if (task->tk_rqstp == NULL) {
1628                 memset(req, 0, sizeof(*req));   /* mark unused */
1629                 task->tk_rqstp = req;
1630                 return true;
1631         }
1632         return false;
1633 }
1634
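/**
 * xprt_wake_up_backlog - hand a freed request slot to a waiting task
 * @xprt: transport on which the slot was freed
 * @req: the slot being recycled
 *
 * Returns true if a backlogged task took ownership of @req. Otherwise
 * clears XPRT_CONGESTED and returns false, leaving the caller to put
 * @req back on the free list or release it.
 */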
1635 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
1636 {
1637         if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
1638                 clear_bit(XPRT_CONGESTED, &xprt->state);
1639                 return false;
1640         }
1641         return true;
1642 }
1643 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
1644
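/*
 * If the transport is congested, put @task on the backlog queue and
 * return true. XPRT_CONGESTED is re-tested under the reserve_lock to
 * avoid queueing behind a wakeup that has already cleared the flag.
 */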
1645 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
1646 {
1647         bool ret = false;
1648
1649         if (!test_bit(XPRT_CONGESTED, &xprt->state))
1650                 goto out;
1651         spin_lock(&xprt->reserve_lock);
1652         if (test_bit(XPRT_CONGESTED, &xprt->state)) {
1653                 xprt_add_backlog(xprt, task);
1654                 ret = true;
1655         }
1656         spin_unlock(&xprt->reserve_lock);
1657 out:
1658         return ret;
1659 }
1660
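/*
 * Try to grow the slot table. The reserve_lock must be dropped around
 * the allocation, so num_reqs is bumped first to reserve this slot's
 * place and rolled back if kzalloc() fails. Returns ERR_PTR(-EAGAIN)
 * once the table has reached max_reqs.
 */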
1661 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
1662 {
1663         struct rpc_rqst *req = ERR_PTR(-EAGAIN);
1664
1665         if (xprt->num_reqs >= xprt->max_reqs)
1666                 goto out;
1667         ++xprt->num_reqs;
1668         spin_unlock(&xprt->reserve_lock);
1669         req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
1670         spin_lock(&xprt->reserve_lock);
1671         if (req != NULL)
1672                 goto out;
1673         --xprt->num_reqs;
1674         req = ERR_PTR(-ENOMEM);
1675 out:
1676         return req;
1677 }
1678
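/*
 * Counterpart of xprt_dynamic_alloc_slot(): shrink the slot table
 * again, but never below the preallocated minimum. Returns true if
 * @req was freed.
 */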
1679 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1680 {
1681         if (xprt->num_reqs > xprt->min_reqs) {
1682                 --xprt->num_reqs;
1683                 kfree(req);
1684                 return true;
1685         }
1686         return false;
1687 }
1688
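/**
 * xprt_alloc_slot - allocate a request slot for a task
 * @xprt: transport to allocate the slot on
 * @task: RPC task that needs a slot
 *
 * Takes a slot from the free list if one is available, otherwise tries
 * to grow the slot table. On success @task->tk_rqstp is set; on failure
 * @task->tk_status is set to -ENOMEM or -EAGAIN, and a task that found
 * the table full is parked on the backlog queue.
 */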
1689 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1690 {
1691         struct rpc_rqst *req;
1692
1693         spin_lock(&xprt->reserve_lock);
1694         if (!list_empty(&xprt->free)) {
1695                 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
1696                 list_del(&req->rq_list);
1697                 goto out_init_req;
1698         }
1699         req = xprt_dynamic_alloc_slot(xprt);
1700         if (!IS_ERR(req))
1701                 goto out_init_req;
1702         switch (PTR_ERR(req)) {
1703         case -ENOMEM:
1704                 dprintk("RPC:       dynamic allocation of request slot "
1705                                 "failed! Retrying\n");
1706                 task->tk_status = -ENOMEM;
1707                 break;
1708         case -EAGAIN:
1709                 xprt_add_backlog(xprt, task);
1710                 dprintk("RPC:       waiting for request slot\n");
1711                 fallthrough;
1712         default:
1713                 task->tk_status = -EAGAIN;
1714         }
1715         spin_unlock(&xprt->reserve_lock);
1716         return;
1717 out_init_req:
1718         xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
1719                                      xprt->num_reqs);
1720         spin_unlock(&xprt->reserve_lock);
1721
1722         task->tk_status = 0;
1723         task->tk_rqstp = req;
1724 }
1725 EXPORT_SYMBOL_GPL(xprt_alloc_slot);
1726
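/**
 * xprt_free_slot - release a request slot
 * @xprt: transport that owns the slot
 * @req: slot to release
 *
 * The slot is handed directly to a backlogged task if one is waiting;
 * otherwise it is either freed (when the table is above min_reqs) or
 * cleared and returned to the free list.
 */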
1727 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1728 {
1729         spin_lock(&xprt->reserve_lock);
1730         if (!xprt_wake_up_backlog(xprt, req) &&
1731             !xprt_dynamic_free_slot(xprt, req)) {
1732                 memset(req, 0, sizeof(*req));   /* mark unused */
1733                 list_add(&req->rq_list, &xprt->free);
1734         }
1735         spin_unlock(&xprt->reserve_lock);
1736 }
1737 EXPORT_SYMBOL_GPL(xprt_free_slot);
1738
1739 static void xprt_free_all_slots(struct rpc_xprt *xprt)
1740 {
1741         struct rpc_rqst *req;

1742         while (!list_empty(&xprt->free)) {
1743                 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
1744                 list_del(&req->rq_list);
1745                 kfree(req);
1746         }
1747 }
1748
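/**
 * xprt_alloc - allocate a transport structure and its request slots
 * @net: network namespace the transport will live in
 * @size: size of the structure to allocate; transports pass the size
 *        of their private structure embedding a struct rpc_xprt
 * @num_prealloc: number of request slots to preallocate
 * @max_alloc: upper bound on the slot table size
 *
 * Returns the initialized transport, or NULL on allocation failure.
 */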
1749 struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
1750                 unsigned int num_prealloc,
1751                 unsigned int max_alloc)
1752 {
1753         struct rpc_xprt *xprt;
1754         struct rpc_rqst *req;
1755         int i;
1756
1757         xprt = kzalloc(size, GFP_KERNEL);
1758         if (xprt == NULL)
1759                 goto out;
1760
1761         xprt_init(xprt, net);
1762
1763         for (i = 0; i < num_prealloc; i++) {
1764                 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
1765                 if (!req)
1766                         goto out_free;
1767                 list_add(&req->rq_list, &xprt->free);
1768         }
1769         if (max_alloc > num_prealloc)
1770                 xprt->max_reqs = max_alloc;
1771         else
1772                 xprt->max_reqs = num_prealloc;
1773         xprt->min_reqs = num_prealloc;
1774         xprt->num_reqs = num_prealloc;
1775
1776         return xprt;
1777
1778 out_free:
1779         xprt_free(xprt);
1780 out:
1781         return NULL;
1782 }
1783 EXPORT_SYMBOL_GPL(xprt_alloc);
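/*
 * A minimal usage sketch, for illustration only ("struct my_xprt" and
 * the slot-table constants below are invented names, not part of this
 * API):
 *
 *	struct rpc_xprt *xprt;
 *
 *	xprt = xprt_alloc(args->net, sizeof(struct my_xprt),
 *			  MY_DEF_SLOT_TABLE, MY_MAX_SLOT_TABLE);
 *	if (xprt == NULL)
 *		return ERR_PTR(-ENOMEM);
 *
 * where struct my_xprt embeds a struct rpc_xprt and @args is the
 * struct xprt_create handed to the transport's setup routine.
 */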
1784
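/**
 * xprt_free - free a transport allocated with xprt_alloc()
 * @xprt: transport to free
 *
 * Drops the network namespace reference, releases any remaining
 * request slots, and frees the transport itself after an RCU grace
 * period.
 */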
1785 void xprt_free(struct rpc_xprt *xprt)
1786 {
1787         put_net(xprt->xprt_net);
1788         xprt_free_all_slots(xprt);
1789         kfree_rcu(xprt, rcu);
1790 }
1791 EXPORT_SYMBOL_GPL(xprt_free);
1792
1793 static void
1794 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
1795 {
1796         req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
1797 }
1798
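/*
 * XID handling: the stream is seeded with a random value when the
 * transport is initialized, then each request draws the next
 * sequential XID under the reserve_lock.
 */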
1799 static __be32
1800 xprt_alloc_xid(struct rpc_xprt *xprt)
1801 {
1802         __be32 xid;
1803
1804         spin_lock(&xprt->reserve_lock);
1805         xid = (__force __be32)xprt->xid++;
1806         spin_unlock(&xprt->reserve_lock);
1807         return xid;
1808 }
1809
1810 static void
1811 xprt_init_xid(struct rpc_xprt *xprt)
1812 {
1813         xprt->xid = prandom_u32();
1814 }
1815
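/*
 * Prepare a freshly allocated slot for use by @task: assign an XID and
 * connect cookie, reset the send and receive buffers, and arm the
 * major timeout.
 */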
1816 static void
1817 xprt_request_init(struct rpc_task *task)
1818 {
1819         struct rpc_xprt *xprt = task->tk_xprt;
1820         struct rpc_rqst *req = task->tk_rqstp;
1821
1822         req->rq_task    = task;
1823         req->rq_xprt    = xprt;
1824         req->rq_buffer  = NULL;
1825         req->rq_xid     = xprt_alloc_xid(xprt);
1826         xprt_init_connect_cookie(req, xprt);
1827         req->rq_snd_buf.len = 0;
1828         req->rq_snd_buf.buflen = 0;
1829         req->rq_rcv_buf.len = 0;
1830         req->rq_rcv_buf.buflen = 0;
1831         req->rq_snd_buf.bvec = NULL;
1832         req->rq_rcv_buf.bvec = NULL;
1833         req->rq_release_snd_buf = NULL;
1834         xprt_init_majortimeo(task, req);
1835
1836         trace_xprt_reserve(req);
1837 }
1838
1839 static void
1840 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
1841 {
1842         xprt->ops->alloc_slot(xprt, task);
1843         if (task->tk_rqstp != NULL)
1844                 xprt_request_init(task);
1845 }
1846
1847 /**
1848  * xprt_reserve - allocate an RPC request slot
1849  * @task: RPC task requesting a slot allocation
1850  *
1851  * If the transport is marked as being congested, or if no more
1852  * slots are available, place the task on the transport's
1853  * backlog queue.
1854  */
1855 void xprt_reserve(struct rpc_task *task)
1856 {
1857         struct rpc_xprt *xprt = task->tk_xprt;
1858
1859         task->tk_status = 0;
1860         if (task->tk_rqstp != NULL)
1861                 return;
1862
1863         task->tk_status = -EAGAIN;
1864         if (!xprt_throttle_congested(xprt, task))
1865                 xprt_do_reserve(xprt, task);
1866 }
1867
1868 /**
1869  * xprt_retry_reserve - allocate an RPC request slot
1870  * @task: RPC task requesting a slot allocation
1871  *
1872  * If no more slots are available, place the task on the transport's
1873  * backlog queue.
1874  * Note that the only difference from xprt_reserve() is that this
1875  * function ignores the value of the XPRT_CONGESTED flag.
1876  */
1877 void xprt_retry_reserve(struct rpc_task *task)
1878 {
1879         struct rpc_xprt *xprt = task->tk_xprt;
1880
1881         task->tk_status = 0;
1882         if (task->tk_rqstp != NULL)
1883                 return;
1884
1885         task->tk_status = -EAGAIN;
1886         xprt_do_reserve(xprt, task);
1887 }
1888
1889 /**
1890  * xprt_release - release an RPC request slot
1891  * @task: task which is finished with the slot
1892  *
1893  */
1894 void xprt_release(struct rpc_task *task)
1895 {
1896         struct rpc_xprt *xprt;
1897         struct rpc_rqst *req = task->tk_rqstp;
1898
1899         if (req == NULL) {
1900                 if (task->tk_client) {
1901                         xprt = task->tk_xprt;
1902                         xprt_release_write(xprt, task);
1903                 }
1904                 return;
1905         }
1906
1907         xprt = req->rq_xprt;
1908         xprt_request_dequeue_xprt(task);
1909         spin_lock(&xprt->transport_lock);
1910         xprt->ops->release_xprt(xprt, task);
1911         if (xprt->ops->release_request)
1912                 xprt->ops->release_request(task);
1913         xprt_schedule_autodisconnect(xprt);
1914         spin_unlock(&xprt->transport_lock);
1915         if (req->rq_buffer)
1916                 xprt->ops->buf_free(task);
1917         xdr_free_bvec(&req->rq_rcv_buf);
1918         xdr_free_bvec(&req->rq_snd_buf);
1919         if (req->rq_cred != NULL)
1920                 put_rpccred(req->rq_cred);
1921         if (req->rq_release_snd_buf)
1922                 req->rq_release_snd_buf(req);
1923
1924         task->tk_rqstp = NULL;
1925         if (likely(!bc_prealloc(req)))
1926                 xprt->ops->free_slot(xprt, req);
1927         else
1928                 xprt_free_bc_request(req);
1929 }
1930
1931 #ifdef CONFIG_SUNRPC_BACKCHANNEL
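/*
 * Bind a preallocated backchannel request to @task and set the total
 * length of its send buffer, which is already XDR-encoded.
 */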
1932 void
1933 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
1934 {
1935         struct xdr_buf *xbufp = &req->rq_snd_buf;
1936
1937         task->tk_rqstp = req;
1938         req->rq_task = task;
1939         xprt_init_connect_cookie(req, req->rq_xprt);
1940         /*
1941          * Set up the xdr_buf length.
1942          * This also indicates that the buffer is XDR encoded already.
1943          */
1944         xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
1945                 xbufp->tail[0].iov_len;
1946 }
1947 #endif
1948
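/*
 * Common initialization shared by all transport implementations:
 * locks, request and wait queues, the congestion window, and the
 * initial XID.
 */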
1949 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
1950 {
1951         kref_init(&xprt->kref);
1952
1953         spin_lock_init(&xprt->transport_lock);
1954         spin_lock_init(&xprt->reserve_lock);
1955         spin_lock_init(&xprt->queue_lock);
1956
1957         INIT_LIST_HEAD(&xprt->free);
1958         xprt->recv_queue = RB_ROOT;
1959         INIT_LIST_HEAD(&xprt->xmit_queue);
1960 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1961         spin_lock_init(&xprt->bc_pa_lock);
1962         INIT_LIST_HEAD(&xprt->bc_pa_list);
1963 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1964         INIT_LIST_HEAD(&xprt->xprt_switch);
1965
1966         xprt->last_used = jiffies;
1967         xprt->cwnd = RPC_INITCWND;
1968         xprt->bind_index = 0;
1969
1970         rpc_init_wait_queue(&xprt->binding, "xprt_binding");
1971         rpc_init_wait_queue(&xprt->pending, "xprt_pending");
1972         rpc_init_wait_queue(&xprt->sending, "xprt_sending");
1973         rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
1974
1975         xprt_init_xid(xprt);
1976
1977         xprt->xprt_net = get_net(net);
1978 }
1979
1980 /**
1981  * xprt_create_transport - create an RPC transport
1982  * @args: rpc transport creation arguments
1983  *
1984  */
1985 struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
1986 {
1987         struct rpc_xprt *xprt;
1988         const struct xprt_class *t;
1989
1990         t = xprt_class_find_by_ident(args->ident);
1991         if (!t) {
1992                 dprintk("RPC: transport (%d) not supported\n", args->ident);
1993                 return ERR_PTR(-EIO);
1994         }
1995
1996         xprt = t->setup(args);
1997         xprt_class_release(t);
1998
1999         if (IS_ERR(xprt))
2000                 goto out;
2001         if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
2002                 xprt->idle_timeout = 0;
2003         INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
2004         if (xprt_has_timer(xprt))
2005                 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
2006         else
2007                 timer_setup(&xprt->timer, NULL, 0);
2008
2009         if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
2010                 xprt_destroy(xprt);
2011                 return ERR_PTR(-EINVAL);
2012         }
2013         xprt->servername = kstrdup(args->servername, GFP_KERNEL);
2014         if (xprt->servername == NULL) {
2015                 xprt_destroy(xprt);
2016                 return ERR_PTR(-ENOMEM);
2017         }
2018
2019         rpc_xprt_debugfs_register(xprt);
2020
2021         trace_xprt_create(xprt);
2022 out:
2023         return xprt;
2024 }
2025
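/*
 * Final stage of transport destruction; runs from the system workqueue
 * (see xprt_destroy()) once the transport timer has been stopped.
 */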
2026 static void xprt_destroy_cb(struct work_struct *work)
2027 {
2028         struct rpc_xprt *xprt =
2029                 container_of(work, struct rpc_xprt, task_cleanup);
2030
2031         trace_xprt_destroy(xprt);
2032
2033         rpc_xprt_debugfs_unregister(xprt);
2034         rpc_destroy_wait_queue(&xprt->binding);
2035         rpc_destroy_wait_queue(&xprt->pending);
2036         rpc_destroy_wait_queue(&xprt->sending);
2037         rpc_destroy_wait_queue(&xprt->backlog);
2038         kfree(xprt->servername);
2039         /*
2040          * Destroy any existing back channel
2041          */
2042         xprt_destroy_backchannel(xprt, UINT_MAX);
2043
2044         /*
2045          * Tear down transport state and free the rpc_xprt
2046          */
2047         xprt->ops->destroy(xprt);
2048 }
2049
2050 /**
2051  * xprt_destroy - destroy an RPC transport, killing off all requests.
2052  * @xprt: transport to destroy
2053  *
2054  */
2055 static void xprt_destroy(struct rpc_xprt *xprt)
2056 {
2057         /*
2058          * Exclude transport connect/disconnect handlers and autoclose
2059          */
2060         wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
2061
2062         del_timer_sync(&xprt->timer);
2063
2064         /*
2065          * Destroy sockets etc from the system workqueue so they can
2066          * safely flush receive work running on rpciod.
2067          */
2068         INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
2069         schedule_work(&xprt->task_cleanup);
2070 }
2071
2072 static void xprt_destroy_kref(struct kref *kref)
2073 {
2074         xprt_destroy(container_of(kref, struct rpc_xprt, kref));
2075 }
2076
2077 /**
2078  * xprt_get - return a reference to an RPC transport.
2079  * @xprt: pointer to the transport
2080  *
2081  */
2082 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
2083 {
2084         if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
2085                 return xprt;
2086         return NULL;
2087 }
2088 EXPORT_SYMBOL_GPL(xprt_get);
2089
2090 /**
2091  * xprt_put - release a reference to an RPC transport.
2092  * @xprt: pointer to the transport
2093  *
2094  */
2095 void xprt_put(struct rpc_xprt *xprt)
2096 {
2097         if (xprt != NULL)
2098                 kref_put(&xprt->kref, xprt_destroy_kref);
2099 }
2100 EXPORT_SYMBOL_GPL(xprt_put);