dt-bindings: reset: imx7: Fix the spelling of 'indices'
[sfrench/cifs-2.6.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19
20
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/rcupdate.h>
29 #include <linux/utsname.h>
30 #include <linux/workqueue.h>
31 #include <linux/in.h>
32 #include <linux/in6.h>
33 #include <linux/un.h>
34
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/rpc_pipe_fs.h>
38 #include <linux/sunrpc/metrics.h>
39 #include <linux/sunrpc/bc_xprt.h>
40 #include <trace/events/sunrpc.h>
41
42 #include "sunrpc.h"
43 #include "netns.h"
44
45 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
46 # define RPCDBG_FACILITY        RPCDBG_CALL
47 #endif
48
/*
 * Emit a debug line containing the task's tk_pid, the name of the calling
 * function and the task's tk_status.
 *
 * @t is a task pointer expression.  It is parenthesized in the expansion so
 * that arbitrary pointer expressions (e.g. "base + 1") expand correctly.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__func__, (t)->tk_status)
52
53 /*
54  * RPC clients are linked into sn->all_clients; shutdown sleeps on destroy_wait
55  */
56
57 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
/* Forward declarations: per-task call_* handlers. */
static void call_start(struct rpc_task *task);
static void call_reserve(struct rpc_task *task);
static void call_reserveresult(struct rpc_task *task);
static void call_refresh(struct rpc_task *task);
static void call_refreshresult(struct rpc_task *task);
static void call_allocate(struct rpc_task *task);
static void call_encode(struct rpc_task *task);
static void call_bind(struct rpc_task *task);
static void call_bind_status(struct rpc_task *task);
static void call_connect(struct rpc_task *task);
static void call_connect_status(struct rpc_task *task);
static void call_transmit(struct rpc_task *task);
static void call_transmit_status(struct rpc_task *task);
static void call_status(struct rpc_task *task);
static void call_decode(struct rpc_task *task);

/* Forward declarations: header handling, ping and timeout helpers. */
static int rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr);
static int rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr);
static int rpc_ping(struct rpc_clnt *clnt);
static void rpc_check_timeout(struct rpc_task *task);
82
83 static void rpc_register_client(struct rpc_clnt *clnt)
84 {
85         struct net *net = rpc_net_ns(clnt);
86         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88         spin_lock(&sn->rpc_client_lock);
89         list_add(&clnt->cl_clients, &sn->all_clients);
90         spin_unlock(&sn->rpc_client_lock);
91 }
92
93 static void rpc_unregister_client(struct rpc_clnt *clnt)
94 {
95         struct net *net = rpc_net_ns(clnt);
96         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98         spin_lock(&sn->rpc_client_lock);
99         list_del(&clnt->cl_clients);
100         spin_unlock(&sn->rpc_client_lock);
101 }
102
/*
 * Thin wrapper: forwards @clnt to rpc_remove_client_dir().  Unlike
 * rpc_clnt_remove_pipedir(), it does not look up the pipefs superblock.
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
107
108 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
109 {
110         struct net *net = rpc_net_ns(clnt);
111         struct super_block *pipefs_sb;
112
113         pipefs_sb = rpc_get_sb_net(net);
114         if (pipefs_sb) {
115                 __rpc_clnt_remove_pipedir(clnt);
116                 rpc_put_sb_net(net);
117         }
118 }
119
120 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
121                                     struct rpc_clnt *clnt)
122 {
123         static uint32_t clntid;
124         const char *dir_name = clnt->cl_program->pipe_dir_name;
125         char name[15];
126         struct dentry *dir, *dentry;
127
128         dir = rpc_d_lookup_sb(sb, dir_name);
129         if (dir == NULL) {
130                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
131                 return dir;
132         }
133         for (;;) {
134                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
135                 name[sizeof(name) - 1] = '\0';
136                 dentry = rpc_create_client_dir(dir, name, clnt);
137                 if (!IS_ERR(dentry))
138                         break;
139                 if (dentry == ERR_PTR(-EEXIST))
140                         continue;
141                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
142                                 " %s/%s, error %ld\n",
143                                 dir_name, name, PTR_ERR(dentry));
144                 break;
145         }
146         dput(dir);
147         return dentry;
148 }
149
150 static int
151 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
152 {
153         struct dentry *dentry;
154
155         if (clnt->cl_program->pipe_dir_name != NULL) {
156                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
157                 if (IS_ERR(dentry))
158                         return PTR_ERR(dentry);
159         }
160         return 0;
161 }
162
163 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
164 {
165         if (clnt->cl_program->pipe_dir_name == NULL)
166                 return 1;
167
168         switch (event) {
169         case RPC_PIPEFS_MOUNT:
170                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
171                         return 1;
172                 if (atomic_read(&clnt->cl_count) == 0)
173                         return 1;
174                 break;
175         case RPC_PIPEFS_UMOUNT:
176                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
177                         return 1;
178                 break;
179         }
180         return 0;
181 }
182
183 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
184                                    struct super_block *sb)
185 {
186         struct dentry *dentry;
187
188         switch (event) {
189         case RPC_PIPEFS_MOUNT:
190                 dentry = rpc_setup_pipedir_sb(sb, clnt);
191                 if (!dentry)
192                         return -ENOENT;
193                 if (IS_ERR(dentry))
194                         return PTR_ERR(dentry);
195                 break;
196         case RPC_PIPEFS_UMOUNT:
197                 __rpc_clnt_remove_pipedir(clnt);
198                 break;
199         default:
200                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
201                 return -ENOTSUPP;
202         }
203         return 0;
204 }
205
206 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
207                                 struct super_block *sb)
208 {
209         int error = 0;
210
211         for (;; clnt = clnt->cl_parent) {
212                 if (!rpc_clnt_skip_event(clnt, event))
213                         error = __rpc_clnt_handle_event(clnt, event, sb);
214                 if (error || clnt == clnt->cl_parent)
215                         break;
216         }
217         return error;
218 }
219
220 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
221 {
222         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
223         struct rpc_clnt *clnt;
224
225         spin_lock(&sn->rpc_client_lock);
226         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
227                 if (rpc_clnt_skip_event(clnt, event))
228                         continue;
229                 spin_unlock(&sn->rpc_client_lock);
230                 return clnt;
231         }
232         spin_unlock(&sn->rpc_client_lock);
233         return NULL;
234 }
235
236 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
237                             void *ptr)
238 {
239         struct super_block *sb = ptr;
240         struct rpc_clnt *clnt;
241         int error = 0;
242
243         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
244                 error = __rpc_pipefs_event(clnt, event, sb);
245                 if (error)
246                         break;
247         }
248         return error;
249 }
250
251 static struct notifier_block rpc_clients_block = {
252         .notifier_call  = rpc_pipefs_event,
253         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
254 };
255
256 int rpc_clients_notifier_register(void)
257 {
258         return rpc_pipefs_notifier_register(&rpc_clients_block);
259 }
260
261 void rpc_clients_notifier_unregister(void)
262 {
263         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
264 }
265
266 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
267                 struct rpc_xprt *xprt,
268                 const struct rpc_timeout *timeout)
269 {
270         struct rpc_xprt *old;
271
272         spin_lock(&clnt->cl_lock);
273         old = rcu_dereference_protected(clnt->cl_xprt,
274                         lockdep_is_held(&clnt->cl_lock));
275
276         if (!xprt_bound(xprt))
277                 clnt->cl_autobind = 1;
278
279         clnt->cl_timeout = timeout;
280         rcu_assign_pointer(clnt->cl_xprt, xprt);
281         spin_unlock(&clnt->cl_lock);
282
283         return old;
284 }
285
286 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
287 {
288         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
289                         nodename, sizeof(clnt->cl_nodename));
290 }
291
292 static int rpc_client_register(struct rpc_clnt *clnt,
293                                rpc_authflavor_t pseudoflavor,
294                                const char *client_name)
295 {
296         struct rpc_auth_create_args auth_args = {
297                 .pseudoflavor = pseudoflavor,
298                 .target_name = client_name,
299         };
300         struct rpc_auth *auth;
301         struct net *net = rpc_net_ns(clnt);
302         struct super_block *pipefs_sb;
303         int err;
304
305         rpc_clnt_debugfs_register(clnt);
306
307         pipefs_sb = rpc_get_sb_net(net);
308         if (pipefs_sb) {
309                 err = rpc_setup_pipedir(pipefs_sb, clnt);
310                 if (err)
311                         goto out;
312         }
313
314         rpc_register_client(clnt);
315         if (pipefs_sb)
316                 rpc_put_sb_net(net);
317
318         auth = rpcauth_create(&auth_args, clnt);
319         if (IS_ERR(auth)) {
320                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
321                                 pseudoflavor);
322                 err = PTR_ERR(auth);
323                 goto err_auth;
324         }
325         return 0;
326 err_auth:
327         pipefs_sb = rpc_get_sb_net(net);
328         rpc_unregister_client(clnt);
329         __rpc_clnt_remove_pipedir(clnt);
330 out:
331         if (pipefs_sb)
332                 rpc_put_sb_net(net);
333         rpc_clnt_debugfs_unregister(clnt);
334         return err;
335 }
336
337 static DEFINE_IDA(rpc_clids);
338
339 void rpc_cleanup_clids(void)
340 {
341         ida_destroy(&rpc_clids);
342 }
343
344 static int rpc_alloc_clid(struct rpc_clnt *clnt)
345 {
346         int clid;
347
348         clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
349         if (clid < 0)
350                 return clid;
351         clnt->cl_clid = clid;
352         return 0;
353 }
354
355 static void rpc_free_clid(struct rpc_clnt *clnt)
356 {
357         ida_simple_remove(&rpc_clids, clnt->cl_clid);
358 }
359
360 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
361                 struct rpc_xprt_switch *xps,
362                 struct rpc_xprt *xprt,
363                 struct rpc_clnt *parent)
364 {
365         const struct rpc_program *program = args->program;
366         const struct rpc_version *version;
367         struct rpc_clnt *clnt = NULL;
368         const struct rpc_timeout *timeout;
369         const char *nodename = args->nodename;
370         int err;
371
372         /* sanity check the name before trying to print it */
373         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
374                         program->name, args->servername, xprt);
375
376         err = rpciod_up();
377         if (err)
378                 goto out_no_rpciod;
379
380         err = -EINVAL;
381         if (args->version >= program->nrvers)
382                 goto out_err;
383         version = program->version[args->version];
384         if (version == NULL)
385                 goto out_err;
386
387         err = -ENOMEM;
388         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
389         if (!clnt)
390                 goto out_err;
391         clnt->cl_parent = parent ? : clnt;
392
393         err = rpc_alloc_clid(clnt);
394         if (err)
395                 goto out_no_clid;
396
397         clnt->cl_cred     = get_cred(args->cred);
398         clnt->cl_procinfo = version->procs;
399         clnt->cl_maxproc  = version->nrprocs;
400         clnt->cl_prog     = args->prognumber ? : program->number;
401         clnt->cl_vers     = version->number;
402         clnt->cl_stats    = program->stats;
403         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
404         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
405         err = -ENOMEM;
406         if (clnt->cl_metrics == NULL)
407                 goto out_no_stats;
408         clnt->cl_program  = program;
409         INIT_LIST_HEAD(&clnt->cl_tasks);
410         spin_lock_init(&clnt->cl_lock);
411
412         timeout = xprt->timeout;
413         if (args->timeout != NULL) {
414                 memcpy(&clnt->cl_timeout_default, args->timeout,
415                                 sizeof(clnt->cl_timeout_default));
416                 timeout = &clnt->cl_timeout_default;
417         }
418
419         rpc_clnt_set_transport(clnt, xprt, timeout);
420         xprt_iter_init(&clnt->cl_xpi, xps);
421         xprt_switch_put(xps);
422
423         clnt->cl_rtt = &clnt->cl_rtt_default;
424         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
425
426         atomic_set(&clnt->cl_count, 1);
427
428         if (nodename == NULL)
429                 nodename = utsname()->nodename;
430         /* save the nodename */
431         rpc_clnt_set_nodename(clnt, nodename);
432
433         err = rpc_client_register(clnt, args->authflavor, args->client_name);
434         if (err)
435                 goto out_no_path;
436         if (parent)
437                 atomic_inc(&parent->cl_count);
438         return clnt;
439
440 out_no_path:
441         rpc_free_iostats(clnt->cl_metrics);
442 out_no_stats:
443         put_cred(clnt->cl_cred);
444         rpc_free_clid(clnt);
445 out_no_clid:
446         kfree(clnt);
447 out_err:
448         rpciod_down();
449 out_no_rpciod:
450         xprt_switch_put(xps);
451         xprt_put(xprt);
452         return ERR_PTR(err);
453 }
454
455 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
456                                         struct rpc_xprt *xprt)
457 {
458         struct rpc_clnt *clnt = NULL;
459         struct rpc_xprt_switch *xps;
460
461         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
462                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
463                 xps = args->bc_xprt->xpt_bc_xps;
464                 xprt_switch_get(xps);
465         } else {
466                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
467                 if (xps == NULL) {
468                         xprt_put(xprt);
469                         return ERR_PTR(-ENOMEM);
470                 }
471                 if (xprt->bc_xprt) {
472                         xprt_switch_get(xps);
473                         xprt->bc_xprt->xpt_bc_xps = xps;
474                 }
475         }
476         clnt = rpc_new_client(args, xps, xprt, NULL);
477         if (IS_ERR(clnt))
478                 return clnt;
479
480         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
481                 int err = rpc_ping(clnt);
482                 if (err != 0) {
483                         rpc_shutdown_client(clnt);
484                         return ERR_PTR(err);
485                 }
486         }
487
488         clnt->cl_softrtry = 1;
489         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
490                 clnt->cl_softrtry = 0;
491                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
492                         clnt->cl_softerr = 1;
493         }
494
495         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
496                 clnt->cl_autobind = 1;
497         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
498                 clnt->cl_noretranstimeo = 1;
499         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
500                 clnt->cl_discrtry = 1;
501         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
502                 clnt->cl_chatty = 1;
503
504         return clnt;
505 }
506
507 /**
508  * rpc_create - create an RPC client and transport with one call
509  * @args: rpc_clnt create argument structure
510  *
511  * Creates and initializes an RPC transport and an RPC client.
512  *
513  * It can ping the server in order to determine if it is up, and to see if
514  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
515  * this behavior so asynchronous tasks can also use rpc_create.
516  */
517 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
518 {
519         struct rpc_xprt *xprt;
520         struct xprt_create xprtargs = {
521                 .net = args->net,
522                 .ident = args->protocol,
523                 .srcaddr = args->saddress,
524                 .dstaddr = args->address,
525                 .addrlen = args->addrsize,
526                 .servername = args->servername,
527                 .bc_xprt = args->bc_xprt,
528         };
529         char servername[48];
530
531         if (args->bc_xprt) {
532                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
533                 xprt = args->bc_xprt->xpt_bc_xprt;
534                 if (xprt) {
535                         xprt_get(xprt);
536                         return rpc_create_xprt(args, xprt);
537                 }
538         }
539
540         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
541                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
542         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
543                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
544         /*
545          * If the caller chooses not to specify a hostname, whip
546          * up a string representation of the passed-in address.
547          */
548         if (xprtargs.servername == NULL) {
549                 struct sockaddr_un *sun =
550                                 (struct sockaddr_un *)args->address;
551                 struct sockaddr_in *sin =
552                                 (struct sockaddr_in *)args->address;
553                 struct sockaddr_in6 *sin6 =
554                                 (struct sockaddr_in6 *)args->address;
555
556                 servername[0] = '\0';
557                 switch (args->address->sa_family) {
558                 case AF_LOCAL:
559                         snprintf(servername, sizeof(servername), "%s",
560                                  sun->sun_path);
561                         break;
562                 case AF_INET:
563                         snprintf(servername, sizeof(servername), "%pI4",
564                                  &sin->sin_addr.s_addr);
565                         break;
566                 case AF_INET6:
567                         snprintf(servername, sizeof(servername), "%pI6",
568                                  &sin6->sin6_addr);
569                         break;
570                 default:
571                         /* caller wants default server name, but
572                          * address family isn't recognized. */
573                         return ERR_PTR(-EINVAL);
574                 }
575                 xprtargs.servername = servername;
576         }
577
578         xprt = xprt_create_transport(&xprtargs);
579         if (IS_ERR(xprt))
580                 return (struct rpc_clnt *)xprt;
581
582         /*
583          * By default, kernel RPC client connects from a reserved port.
584          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
585          * but it is always enabled for rpciod, which handles the connect
586          * operation.
587          */
588         xprt->resvport = 1;
589         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
590                 xprt->resvport = 0;
591
592         return rpc_create_xprt(args, xprt);
593 }
594 EXPORT_SYMBOL_GPL(rpc_create);
595
596 /*
597  * This function clones the RPC client structure. It allows us to share the
598  * same transport while varying parameters such as the authentication
599  * flavour.
600  */
601 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
602                                            struct rpc_clnt *clnt)
603 {
604         struct rpc_xprt_switch *xps;
605         struct rpc_xprt *xprt;
606         struct rpc_clnt *new;
607         int err;
608
609         err = -ENOMEM;
610         rcu_read_lock();
611         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
612         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
613         rcu_read_unlock();
614         if (xprt == NULL || xps == NULL) {
615                 xprt_put(xprt);
616                 xprt_switch_put(xps);
617                 goto out_err;
618         }
619         args->servername = xprt->servername;
620         args->nodename = clnt->cl_nodename;
621
622         new = rpc_new_client(args, xps, xprt, clnt);
623         if (IS_ERR(new)) {
624                 err = PTR_ERR(new);
625                 goto out_err;
626         }
627
628         /* Turn off autobind on clones */
629         new->cl_autobind = 0;
630         new->cl_softrtry = clnt->cl_softrtry;
631         new->cl_softerr = clnt->cl_softerr;
632         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
633         new->cl_discrtry = clnt->cl_discrtry;
634         new->cl_chatty = clnt->cl_chatty;
635         new->cl_principal = clnt->cl_principal;
636         new->cl_cred = get_cred(clnt->cl_cred);
637         return new;
638
639 out_err:
640         dprintk("RPC:       %s: returned error %d\n", __func__, err);
641         return ERR_PTR(err);
642 }
643
644 /**
645  * rpc_clone_client - Clone an RPC client structure
646  *
647  * @clnt: RPC client whose parameters are copied
648  *
649  * Returns a fresh RPC client or an ERR_PTR.
650  */
651 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
652 {
653         struct rpc_create_args args = {
654                 .program        = clnt->cl_program,
655                 .prognumber     = clnt->cl_prog,
656                 .version        = clnt->cl_vers,
657                 .authflavor     = clnt->cl_auth->au_flavor,
658                 .cred           = clnt->cl_cred,
659         };
660         return __rpc_clone_client(&args, clnt);
661 }
662 EXPORT_SYMBOL_GPL(rpc_clone_client);
663
664 /**
665  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
666  *
667  * @clnt: RPC client whose parameters are copied
668  * @flavor: security flavor for new client
669  *
670  * Returns a fresh RPC client or an ERR_PTR.
671  */
672 struct rpc_clnt *
673 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
674 {
675         struct rpc_create_args args = {
676                 .program        = clnt->cl_program,
677                 .prognumber     = clnt->cl_prog,
678                 .version        = clnt->cl_vers,
679                 .authflavor     = flavor,
680                 .cred           = clnt->cl_cred,
681         };
682         return __rpc_clone_client(&args, clnt);
683 }
684 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
685
686 /**
687  * rpc_switch_client_transport: switch the RPC transport on the fly
688  * @clnt: pointer to a struct rpc_clnt
689  * @args: pointer to the new transport arguments
690  * @timeout: pointer to the new timeout parameters
691  *
692  * This function allows the caller to switch the RPC transport for the
693  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
694  * server, for instance.  It assumes that the caller has ensured that
695  * there are no active RPC tasks by using some form of locking.
696  *
697  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
698  * negative errno is returned, and "clnt" continues to use the old
699  * xprt.
700  */
701 int rpc_switch_client_transport(struct rpc_clnt *clnt,
702                 struct xprt_create *args,
703                 const struct rpc_timeout *timeout)
704 {
705         const struct rpc_timeout *old_timeo;
706         rpc_authflavor_t pseudoflavor;
707         struct rpc_xprt_switch *xps, *oldxps;
708         struct rpc_xprt *xprt, *old;
709         struct rpc_clnt *parent;
710         int err;
711
712         xprt = xprt_create_transport(args);
713         if (IS_ERR(xprt)) {
714                 dprintk("RPC:       failed to create new xprt for clnt %p\n",
715                         clnt);
716                 return PTR_ERR(xprt);
717         }
718
719         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
720         if (xps == NULL) {
721                 xprt_put(xprt);
722                 return -ENOMEM;
723         }
724
725         pseudoflavor = clnt->cl_auth->au_flavor;
726
727         old_timeo = clnt->cl_timeout;
728         old = rpc_clnt_set_transport(clnt, xprt, timeout);
729         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
730
731         rpc_unregister_client(clnt);
732         __rpc_clnt_remove_pipedir(clnt);
733         rpc_clnt_debugfs_unregister(clnt);
734
735         /*
736          * A new transport was created.  "clnt" therefore
737          * becomes the root of a new cl_parent tree.  clnt's
738          * children, if it has any, still point to the old xprt.
739          */
740         parent = clnt->cl_parent;
741         clnt->cl_parent = clnt;
742
743         /*
744          * The old rpc_auth cache cannot be re-used.  GSS
745          * contexts in particular are between a single
746          * client and server.
747          */
748         err = rpc_client_register(clnt, pseudoflavor, NULL);
749         if (err)
750                 goto out_revert;
751
752         synchronize_rcu();
753         if (parent != clnt)
754                 rpc_release_client(parent);
755         xprt_switch_put(oldxps);
756         xprt_put(old);
757         dprintk("RPC:       replaced xprt for clnt %p\n", clnt);
758         return 0;
759
760 out_revert:
761         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
762         rpc_clnt_set_transport(clnt, old, old_timeo);
763         clnt->cl_parent = parent;
764         rpc_client_register(clnt, pseudoflavor, NULL);
765         xprt_switch_put(xps);
766         xprt_put(xprt);
767         dprintk("RPC:       failed to switch xprt for clnt %p\n", clnt);
768         return err;
769 }
770 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
771
772 static
773 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
774 {
775         struct rpc_xprt_switch *xps;
776
777         rcu_read_lock();
778         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
779         rcu_read_unlock();
780         if (xps == NULL)
781                 return -EAGAIN;
782         xprt_iter_init_listall(xpi, xps);
783         xprt_switch_put(xps);
784         return 0;
785 }
786
787 /**
788  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
789  * @clnt: pointer to client
790  * @fn: function to apply
791  * @data: void pointer to function data
792  *
793  * Iterates through the list of RPC transports currently attached to the
794  * client and applies the function fn(clnt, xprt, data).
795  *
796  * On error, the iteration stops, and the function returns the error value.
797  */
798 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
799                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
800                 void *data)
801 {
802         struct rpc_xprt_iter xpi;
803         int ret;
804
805         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
806         if (ret)
807                 return ret;
808         for (;;) {
809                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
810
811                 if (!xprt)
812                         break;
813                 ret = fn(clnt, xprt, data);
814                 xprt_put(xprt);
815                 if (ret < 0)
816                         break;
817         }
818         xprt_iter_destroy(&xpi);
819         return ret;
820 }
821 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
822
823 /*
824  * Kill all tasks for the given client.
825  * XXX: kill their descendants as well?
826  */
827 void rpc_killall_tasks(struct rpc_clnt *clnt)
828 {
829         struct rpc_task *rovr;
830
831
832         if (list_empty(&clnt->cl_tasks))
833                 return;
834         dprintk("RPC:       killing all tasks for client %p\n", clnt);
835         /*
836          * Spin lock all_tasks to prevent changes...
837          */
838         spin_lock(&clnt->cl_lock);
839         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
840                 rpc_signal_task(rovr);
841         spin_unlock(&clnt->cl_lock);
842 }
843 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
844
845 /*
846  * Properly shut down an RPC client, terminating all outstanding
847  * requests.
848  */
849 void rpc_shutdown_client(struct rpc_clnt *clnt)
850 {
851         might_sleep();
852
853         dprintk_rcu("RPC:       shutting down %s client for %s\n",
854                         clnt->cl_program->name,
855                         rcu_dereference(clnt->cl_xprt)->servername);
856
857         while (!list_empty(&clnt->cl_tasks)) {
858                 rpc_killall_tasks(clnt);
859                 wait_event_timeout(destroy_wait,
860                         list_empty(&clnt->cl_tasks), 1*HZ);
861         }
862
863         rpc_release_client(clnt);
864 }
865 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
866
867 /*
868  * Free an RPC client
869  */
870 static struct rpc_clnt *
871 rpc_free_client(struct rpc_clnt *clnt)
872 {
873         struct rpc_clnt *parent = NULL;
874
875         dprintk_rcu("RPC:       destroying %s client for %s\n",
876                         clnt->cl_program->name,
877                         rcu_dereference(clnt->cl_xprt)->servername);
878         if (clnt->cl_parent != clnt)
879                 parent = clnt->cl_parent;
880         rpc_clnt_debugfs_unregister(clnt);
881         rpc_clnt_remove_pipedir(clnt);
882         rpc_unregister_client(clnt);
883         rpc_free_iostats(clnt->cl_metrics);
884         clnt->cl_metrics = NULL;
885         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
886         xprt_iter_destroy(&clnt->cl_xpi);
887         rpciod_down();
888         put_cred(clnt->cl_cred);
889         rpc_free_clid(clnt);
890         kfree(clnt);
891         return parent;
892 }
893
894 /*
895  * Free an RPC client
896  */
897 static struct rpc_clnt *
898 rpc_free_auth(struct rpc_clnt *clnt)
899 {
900         if (clnt->cl_auth == NULL)
901                 return rpc_free_client(clnt);
902
903         /*
904          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
905          *       release remaining GSS contexts. This mechanism ensures
906          *       that it can do so safely.
907          */
908         atomic_inc(&clnt->cl_count);
909         rpcauth_release(clnt->cl_auth);
910         clnt->cl_auth = NULL;
911         if (atomic_dec_and_test(&clnt->cl_count))
912                 return rpc_free_client(clnt);
913         return NULL;
914 }
915
916 /*
917  * Release reference to the RPC client
918  */
919 void
920 rpc_release_client(struct rpc_clnt *clnt)
921 {
922         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
923
924         do {
925                 if (list_empty(&clnt->cl_tasks))
926                         wake_up(&destroy_wait);
927                 if (!atomic_dec_and_test(&clnt->cl_count))
928                         break;
929                 clnt = rpc_free_auth(clnt);
930         } while (clnt != NULL);
931 }
932 EXPORT_SYMBOL_GPL(rpc_release_client);
933
934 /**
935  * rpc_bind_new_program - bind a new RPC program to an existing client
936  * @old: old rpc_client
937  * @program: rpc program to set
938  * @vers: rpc program version
939  *
940  * Clones the rpc client and sets up a new RPC program. This is mainly
941  * of use for enabling different RPC programs to share the same transport.
942  * The Sun NFSv2/v3 ACL protocol can do this.
943  */
944 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
945                                       const struct rpc_program *program,
946                                       u32 vers)
947 {
948         struct rpc_create_args args = {
949                 .program        = program,
950                 .prognumber     = program->number,
951                 .version        = vers,
952                 .authflavor     = old->cl_auth->au_flavor,
953                 .cred           = old->cl_cred,
954         };
955         struct rpc_clnt *clnt;
956         int err;
957
958         clnt = __rpc_clone_client(&args, old);
959         if (IS_ERR(clnt))
960                 goto out;
961         err = rpc_ping(clnt);
962         if (err != 0) {
963                 rpc_shutdown_client(clnt);
964                 clnt = ERR_PTR(err);
965         }
966 out:
967         return clnt;
968 }
969 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
970
971 void rpc_task_release_transport(struct rpc_task *task)
972 {
973         struct rpc_xprt *xprt = task->tk_xprt;
974
975         if (xprt) {
976                 task->tk_xprt = NULL;
977                 xprt_put(xprt);
978         }
979 }
980 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
981
982 void rpc_task_release_client(struct rpc_task *task)
983 {
984         struct rpc_clnt *clnt = task->tk_client;
985
986         if (clnt != NULL) {
987                 /* Remove from client task list */
988                 spin_lock(&clnt->cl_lock);
989                 list_del(&task->tk_task);
990                 spin_unlock(&clnt->cl_lock);
991                 task->tk_client = NULL;
992
993                 rpc_release_client(clnt);
994         }
995         rpc_task_release_transport(task);
996 }
997
998 static
999 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1000 {
1001         if (!task->tk_xprt)
1002                 task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
1003 }
1004
1005 static
1006 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1007 {
1008
1009         if (clnt != NULL) {
1010                 rpc_task_set_transport(task, clnt);
1011                 task->tk_client = clnt;
1012                 atomic_inc(&clnt->cl_count);
1013                 if (clnt->cl_softrtry)
1014                         task->tk_flags |= RPC_TASK_SOFT;
1015                 if (clnt->cl_softerr)
1016                         task->tk_flags |= RPC_TASK_TIMEOUT;
1017                 if (clnt->cl_noretranstimeo)
1018                         task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1019                 if (atomic_read(&clnt->cl_swapper))
1020                         task->tk_flags |= RPC_TASK_SWAPPER;
1021                 /* Add to the client's list of all tasks */
1022                 spin_lock(&clnt->cl_lock);
1023                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
1024                 spin_unlock(&clnt->cl_lock);
1025         }
1026 }
1027
1028 static void
1029 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1030 {
1031         if (msg != NULL) {
1032                 task->tk_msg.rpc_proc = msg->rpc_proc;
1033                 task->tk_msg.rpc_argp = msg->rpc_argp;
1034                 task->tk_msg.rpc_resp = msg->rpc_resp;
1035                 if (msg->rpc_cred != NULL)
1036                         task->tk_msg.rpc_cred = get_cred(msg->rpc_cred);
1037         }
1038 }
1039
/*
 * Default completion callback for RPC calls: intentionally does
 * nothing.
 */
static void rpc_default_callback(struct rpc_task *task, void *data)
{
	/* nothing to do */
}
1047
1048 static const struct rpc_call_ops rpc_default_ops = {
1049         .rpc_call_done = rpc_default_callback,
1050 };
1051
1052 /**
1053  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1054  * @task_setup_data: pointer to task initialisation data
1055  */
1056 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1057 {
1058         struct rpc_task *task;
1059
1060         task = rpc_new_task(task_setup_data);
1061
1062         rpc_task_set_client(task, task_setup_data->rpc_client);
1063         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1064
1065         if (task->tk_action == NULL)
1066                 rpc_call_start(task);
1067
1068         atomic_inc(&task->tk_count);
1069         rpc_execute(task);
1070         return task;
1071 }
1072 EXPORT_SYMBOL_GPL(rpc_run_task);
1073
1074 /**
1075  * rpc_call_sync - Perform a synchronous RPC call
1076  * @clnt: pointer to RPC client
1077  * @msg: RPC call parameters
1078  * @flags: RPC call flags
1079  */
1080 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1081 {
1082         struct rpc_task *task;
1083         struct rpc_task_setup task_setup_data = {
1084                 .rpc_client = clnt,
1085                 .rpc_message = msg,
1086                 .callback_ops = &rpc_default_ops,
1087                 .flags = flags,
1088         };
1089         int status;
1090
1091         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1092         if (flags & RPC_TASK_ASYNC) {
1093                 rpc_release_calldata(task_setup_data.callback_ops,
1094                         task_setup_data.callback_data);
1095                 return -EINVAL;
1096         }
1097
1098         task = rpc_run_task(&task_setup_data);
1099         if (IS_ERR(task))
1100                 return PTR_ERR(task);
1101         status = task->tk_status;
1102         rpc_put_task(task);
1103         return status;
1104 }
1105 EXPORT_SYMBOL_GPL(rpc_call_sync);
1106
1107 /**
1108  * rpc_call_async - Perform an asynchronous RPC call
1109  * @clnt: pointer to RPC client
1110  * @msg: RPC call parameters
1111  * @flags: RPC call flags
1112  * @tk_ops: RPC call ops
1113  * @data: user call data
1114  */
1115 int
1116 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1117                const struct rpc_call_ops *tk_ops, void *data)
1118 {
1119         struct rpc_task *task;
1120         struct rpc_task_setup task_setup_data = {
1121                 .rpc_client = clnt,
1122                 .rpc_message = msg,
1123                 .callback_ops = tk_ops,
1124                 .callback_data = data,
1125                 .flags = flags|RPC_TASK_ASYNC,
1126         };
1127
1128         task = rpc_run_task(&task_setup_data);
1129         if (IS_ERR(task))
1130                 return PTR_ERR(task);
1131         rpc_put_task(task);
1132         return 0;
1133 }
1134 EXPORT_SYMBOL_GPL(rpc_call_async);
1135
1136 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1137 static void call_bc_encode(struct rpc_task *task);
1138
1139 /**
1140  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1141  * rpc_execute against it
1142  * @req: RPC request
1143  */
1144 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1145 {
1146         struct rpc_task *task;
1147         struct rpc_task_setup task_setup_data = {
1148                 .callback_ops = &rpc_default_ops,
1149                 .flags = RPC_TASK_SOFTCONN |
1150                         RPC_TASK_NO_RETRANS_TIMEOUT,
1151         };
1152
1153         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1154         /*
1155          * Create an rpc_task to send the data
1156          */
1157         task = rpc_new_task(&task_setup_data);
1158         xprt_init_bc_request(req, task);
1159
1160         task->tk_action = call_bc_encode;
1161         atomic_inc(&task->tk_count);
1162         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1163         rpc_execute(task);
1164
1165         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1166         return task;
1167 }
1168 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1169
1170 /**
1171  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1172  * @req: RPC request to prepare
1173  * @pages: vector of struct page pointers
1174  * @base: offset in first page where receive should start, in bytes
1175  * @len: expected size of the upper layer data payload, in bytes
1176  * @hdrsize: expected size of upper layer reply header, in XDR words
1177  *
1178  */
1179 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1180                              unsigned int base, unsigned int len,
1181                              unsigned int hdrsize)
1182 {
1183         /* Subtract one to force an extra word of buffer space for the
1184          * payload's XDR pad to fall into the rcv_buf's tail iovec.
1185          */
1186         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign - 1;
1187
1188         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1189         trace_rpc_reply_pages(req);
1190 }
1191 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1192
1193 void
1194 rpc_call_start(struct rpc_task *task)
1195 {
1196         task->tk_action = call_start;
1197 }
1198 EXPORT_SYMBOL_GPL(rpc_call_start);
1199
1200 /**
1201  * rpc_peeraddr - extract remote peer address from clnt's xprt
1202  * @clnt: RPC client structure
1203  * @buf: target buffer
1204  * @bufsize: length of target buffer
1205  *
1206  * Returns the number of bytes that are actually in the stored address.
1207  */
1208 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1209 {
1210         size_t bytes;
1211         struct rpc_xprt *xprt;
1212
1213         rcu_read_lock();
1214         xprt = rcu_dereference(clnt->cl_xprt);
1215
1216         bytes = xprt->addrlen;
1217         if (bytes > bufsize)
1218                 bytes = bufsize;
1219         memcpy(buf, &xprt->addr, bytes);
1220         rcu_read_unlock();
1221
1222         return bytes;
1223 }
1224 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1225
1226 /**
1227  * rpc_peeraddr2str - return remote peer address in printable format
1228  * @clnt: RPC client structure
1229  * @format: address format
1230  *
1231  * NB: the lifetime of the memory referenced by the returned pointer is
1232  * the same as the rpc_xprt itself.  As long as the caller uses this
1233  * pointer, it must hold the RCU read lock.
1234  */
1235 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1236                              enum rpc_display_format_t format)
1237 {
1238         struct rpc_xprt *xprt;
1239
1240         xprt = rcu_dereference(clnt->cl_xprt);
1241
1242         if (xprt->address_strings[format] != NULL)
1243                 return xprt->address_strings[format];
1244         else
1245                 return "unprintable";
1246 }
1247 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1248
1249 static const struct sockaddr_in rpc_inaddr_loopback = {
1250         .sin_family             = AF_INET,
1251         .sin_addr.s_addr        = htonl(INADDR_ANY),
1252 };
1253
1254 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1255         .sin6_family            = AF_INET6,
1256         .sin6_addr              = IN6ADDR_ANY_INIT,
1257 };
1258
1259 /*
1260  * Try a getsockname() on a connected datagram socket.  Using a
1261  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1262  * This conserves the ephemeral port number space.
1263  *
1264  * Returns zero and fills in "buf" if successful; otherwise, a
1265  * negative errno is returned.
1266  */
1267 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1268                         struct sockaddr *buf)
1269 {
1270         struct socket *sock;
1271         int err;
1272
1273         err = __sock_create(net, sap->sa_family,
1274                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1275         if (err < 0) {
1276                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1277                 goto out;
1278         }
1279
1280         switch (sap->sa_family) {
1281         case AF_INET:
1282                 err = kernel_bind(sock,
1283                                 (struct sockaddr *)&rpc_inaddr_loopback,
1284                                 sizeof(rpc_inaddr_loopback));
1285                 break;
1286         case AF_INET6:
1287                 err = kernel_bind(sock,
1288                                 (struct sockaddr *)&rpc_in6addr_loopback,
1289                                 sizeof(rpc_in6addr_loopback));
1290                 break;
1291         default:
1292                 err = -EAFNOSUPPORT;
1293                 goto out;
1294         }
1295         if (err < 0) {
1296                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1297                 goto out_release;
1298         }
1299
1300         err = kernel_connect(sock, sap, salen, 0);
1301         if (err < 0) {
1302                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1303                 goto out_release;
1304         }
1305
1306         err = kernel_getsockname(sock, buf);
1307         if (err < 0) {
1308                 dprintk("RPC:       getsockname failed (%d)\n", err);
1309                 goto out_release;
1310         }
1311
1312         err = 0;
1313         if (buf->sa_family == AF_INET6) {
1314                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1315                 sin6->sin6_scope_id = 0;
1316         }
1317         dprintk("RPC:       %s succeeded\n", __func__);
1318
1319 out_release:
1320         sock_release(sock);
1321 out:
1322         return err;
1323 }
1324
1325 /*
1326  * Scraping a connected socket failed, so we don't have a useable
1327  * local address.  Fallback: generate an address that will prevent
1328  * the server from calling us back.
1329  *
1330  * Returns zero and fills in "buf" if successful; otherwise, a
1331  * negative errno is returned.
1332  */
1333 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1334 {
1335         switch (family) {
1336         case AF_INET:
1337                 if (buflen < sizeof(rpc_inaddr_loopback))
1338                         return -EINVAL;
1339                 memcpy(buf, &rpc_inaddr_loopback,
1340                                 sizeof(rpc_inaddr_loopback));
1341                 break;
1342         case AF_INET6:
1343                 if (buflen < sizeof(rpc_in6addr_loopback))
1344                         return -EINVAL;
1345                 memcpy(buf, &rpc_in6addr_loopback,
1346                                 sizeof(rpc_in6addr_loopback));
1347                 break;
1348         default:
1349                 dprintk("RPC:       %s: address family not supported\n",
1350                         __func__);
1351                 return -EAFNOSUPPORT;
1352         }
1353         dprintk("RPC:       %s: succeeded\n", __func__);
1354         return 0;
1355 }
1356
1357 /**
1358  * rpc_localaddr - discover local endpoint address for an RPC client
1359  * @clnt: RPC client structure
1360  * @buf: target buffer
1361  * @buflen: size of target buffer, in bytes
1362  *
1363  * Returns zero and fills in "buf" and "buflen" if successful;
1364  * otherwise, a negative errno is returned.
1365  *
1366  * This works even if the underlying transport is not currently connected,
1367  * or if the upper layer never previously provided a source address.
1368  *
1369  * The result of this function call is transient: multiple calls in
1370  * succession may give different results, depending on how local
1371  * networking configuration changes over time.
1372  */
1373 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1374 {
1375         struct sockaddr_storage address;
1376         struct sockaddr *sap = (struct sockaddr *)&address;
1377         struct rpc_xprt *xprt;
1378         struct net *net;
1379         size_t salen;
1380         int err;
1381
1382         rcu_read_lock();
1383         xprt = rcu_dereference(clnt->cl_xprt);
1384         salen = xprt->addrlen;
1385         memcpy(sap, &xprt->addr, salen);
1386         net = get_net(xprt->xprt_net);
1387         rcu_read_unlock();
1388
1389         rpc_set_port(sap, 0);
1390         err = rpc_sockname(net, sap, salen, buf);
1391         put_net(net);
1392         if (err != 0)
1393                 /* Couldn't discover local address, return ANYADDR */
1394                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1395         return 0;
1396 }
1397 EXPORT_SYMBOL_GPL(rpc_localaddr);
1398
1399 void
1400 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1401 {
1402         struct rpc_xprt *xprt;
1403
1404         rcu_read_lock();
1405         xprt = rcu_dereference(clnt->cl_xprt);
1406         if (xprt->ops->set_buffer_size)
1407                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1408         rcu_read_unlock();
1409 }
1410 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1411
1412 /**
1413  * rpc_net_ns - Get the network namespace for this RPC client
1414  * @clnt: RPC client to query
1415  *
1416  */
1417 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1418 {
1419         struct net *ret;
1420
1421         rcu_read_lock();
1422         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1423         rcu_read_unlock();
1424         return ret;
1425 }
1426 EXPORT_SYMBOL_GPL(rpc_net_ns);
1427
1428 /**
1429  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1430  * @clnt: RPC client to query
1431  *
1432  * For stream transports, this is one RPC record fragment (see RFC
1433  * 1831), as we don't support multi-record requests yet.  For datagram
1434  * transports, this is the size of an IP packet minus the IP, UDP, and
1435  * RPC header sizes.
1436  */
1437 size_t rpc_max_payload(struct rpc_clnt *clnt)
1438 {
1439         size_t ret;
1440
1441         rcu_read_lock();
1442         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1443         rcu_read_unlock();
1444         return ret;
1445 }
1446 EXPORT_SYMBOL_GPL(rpc_max_payload);
1447
1448 /**
1449  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1450  * @clnt: RPC client to query
1451  */
1452 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1453 {
1454         struct rpc_xprt *xprt;
1455         size_t ret;
1456
1457         rcu_read_lock();
1458         xprt = rcu_dereference(clnt->cl_xprt);
1459         ret = xprt->ops->bc_maxpayload(xprt);
1460         rcu_read_unlock();
1461         return ret;
1462 }
1463 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1464
1465 /**
1466  * rpc_force_rebind - force transport to check that remote port is unchanged
1467  * @clnt: client to rebind
1468  *
1469  */
1470 void rpc_force_rebind(struct rpc_clnt *clnt)
1471 {
1472         if (clnt->cl_autobind) {
1473                 rcu_read_lock();
1474                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1475                 rcu_read_unlock();
1476         }
1477 }
1478 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1479
1480 static int
1481 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1482 {
1483         task->tk_status = 0;
1484         task->tk_rpc_status = 0;
1485         task->tk_action = action;
1486         return 1;
1487 }
1488
1489 /*
1490  * Restart an (async) RPC call. Usually called from within the
1491  * exit handler.
1492  */
1493 int
1494 rpc_restart_call(struct rpc_task *task)
1495 {
1496         return __rpc_restart_call(task, call_start);
1497 }
1498 EXPORT_SYMBOL_GPL(rpc_restart_call);
1499
1500 /*
1501  * Restart an (async) RPC call from the call_prepare state.
1502  * Usually called from within the exit handler.
1503  */
1504 int
1505 rpc_restart_call_prepare(struct rpc_task *task)
1506 {
1507         if (task->tk_ops->rpc_call_prepare != NULL)
1508                 return __rpc_restart_call(task, rpc_prepare_task);
1509         return rpc_restart_call(task);
1510 }
1511 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1512
1513 const char
1514 *rpc_proc_name(const struct rpc_task *task)
1515 {
1516         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1517
1518         if (proc) {
1519                 if (proc->p_name)
1520                         return proc->p_name;
1521                 else
1522                         return "NULL";
1523         } else
1524                 return "no proc";
1525 }
1526
1527 static void
1528 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1529 {
1530         task->tk_rpc_status = rpc_status;
1531         rpc_exit(task, tk_status);
1532 }
1533
/*
 * Exit the task with @status, recording the same value as its RPC
 * status.
 */
static void rpc_call_rpcerror(struct rpc_task *task, int status)
{
	__rpc_call_rpcerror(task, status, status);
}
1539
1540 /*
1541  * 0.  Initial state
1542  *
1543  *     Other FSM states can be visited zero or more times, but
1544  *     this state is visited exactly once for each RPC.
1545  */
1546 static void
1547 call_start(struct rpc_task *task)
1548 {
1549         struct rpc_clnt *clnt = task->tk_client;
1550         int idx = task->tk_msg.rpc_proc->p_statidx;
1551
1552         trace_rpc_request(task);
1553         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1554                         clnt->cl_program->name, clnt->cl_vers,
1555                         rpc_proc_name(task),
1556                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
1557
1558         /* Increment call count (version might not be valid for ping) */
1559         if (clnt->cl_program->version[clnt->cl_vers])
1560                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1561         clnt->cl_stats->rpccnt++;
1562         task->tk_action = call_reserve;
1563         rpc_task_set_transport(task, clnt);
1564 }
1565
1566 /*
1567  * 1.   Reserve an RPC call slot
1568  */
1569 static void
1570 call_reserve(struct rpc_task *task)
1571 {
1572         dprint_status(task);
1573
1574         task->tk_status  = 0;
1575         task->tk_action  = call_reserveresult;
1576         xprt_reserve(task);
1577 }
1578
1579 static void call_retry_reserve(struct rpc_task *task);
1580
1581 /*
1582  * 1b.  Grok the result of xprt_reserve()
1583  */
1584 static void
1585 call_reserveresult(struct rpc_task *task)
1586 {
1587         int status = task->tk_status;
1588
1589         dprint_status(task);
1590
1591         /*
1592          * After a call to xprt_reserve(), we must have either
1593          * a request slot or else an error status.
1594          */
1595         task->tk_status = 0;
1596         if (status >= 0) {
1597                 if (task->tk_rqstp) {
1598                         task->tk_action = call_refresh;
1599                         return;
1600                 }
1601
1602                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1603                                 __func__, status);
1604                 rpc_call_rpcerror(task, -EIO);
1605                 return;
1606         }
1607
1608         /*
1609          * Even though there was an error, we may have acquired
1610          * a request slot somehow.  Make sure not to leak it.
1611          */
1612         if (task->tk_rqstp) {
1613                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1614                                 __func__, status);
1615                 xprt_release(task);
1616         }
1617
1618         switch (status) {
1619         case -ENOMEM:
1620                 rpc_delay(task, HZ >> 2);
1621                 /* fall through */
1622         case -EAGAIN:   /* woken up; retry */
1623                 task->tk_action = call_retry_reserve;
1624                 return;
1625         case -EIO:      /* probably a shutdown */
1626                 break;
1627         default:
1628                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1629                                 __func__, status);
1630                 break;
1631         }
1632         rpc_call_rpcerror(task, status);
1633 }
1634
1635 /*
1636  * 1c.  Retry reserving an RPC call slot
1637  */
1638 static void
1639 call_retry_reserve(struct rpc_task *task)
1640 {
1641         dprint_status(task);
1642
1643         task->tk_status  = 0;
1644         task->tk_action  = call_reserveresult;
1645         xprt_retry_reserve(task);
1646 }
1647
1648 /*
1649  * 2.   Bind and/or refresh the credentials
1650  */
1651 static void
1652 call_refresh(struct rpc_task *task)
1653 {
1654         dprint_status(task);
1655
1656         task->tk_action = call_refreshresult;
1657         task->tk_status = 0;
1658         task->tk_client->cl_stats->rpcauthrefresh++;
1659         rpcauth_refreshcred(task);
1660 }
1661
1662 /*
1663  * 2a.  Process the results of a credential refresh
1664  */
1665 static void
1666 call_refreshresult(struct rpc_task *task)
1667 {
1668         int status = task->tk_status;
1669
1670         dprint_status(task);
1671
1672         task->tk_status = 0;
1673         task->tk_action = call_refresh;
1674         switch (status) {
1675         case 0:
1676                 if (rpcauth_uptodatecred(task)) {
1677                         task->tk_action = call_allocate;
1678                         return;
1679                 }
1680                 /* Use rate-limiting and a max number of retries if refresh
1681                  * had status 0 but failed to update the cred.
1682                  */
1683                 /* fall through */
1684         case -ETIMEDOUT:
1685                 rpc_delay(task, 3*HZ);
1686                 /* fall through */
1687         case -EAGAIN:
1688                 status = -EACCES;
1689                 /* fall through */
1690         case -EKEYEXPIRED:
1691                 if (!task->tk_cred_retry)
1692                         break;
1693                 task->tk_cred_retry--;
1694                 dprintk("RPC: %5u %s: retry refresh creds\n",
1695                                 task->tk_pid, __func__);
1696                 return;
1697         }
1698         dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1699                                 task->tk_pid, __func__, status);
1700         rpc_call_rpcerror(task, status);
1701 }
1702
1703 /*
1704  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1705  *      (Note: buffer memory is freed in xprt_release).
1706  */
1707 static void
1708 call_allocate(struct rpc_task *task)
1709 {
1710         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1711         struct rpc_rqst *req = task->tk_rqstp;
1712         struct rpc_xprt *xprt = req->rq_xprt;
1713         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1714         int status;
1715
1716         dprint_status(task);
1717
1718         task->tk_status = 0;
1719         task->tk_action = call_encode;
1720
1721         if (req->rq_buffer)
1722                 return;
1723
1724         if (proc->p_proc != 0) {
1725                 BUG_ON(proc->p_arglen == 0);
1726                 if (proc->p_decode != NULL)
1727                         BUG_ON(proc->p_replen == 0);
1728         }
1729
1730         /*
1731          * Calculate the size (in quads) of the RPC call
1732          * and reply headers, and convert both values
1733          * to byte sizes.
1734          */
1735         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1736                            proc->p_arglen;
1737         req->rq_callsize <<= 2;
1738         /*
1739          * Note: the reply buffer must at minimum allocate enough space
1740          * for the 'struct accepted_reply' from RFC5531.
1741          */
1742         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1743                         max_t(size_t, proc->p_replen, 2);
1744         req->rq_rcvsize <<= 2;
1745
1746         status = xprt->ops->buf_alloc(task);
1747         xprt_inject_disconnect(xprt);
1748         if (status == 0)
1749                 return;
1750         if (status != -ENOMEM) {
1751                 rpc_call_rpcerror(task, status);
1752                 return;
1753         }
1754
1755         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1756
1757         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1758                 task->tk_action = call_allocate;
1759                 rpc_delay(task, HZ>>4);
1760                 return;
1761         }
1762
1763         rpc_exit(task, -ERESTARTSYS);
1764 }
1765
1766 static int
1767 rpc_task_need_encode(struct rpc_task *task)
1768 {
1769         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1770                 (!(task->tk_flags & RPC_TASK_SENT) ||
1771                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1772                  xprt_request_need_retransmit(task));
1773 }
1774
1775 static void
1776 rpc_xdr_encode(struct rpc_task *task)
1777 {
1778         struct rpc_rqst *req = task->tk_rqstp;
1779         struct xdr_stream xdr;
1780
1781         xdr_buf_init(&req->rq_snd_buf,
1782                      req->rq_buffer,
1783                      req->rq_callsize);
1784         xdr_buf_init(&req->rq_rcv_buf,
1785                      req->rq_rbuffer,
1786                      req->rq_rcvsize);
1787
1788         req->rq_snd_buf.head[0].iov_len = 0;
1789         xdr_init_encode(&xdr, &req->rq_snd_buf,
1790                         req->rq_snd_buf.head[0].iov_base, req);
1791         if (rpc_encode_header(task, &xdr))
1792                 return;
1793
1794         task->tk_status = rpcauth_wrap_req(task, &xdr);
1795 }
1796
1797 /*
1798  * 3.   Encode arguments of an RPC call
1799  */
1800 static void
1801 call_encode(struct rpc_task *task)
1802 {
1803         if (!rpc_task_need_encode(task))
1804                 goto out;
1805         dprint_status(task);
1806         /* Encode here so that rpcsec_gss can use correct sequence number. */
1807         rpc_xdr_encode(task);
1808         /* Did the encode result in an error condition? */
1809         if (task->tk_status != 0) {
1810                 /* Was the error nonfatal? */
1811                 switch (task->tk_status) {
1812                 case -EAGAIN:
1813                 case -ENOMEM:
1814                         rpc_delay(task, HZ >> 4);
1815                         break;
1816                 case -EKEYEXPIRED:
1817                         if (!task->tk_cred_retry) {
1818                                 rpc_exit(task, task->tk_status);
1819                         } else {
1820                                 task->tk_action = call_refresh;
1821                                 task->tk_cred_retry--;
1822                                 dprintk("RPC: %5u %s: retry refresh creds\n",
1823                                         task->tk_pid, __func__);
1824                         }
1825                         break;
1826                 default:
1827                         rpc_call_rpcerror(task, task->tk_status);
1828                 }
1829                 return;
1830         } else {
1831                 xprt_request_prepare(task->tk_rqstp);
1832         }
1833
1834         /* Add task to reply queue before transmission to avoid races */
1835         if (rpc_reply_expected(task))
1836                 xprt_request_enqueue_receive(task);
1837         xprt_request_enqueue_transmit(task);
1838 out:
1839         task->tk_action = call_transmit;
1840         /* Check that the connection is OK */
1841         if (!xprt_bound(task->tk_xprt))
1842                 task->tk_action = call_bind;
1843         else if (!xprt_connected(task->tk_xprt))
1844                 task->tk_action = call_connect;
1845 }
1846
1847 /*
1848  * Helpers to check if the task was already transmitted, and
1849  * to take action when that is the case.
1850  */
1851 static bool
1852 rpc_task_transmitted(struct rpc_task *task)
1853 {
1854         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1855 }
1856
1857 static void
1858 rpc_task_handle_transmitted(struct rpc_task *task)
1859 {
1860         xprt_end_transmit(task);
1861         task->tk_action = call_transmit_status;
1862 }
1863
1864 /*
1865  * 4.   Get the server port number if not yet set
1866  */
1867 static void
1868 call_bind(struct rpc_task *task)
1869 {
1870         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1871
1872         if (rpc_task_transmitted(task)) {
1873                 rpc_task_handle_transmitted(task);
1874                 return;
1875         }
1876
1877         if (xprt_bound(xprt)) {
1878                 task->tk_action = call_connect;
1879                 return;
1880         }
1881
1882         dprint_status(task);
1883
1884         task->tk_action = call_bind_status;
1885         if (!xprt_prepare_transmit(task))
1886                 return;
1887
1888         xprt->ops->rpcbind(task);
1889 }
1890
1891 /*
1892  * 4a.  Sort out bind result
1893  */
/*
 * Evaluate the result of the rpcbind query.
 *
 * A non-negative status moves the task on to call_connect.  Errors are
 * either retried (back to call_bind, subject to rpc_check_timeout())
 * or terminate the call through rpc_call_rpcerror().
 */
static void
call_bind_status(struct rpc_task *task)
{
	/* Error reported on failure unless a case below overrides it. */
	int status = -EIO;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -EAGAIN:
		goto out_retry;
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		goto out_retry;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto out_retry;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		goto out_retry;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			goto out_fail;
		}
		/* No rebind retries left: give up with the default error. */
		if (task->tk_rebind_retry == 0)
			goto out_fail;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto out_retry;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		goto out_fail;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ENOBUFS:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		if (RPC_IS_SOFTCONN(task)) {
			/* Soft-connect tasks report the error as it is. */
			status = task->tk_status;
			goto out_fail;
		}
		rpc_delay(task, 5*HZ);
		goto out_retry;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
		goto out_fail;
	}

out_fail:
	rpc_call_rpcerror(task, status);
	return;

out_retry:
	task->tk_status = 0;
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
1976
1977 /*
1978  * 4b.  Connect to the RPC server
1979  */
1980 static void
1981 call_connect(struct rpc_task *task)
1982 {
1983         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1984
1985         if (rpc_task_transmitted(task)) {
1986                 rpc_task_handle_transmitted(task);
1987                 return;
1988         }
1989
1990         if (xprt_connected(xprt)) {
1991                 task->tk_action = call_transmit;
1992                 return;
1993         }
1994
1995         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1996                         task->tk_pid, xprt,
1997                         (xprt_connected(xprt) ? "is" : "is not"));
1998
1999         task->tk_action = call_connect_status;
2000         if (task->tk_status < 0)
2001                 return;
2002         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2003                 rpc_call_rpcerror(task, -ENOTCONN);
2004                 return;
2005         }
2006         if (!xprt_prepare_transmit(task))
2007                 return;
2008         xprt_connect(task);
2009 }
2010
2011 /*
2012  * 4c.  Sort out connect result
2013  */
/*
 * Evaluate the result of the connect attempt.
 *
 * Success moves on to call_transmit.  Retryable errors loop back to
 * call_bind (after rpc_check_timeout()); everything else terminates
 * the call with the original status.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	dprint_status(task);

	trace_rpc_connect_status(task);
	/* The saved copy in 'status' is what gets examined from here on. */
	task->tk_status = 0;

	switch (status) {
	case 0:
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	case -ECONNREFUSED:
		/* A positive refusal suggests a rebind is needed. */
		if (RPC_IS_SOFTCONN(task))
			break;
		if (clnt->cl_autobind) {
			rpc_force_rebind(clnt);
			goto out_retry;
		}
		/* fall through */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EADDRINUSE:
	case -ENOBUFS:
	case -EPIPE:
		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
					    task->tk_rqstp->rq_connect_cookie);
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ENOTCONN:
	case -EAGAIN:
	case -ETIMEDOUT:
		goto out_retry;
	}

	/* Not retryable (or a soft-connect task): fail the call. */
	rpc_call_rpcerror(task, status);
	return;

out_retry:
	/* Check for timeouts before looping back to call_bind */
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
2070
2071 /*
2072  * 5.   Transmit the RPC request, and wait for reply
2073  */
2074 static void
2075 call_transmit(struct rpc_task *task)
2076 {
2077         if (rpc_task_transmitted(task)) {
2078                 rpc_task_handle_transmitted(task);
2079                 return;
2080         }
2081
2082         dprint_status(task);
2083
2084         task->tk_action = call_transmit_status;
2085         if (!xprt_prepare_transmit(task))
2086                 return;
2087         task->tk_status = 0;
2088         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2089                 if (!xprt_connected(task->tk_xprt)) {
2090                         task->tk_status = -ENOTCONN;
2091                         return;
2092                 }
2093                 xprt_transmit(task);
2094         }
2095         xprt_end_transmit(task);
2096 }
2097
2098 /*
2099  * 5a.  Handle cleanup after a transmission
2100  */
/*
 * Decide what to do after a transmit attempt.
 *
 * If the request went out, wait for the reply.  Otherwise map the
 * error to the state to retry from (call_encode, call_transmit or
 * call_bind), or fail a soft-connect task, and then check for a
 * timeout.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (rpc_task_transmitted(task)) {
		task->tk_status = 0;
		xprt_request_wait_receive(task);
		return;
	}

	switch (task->tk_status) {
	case -EBADMSG:
		/* Go back and encode the request again. */
		task->tk_action = call_encode;
		task->tk_status = 0;
		break;
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		/* fall through */
	case -EBADSLT:
	case -EAGAIN:
		/* Simply try the transmit again. */
		task->tk_status = 0;
		task->tk_action = call_transmit;
		break;
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task)) {
			/* Procedure 0 is traced as a ping result. */
			if (!task->tk_msg.rpc_proc->p_proc)
				trace_xprt_ping(task->tk_xprt,
						task->tk_status);
			rpc_call_rpcerror(task, task->tk_status);
			return;
		}
		/* fall through */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Restart from the bind step. */
		task->tk_status = 0;
		task->tk_action = call_bind;
		break;
	default:
		/* Leave the status in place for call_status. */
		dprint_status(task);
		break;
	}

	rpc_check_timeout(task);
}
2163
2164 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2165 static void call_bc_transmit(struct rpc_task *task);
2166 static void call_bc_transmit_status(struct rpc_task *task);
2167
2168 static void
2169 call_bc_encode(struct rpc_task *task)
2170 {
2171         xprt_request_enqueue_transmit(task);
2172         task->tk_action = call_bc_transmit;
2173 }
2174
2175 /*
2176  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2177  * addition, disconnect on connectivity errors.
2178  */
2179 static void
2180 call_bc_transmit(struct rpc_task *task)
2181 {
2182         task->tk_action = call_bc_transmit_status;
2183         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2184                 if (!xprt_prepare_transmit(task))
2185                         return;
2186                 task->tk_status = 0;
2187                 xprt_transmit(task);
2188         }
2189         xprt_end_transmit(task);
2190 }
2191
/*
 * Evaluate the result of sending a backchannel reply.
 *
 * -ENOBUFS, -EBADSLT and -EAGAIN retry the transmit; every other
 * outcome finishes the task through rpc_exit_task.
 */
static void
call_bc_transmit_status(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (rpc_task_transmitted(task))
		task->tk_status = 0;

	dprint_status(task);

	switch (task->tk_status) {
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		/* fall through */
	case -EBADSLT:
	case -EAGAIN:
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	case 0:
		/* Success */
	case -ENETDOWN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		break;
	case -ETIMEDOUT:
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		/*
		 * On a timeout there is a problem reaching the server.
		 * Disconnect and let the forechannel reestablish the
		 * connection.  The server will have to retransmit the
		 * backchannel request and we'll reprocess it.  Since these
		 * ops are idempotent, there's no need to cache our reply
		 * at this time.
		 */
		if (task->tk_status == -ETIMEDOUT)
			xprt_conditional_disconnect(req->rq_xprt,
				req->rq_connect_cookie);
		break;
	}

	task->tk_action = rpc_exit_task;
}
2247 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2248
2249 /*
2250  * 6.   Sort out the RPC call status
2251  */
/*
 * Examine the status of the call after the transmit/receive phase.
 *
 * A non-negative status proceeds to call_decode.  Retryable errors go
 * back to call_encode (after rpc_check_timeout()); all others end the
 * call with rpc_call_rpcerror().
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status;

	/* Procedure 0 is traced as a ping result. */
	if (!task->tk_msg.rpc_proc->p_proc)
		trace_xprt_ping(task->tk_xprt, task->tk_status);

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;

	switch (status) {
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task))
			goto out_fail;
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
		rpc_force_rebind(clnt);
		/* fall through */
	case -EADDRINUSE:
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EPIPE:
	case -ENOTCONN:
	case -EAGAIN:
		break;
	case -EIO:
		/* shutdown or soft timeout */
		goto out_fail;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_program->name, -status);
		goto out_fail;
	}

	/* Retry: start over from the encode step, unless we timed out. */
	task->tk_action = call_encode;
	rpc_check_timeout(task);
	return;

out_fail:
	rpc_call_rpcerror(task, status);
}
2314
2315 static bool
2316 rpc_check_connected(const struct rpc_rqst *req)
2317 {
2318         /* No allocated request or transport? return true */
2319         if (!req || !req->rq_xprt)
2320                 return true;
2321         return xprt_connected(req->rq_xprt);
2322 }
2323
2324 static void
2325 rpc_check_timeout(struct rpc_task *task)
2326 {
2327         struct rpc_clnt *clnt = task->tk_client;
2328
2329         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2330                 return;
2331
2332         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
2333         task->tk_timeouts++;
2334
2335         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2336                 rpc_call_rpcerror(task, -ETIMEDOUT);
2337                 return;
2338         }
2339
2340         if (RPC_IS_SOFT(task)) {
2341                 /*
2342                  * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2343                  * been sent, it should time out only if the transport
2344                  * connection gets terminally broken.
2345                  */
2346                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2347                     rpc_check_connected(task->tk_rqstp))
2348                         return;
2349
2350                 if (clnt->cl_chatty) {
2351                         pr_notice_ratelimited(
2352                                 "%s: server %s not responding, timed out\n",
2353                                 clnt->cl_program->name,
2354                                 task->tk_xprt->servername);
2355                 }
2356                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2357                         rpc_call_rpcerror(task, -ETIMEDOUT);
2358                 else
2359                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2360                 return;
2361         }
2362
2363         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2364                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2365                 if (clnt->cl_chatty) {
2366                         pr_notice_ratelimited(
2367                                 "%s: server %s not responding, still trying\n",
2368                                 clnt->cl_program->name,
2369                                 task->tk_xprt->servername);
2370                 }
2371         }
2372         rpc_force_rebind(clnt);
2373         /*
2374          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2375          * event? RFC2203 requires the server to drop all such requests.
2376          */
2377         rpcauth_invalcred(task);
2378 }
2379
2380 /*
2381  * 7.   Decode the RPC reply
2382  */
2383 static void
2384 call_decode(struct rpc_task *task)
2385 {
2386         struct rpc_clnt *clnt = task->tk_client;
2387         struct rpc_rqst *req = task->tk_rqstp;
2388         struct xdr_stream xdr;
2389
2390         dprint_status(task);
2391
2392         if (!task->tk_msg.rpc_proc->p_decode) {
2393                 task->tk_action = rpc_exit_task;
2394                 return;
2395         }
2396
2397         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2398                 if (clnt->cl_chatty) {
2399                         pr_notice_ratelimited("%s: server %s OK\n",
2400                                 clnt->cl_program->name,
2401                                 task->tk_xprt->servername);
2402                 }
2403                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2404         }
2405
2406         /*
2407          * Ensure that we see all writes made by xprt_complete_rqst()
2408          * before it changed req->rq_reply_bytes_recvd.
2409          */
2410         smp_rmb();
2411         req->rq_rcv_buf.len = req->rq_private_buf.len;
2412
2413         /* Check that the softirq receive buffer is valid */
2414         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2415                                 sizeof(req->rq_rcv_buf)) != 0);
2416
2417         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2418                         req->rq_rcv_buf.head[0].iov_base, req);
2419         switch (rpc_decode_header(task, &xdr)) {
2420         case 0:
2421                 task->tk_action = rpc_exit_task;
2422                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2423                 dprintk("RPC: %5u %s result %d\n",
2424                         task->tk_pid, __func__, task->tk_status);
2425                 return;
2426         case -EAGAIN:
2427                 task->tk_status = 0;
2428                 /* Note: rpc_decode_header() may have freed the RPC slot */
2429                 if (task->tk_rqstp == req) {
2430                         xdr_free_bvec(&req->rq_rcv_buf);
2431                         req->rq_reply_bytes_recvd = 0;
2432                         req->rq_rcv_buf.len = 0;
2433                         if (task->tk_client->cl_discrtry)
2434                                 xprt_conditional_disconnect(req->rq_xprt,
2435                                                             req->rq_connect_cookie);
2436                 }
2437                 task->tk_action = call_encode;
2438                 rpc_check_timeout(task);
2439         }
2440 }
2441
2442 static int
2443 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2444 {
2445         struct rpc_clnt *clnt = task->tk_client;
2446         struct rpc_rqst *req = task->tk_rqstp;
2447         __be32 *p;
2448         int error;
2449
2450         error = -EMSGSIZE;
2451         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2452         if (!p)
2453                 goto out_fail;
2454         *p++ = req->rq_xid;
2455         *p++ = rpc_call;
2456         *p++ = cpu_to_be32(RPC_VERSION);
2457         *p++ = cpu_to_be32(clnt->cl_prog);
2458         *p++ = cpu_to_be32(clnt->cl_vers);
2459         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2460
2461         error = rpcauth_marshcred(task, xdr);
2462         if (error < 0)
2463                 goto out_fail;
2464         return 0;
2465 out_fail:
2466         trace_rpc_bad_callhdr(task);
2467         rpc_exit(task, error);
2468         return error;
2469 }
2470
2471 static noinline int
2472 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2473 {
2474         struct rpc_clnt *clnt = task->tk_client;
2475         int error;
2476         __be32 *p;
2477
2478         /* RFC-1014 says that the representation of XDR data must be a
2479          * multiple of four bytes
2480          * - if it isn't pointer subtraction in the NFS client may give
2481          *   undefined results
2482          */
2483         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2484                 goto out_unparsable;
2485
2486         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2487         if (!p)
2488                 goto out_unparsable;
2489         p++;    /* skip XID */
2490         if (*p++ != rpc_reply)
2491                 goto out_unparsable;
2492         if (*p++ != rpc_msg_accepted)
2493                 goto out_msg_denied;
2494
2495         error = rpcauth_checkverf(task, xdr);
2496         if (error)
2497                 goto out_verifier;
2498
2499         p = xdr_inline_decode(xdr, sizeof(*p));
2500         if (!p)
2501                 goto out_unparsable;
2502         switch (*p) {
2503         case rpc_success:
2504                 return 0;
2505         case rpc_prog_unavail:
2506                 trace_rpc__prog_unavail(task);
2507                 error = -EPFNOSUPPORT;
2508                 goto out_err;
2509         case rpc_prog_mismatch:
2510                 trace_rpc__prog_mismatch(task);
2511                 error = -EPROTONOSUPPORT;
2512                 goto out_err;
2513         case rpc_proc_unavail:
2514                 trace_rpc__proc_unavail(task);
2515                 error = -EOPNOTSUPP;
2516                 goto out_err;
2517         case rpc_garbage_args:
2518         case rpc_system_err:
2519                 trace_rpc__garbage_args(task);
2520                 error = -EIO;
2521                 break;
2522         default:
2523                 goto out_unparsable;
2524         }
2525
2526 out_garbage:
2527         clnt->cl_stats->rpcgarbage++;
2528         if (task->tk_garb_retry) {
2529                 task->tk_garb_retry--;
2530                 task->tk_action = call_encode;
2531                 return -EAGAIN;
2532         }
2533 out_err:
2534         rpc_exit(task, error);
2535         return error;
2536
2537 out_unparsable:
2538         trace_rpc__unparsable(task);
2539         error = -EIO;
2540         goto out_garbage;
2541
2542 out_verifier:
2543         trace_rpc_bad_verifier(task);
2544         goto out_garbage;
2545
2546 out_msg_denied:
2547         error = -EACCES;
2548         p = xdr_inline_decode(xdr, sizeof(*p));
2549         if (!p)
2550                 goto out_unparsable;
2551         switch (*p++) {
2552         case rpc_auth_error:
2553                 break;
2554         case rpc_mismatch:
2555                 trace_rpc__mismatch(task);
2556                 error = -EPROTONOSUPPORT;
2557                 goto out_err;
2558         default:
2559                 goto out_unparsable;
2560         }
2561
2562         p = xdr_inline_decode(xdr, sizeof(*p));
2563         if (!p)
2564                 goto out_unparsable;
2565         switch (*p++) {
2566         case rpc_autherr_rejectedcred:
2567         case rpc_autherr_rejectedverf:
2568         case rpcsec_gsserr_credproblem:
2569         case rpcsec_gsserr_ctxproblem:
2570                 if (!task->tk_cred_retry)
2571                         break;
2572                 task->tk_cred_retry--;
2573                 trace_rpc__stale_creds(task);
2574                 rpcauth_invalcred(task);
2575                 /* Ensure we obtain a new XID! */
2576                 xprt_release(task);
2577                 task->tk_action = call_reserve;
2578                 return -EAGAIN;
2579         case rpc_autherr_badcred:
2580         case rpc_autherr_badverf:
2581                 /* possibly garbled cred/verf? */
2582                 if (!task->tk_garb_retry)
2583                         break;
2584                 task->tk_garb_retry--;
2585                 trace_rpc__bad_creds(task);
2586                 task->tk_action = call_encode;
2587                 return -EAGAIN;
2588         case rpc_autherr_tooweak:
2589                 trace_rpc__auth_tooweak(task);
2590                 pr_warn("RPC: server %s requires stronger authentication.\n",
2591                         task->tk_xprt->servername);
2592                 break;
2593         default:
2594                 goto out_unparsable;
2595         }
2596         goto out_err;
2597 }
2598
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp,
				struct xdr_stream *xdr,
				const void *obj)
{
}
2603
/* Result decoder for the NULL procedure: nothing to decode, always 0. */
static int rpcproc_decode_null(struct rpc_rqst *rqstp,
			       struct xdr_stream *xdr,
			       void *obj)
{
	return 0;
}
2609
2610 static const struct rpc_procinfo rpcproc_null = {
2611         .p_encode = rpcproc_encode_null,
2612         .p_decode = rpcproc_decode_null,
2613 };
2614
2615 static int rpc_ping(struct rpc_clnt *clnt)
2616 {
2617         struct rpc_message msg = {
2618                 .rpc_proc = &rpcproc_null,
2619         };
2620         int err;
2621         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2622                             RPC_TASK_NULLCREDS);
2623         return err;
2624 }
2625
2626 static
2627 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2628                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2629                 const struct rpc_call_ops *ops, void *data)
2630 {
2631         struct rpc_message msg = {
2632                 .rpc_proc = &rpcproc_null,
2633         };
2634         struct rpc_task_setup task_setup_data = {
2635                 .rpc_client = clnt,
2636                 .rpc_xprt = xprt,
2637                 .rpc_message = &msg,
2638                 .rpc_op_cred = cred,
2639                 .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2640                 .callback_data = data,
2641                 .flags = flags | RPC_TASK_NULLCREDS,
2642         };
2643
2644         return rpc_run_task(&task_setup_data);
2645 }
2646
2647 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2648 {
2649         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2650 }
2651 EXPORT_SYMBOL_GPL(rpc_call_null);
2652
/*
 * Callback data of the NULL call issued by rpc_clnt_test_and_add_xprt().
 * Both references are dropped again in rpc_cb_add_xprt_release().
 */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch the transport is added to */
	struct rpc_xprt *xprt;		/* transport under test */
};
2657
2658 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2659 {
2660         struct rpc_cb_add_xprt_calldata *data = calldata;
2661
2662         if (task->tk_status == 0)
2663                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2664 }
2665
2666 static void rpc_cb_add_xprt_release(void *calldata)
2667 {
2668         struct rpc_cb_add_xprt_calldata *data = calldata;
2669
2670         xprt_put(data->xprt);
2671         xprt_switch_put(data->xps);
2672         kfree(data);
2673 }
2674
2675 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2676         .rpc_call_done = rpc_cb_add_xprt_done,
2677         .rpc_release = rpc_cb_add_xprt_release,
2678 };
2679
2680 /**
2681  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2682  * @clnt: pointer to struct rpc_clnt
2683  * @xps: pointer to struct rpc_xprt_switch
2684  * @xprt: pointer to struct rpc_xprt
2685  * @dummy: unused
2686  */
2687 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2688                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2689                 void *dummy)
2690 {
2691         struct rpc_cb_add_xprt_calldata *data;
2692         struct rpc_task *task;
2693
2694         data = kmalloc(sizeof(*data), GFP_NOFS);
2695         if (!data)
2696                 return -ENOMEM;
2697         data->xps = xprt_switch_get(xps);
2698         data->xprt = xprt_get(xprt);
2699
2700         task = rpc_call_null_helper(clnt, xprt, NULL,
2701                         RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC|RPC_TASK_NULLCREDS,
2702                         &rpc_cb_add_xprt_call_ops, data);
2703         if (IS_ERR(task))
2704                 return PTR_ERR(task);
2705         rpc_put_task(task);
2706         return 1;
2707 }
2708 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2709
2710 /**
2711  * rpc_clnt_setup_test_and_add_xprt()
2712  *
2713  * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2714  *   1) caller of the test function must dereference the rpc_xprt_switch
2715  *   and the rpc_xprt.
2716  *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2717  *   the rpc_call_done routine.
2718  *
2719  * Upon success (return of 1), the test function adds the new
2720  * transport to the rpc_clnt xprt switch
2721  *
2722  * @clnt: struct rpc_clnt to get the new transport
2723  * @xps:  the rpc_xprt_switch to hold the new transport
2724  * @xprt: the rpc_xprt to test
2725  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2726  *        and test function call data
2727  */
2728 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2729                                      struct rpc_xprt_switch *xps,
2730                                      struct rpc_xprt *xprt,
2731                                      void *data)
2732 {
2733         struct rpc_task *task;
2734         struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2735         int status = -EADDRINUSE;
2736
2737         xprt = xprt_get(xprt);
2738         xprt_switch_get(xps);
2739
2740         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2741                 goto out_err;
2742
2743         /* Test the connection */
2744         task = rpc_call_null_helper(clnt, xprt, NULL,
2745                                     RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2746                                     NULL, NULL);
2747         if (IS_ERR(task)) {
2748                 status = PTR_ERR(task);
2749                 goto out_err;
2750         }
2751         status = task->tk_status;
2752         rpc_put_task(task);
2753
2754         if (status < 0)
2755                 goto out_err;
2756
2757         /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2758         xtest->add_xprt_test(clnt, xprt, xtest->data);
2759
2760         xprt_put(xprt);
2761         xprt_switch_put(xps);
2762
2763         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2764         return 1;
2765 out_err:
2766         xprt_put(xprt);
2767         xprt_switch_put(xps);
2768         pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2769                 status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2770         return status;
2771 }
2772 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2773
2774 /**
2775  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2776  * @clnt: pointer to struct rpc_clnt
2777  * @xprtargs: pointer to struct xprt_create
2778  * @setup: callback to test and/or set up the connection
2779  * @data: pointer to setup function data
2780  *
2781  * Creates a new transport using the parameters set in args and
2782  * adds it to clnt.
2783  * If ping is set, then test that connectivity succeeds before
2784  * adding the new transport.
2785  *
2786  */
2787 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2788                 struct xprt_create *xprtargs,
2789                 int (*setup)(struct rpc_clnt *,
2790                         struct rpc_xprt_switch *,
2791                         struct rpc_xprt *,
2792                         void *),
2793                 void *data)
2794 {
2795         struct rpc_xprt_switch *xps;
2796         struct rpc_xprt *xprt;
2797         unsigned long connect_timeout;
2798         unsigned long reconnect_timeout;
2799         unsigned char resvport;
2800         int ret = 0;
2801
2802         rcu_read_lock();
2803         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2804         xprt = xprt_iter_xprt(&clnt->cl_xpi);
2805         if (xps == NULL || xprt == NULL) {
2806                 rcu_read_unlock();
2807                 return -EAGAIN;
2808         }
2809         resvport = xprt->resvport;
2810         connect_timeout = xprt->connect_timeout;
2811         reconnect_timeout = xprt->max_reconnect_timeout;
2812         rcu_read_unlock();
2813
2814         xprt = xprt_create_transport(xprtargs);
2815         if (IS_ERR(xprt)) {
2816                 ret = PTR_ERR(xprt);
2817                 goto out_put_switch;
2818         }
2819         xprt->resvport = resvport;
2820         if (xprt->ops->set_connect_timeout != NULL)
2821                 xprt->ops->set_connect_timeout(xprt,
2822                                 connect_timeout,
2823                                 reconnect_timeout);
2824
2825         rpc_xprt_switch_set_roundrobin(xps);
2826         if (setup) {
2827                 ret = setup(clnt, xps, xprt, data);
2828                 if (ret != 0)
2829                         goto out_put_xprt;
2830         }
2831         rpc_xprt_switch_add_xprt(xps, xprt);
2832 out_put_xprt:
2833         xprt_put(xprt);
2834 out_put_switch:
2835         xprt_switch_put(xps);
2836         return ret;
2837 }
2838 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2839
/**
 * struct connect_timeout_data - arguments for rpc_xprt_set_connect_timeout()
 * @connect_timeout: value passed as the connect timeout
 * @reconnect_timeout: value passed as the reconnect timeout
 */
struct connect_timeout_data {
	unsigned long	connect_timeout;
	unsigned long	reconnect_timeout;
};
2844
2845 static int
2846 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2847                 struct rpc_xprt *xprt,
2848                 void *data)
2849 {
2850         struct connect_timeout_data *timeo = data;
2851
2852         if (xprt->ops->set_connect_timeout)
2853                 xprt->ops->set_connect_timeout(xprt,
2854                                 timeo->connect_timeout,
2855                                 timeo->reconnect_timeout);
2856         return 0;
2857 }
2858
2859 void
2860 rpc_set_connect_timeout(struct rpc_clnt *clnt,
2861                 unsigned long connect_timeout,
2862                 unsigned long reconnect_timeout)
2863 {
2864         struct connect_timeout_data timeout = {
2865                 .connect_timeout = connect_timeout,
2866                 .reconnect_timeout = reconnect_timeout,
2867         };
2868         rpc_clnt_iterate_for_each_xprt(clnt,
2869                         rpc_xprt_set_connect_timeout,
2870                         &timeout);
2871 }
2872 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2873
2874 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2875 {
2876         rcu_read_lock();
2877         xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2878         rcu_read_unlock();
2879 }
2880 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2881
2882 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2883 {
2884         rcu_read_lock();
2885         rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2886                                  xprt);
2887         rcu_read_unlock();
2888 }
2889 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2890
2891 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2892                                    const struct sockaddr *sap)
2893 {
2894         struct rpc_xprt_switch *xps;
2895         bool ret;
2896
2897         rcu_read_lock();
2898         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2899         ret = rpc_xprt_switch_has_addr(xps, sap);
2900         rcu_read_unlock();
2901         return ret;
2902 }
2903 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2904
2905 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2906 static void rpc_show_header(void)
2907 {
2908         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2909                 "-timeout ---ops--\n");
2910 }
2911
2912 static void rpc_show_task(const struct rpc_clnt *clnt,
2913                           const struct rpc_task *task)
2914 {
2915         const char *rpc_waitq = "none";
2916
2917         if (RPC_IS_QUEUED(task))
2918                 rpc_waitq = rpc_qname(task->tk_waitqueue);
2919
2920         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2921                 task->tk_pid, task->tk_flags, task->tk_status,
2922                 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
2923                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2924                 task->tk_action, rpc_waitq);
2925 }
2926
2927 void rpc_show_tasks(struct net *net)
2928 {
2929         struct rpc_clnt *clnt;
2930         struct rpc_task *task;
2931         int header = 0;
2932         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2933
2934         spin_lock(&sn->rpc_client_lock);
2935         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2936                 spin_lock(&clnt->cl_lock);
2937                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2938                         if (!header) {
2939                                 rpc_show_header();
2940                                 header++;
2941                         }
2942                         rpc_show_task(clnt, task);
2943                 }
2944                 spin_unlock(&clnt->cl_lock);
2945         }
2946         spin_unlock(&sn->rpc_client_lock);
2947 }
2948 #endif
2949
2950 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback for rpc_clnt_swap_activate(): returns the result
 * of xprt_enable_swap() on @xprt. @clnt and @dummy are unused.
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	int status = xprt_enable_swap(xprt);

	return status;
}
2958
2959 int
2960 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
2961 {
2962         if (atomic_inc_return(&clnt->cl_swapper) == 1)
2963                 return rpc_clnt_iterate_for_each_xprt(clnt,
2964                                 rpc_clnt_swap_activate_callback, NULL);
2965         return 0;
2966 }
2967 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
2968
/*
 * Per-transport callback for rpc_clnt_swap_deactivate(): calls
 * xprt_disable_swap() on @xprt and always returns 0. @clnt and @dummy are
 * unused.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
				  struct rpc_xprt *xprt, void *dummy)
{
	xprt_disable_swap(xprt);

	return 0;
}
2977
2978 void
2979 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
2980 {
2981         if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
2982                 rpc_clnt_iterate_for_each_xprt(clnt,
2983                                 rpc_clnt_swap_deactivate_callback, NULL);
2984 }
2985 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
2986 #endif /* CONFIG_SUNRPC_SWAP */