Merge master.kernel.org:/pub/scm/linux/kernel/git/davem/net-2.6
[sfrench/cifs-2.6.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
7  * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  */
17
18 #include <linux/types.h>
19 #include <linux/slab.h>
20 #include <linux/capability.h>
21 #include <linux/sched.h>
22 #include <linux/pagemap.h>
23 #include <linux/errno.h>
24 #include <linux/socket.h>
25 #include <linux/in.h>
26 #include <linux/net.h>
27 #include <linux/mm.h>
28 #include <linux/udp.h>
29 #include <linux/tcp.h>
30 #include <linux/sunrpc/clnt.h>
31 #include <linux/sunrpc/sched.h>
32 #include <linux/file.h>
33
34 #include <net/sock.h>
35 #include <net/checksum.h>
36 #include <net/udp.h>
37 #include <net/tcp.h>
38
39 /*
40  * xprtsock tunables
41  */
42 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
43 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
44
45 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
46 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
47
/*
 * Number of attempts at sending a request on a socket before we give
 * up and wait for the socket buffer to clear.
 */
#define XS_SENDMSG_RETRY	(10U)

/*
 * Timeout for connecting an RPC UDP socket.  A UDP connect is
 * synchronous, but a timeout is set anyway to cover resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * How long to wait for an RPC TCP connection to be established.
 * Solaris NFS over TCP, for example, uses 60 seconds, which is in
 * line with how long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * How long to wait for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay applied after a UDP socket connect error.  Such an error most
 * likely indicates a resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout lets a client pause briefly before trying
 * to reconnect to a server that has just dropped our connection.
 *
 * TCP reconnects use an exponential backoff.  Some servers drop a TCP
 * connection when they are overworked, so the timeout starts short
 * and grows over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout: the client drops the transport socket once it has
 * been idle for this long.  UDP sockets are timed out as well, so that
 * port numbers are not held while there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
97
98 #ifdef RPC_DEBUG
99 # undef  RPC_DEBUG_DATA
100 # define RPCDBG_FACILITY        RPCDBG_TRANS
101 #endif
102
103 #ifdef RPC_DEBUG_DATA
104 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
105 {
106         u8 *buf = (u8 *) packet;
107         int j;
108
109         dprintk("RPC:      %s\n", msg);
110         for (j = 0; j < count && j < 128; j += 4) {
111                 if (!(j & 31)) {
112                         if (j)
113                                 dprintk("\n");
114                         dprintk("0x%04x ", j);
115                 }
116                 dprintk("%02x%02x%02x%02x ",
117                         buf[j], buf[j+1], buf[j+2], buf[j+3]);
118         }
119         dprintk("\n");
120 }
121 #else
122 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
123 {
124         /* NOP */
125 }
126 #endif
127
128 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
129
130 static inline int xs_send_head(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, unsigned int len)
131 {
132         struct kvec iov = {
133                 .iov_base       = xdr->head[0].iov_base + base,
134                 .iov_len        = len - base,
135         };
136         struct msghdr msg = {
137                 .msg_name       = addr,
138                 .msg_namelen    = addrlen,
139                 .msg_flags      = XS_SENDMSG_FLAGS,
140         };
141
142         if (xdr->len > len)
143                 msg.msg_flags |= MSG_MORE;
144
145         if (likely(iov.iov_len))
146                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
147         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
148 }
149
150 static int xs_send_tail(struct socket *sock, struct xdr_buf *xdr, unsigned int base, unsigned int len)
151 {
152         struct kvec iov = {
153                 .iov_base       = xdr->tail[0].iov_base + base,
154                 .iov_len        = len - base,
155         };
156         struct msghdr msg = {
157                 .msg_flags      = XS_SENDMSG_FLAGS,
158         };
159
160         return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
161 }
162
163 /**
164  * xs_sendpages - write pages directly to a socket
165  * @sock: socket to send on
166  * @addr: UDP only -- address of destination
167  * @addrlen: UDP only -- length of destination address
168  * @xdr: buffer containing this request
169  * @base: starting position in the buffer
170  *
171  */
172 static inline int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
173 {
174         struct page **ppage = xdr->pages;
175         unsigned int len, pglen = xdr->page_len;
176         int err, ret = 0;
177
178         if (unlikely(!sock))
179                 return -ENOTCONN;
180
181         clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
182
183         len = xdr->head[0].iov_len;
184         if (base < len || (addr != NULL && base == 0)) {
185                 err = xs_send_head(sock, addr, addrlen, xdr, base, len);
186                 if (ret == 0)
187                         ret = err;
188                 else if (err > 0)
189                         ret += err;
190                 if (err != (len - base))
191                         goto out;
192                 base = 0;
193         } else
194                 base -= len;
195
196         if (unlikely(pglen == 0))
197                 goto copy_tail;
198         if (unlikely(base >= pglen)) {
199                 base -= pglen;
200                 goto copy_tail;
201         }
202         if (base || xdr->page_base) {
203                 pglen -= base;
204                 base += xdr->page_base;
205                 ppage += base >> PAGE_CACHE_SHIFT;
206                 base &= ~PAGE_CACHE_MASK;
207         }
208
209         do {
210                 int flags = XS_SENDMSG_FLAGS;
211
212                 len = PAGE_CACHE_SIZE;
213                 if (base)
214                         len -= base;
215                 if (pglen < len)
216                         len = pglen;
217
218                 if (pglen != len || xdr->tail[0].iov_len != 0)
219                         flags |= MSG_MORE;
220
221                 err = kernel_sendpage(sock, *ppage, base, len, flags);
222                 if (ret == 0)
223                         ret = err;
224                 else if (err > 0)
225                         ret += err;
226                 if (err != len)
227                         goto out;
228                 base = 0;
229                 ppage++;
230         } while ((pglen -= len) != 0);
231 copy_tail:
232         len = xdr->tail[0].iov_len;
233         if (base < len) {
234                 err = xs_send_tail(sock, xdr, base, len);
235                 if (ret == 0)
236                         ret = err;
237                 else if (err > 0)
238                         ret += err;
239         }
240 out:
241         return ret;
242 }
243
244 /**
245  * xs_nospace - place task on wait queue if transmit was incomplete
246  * @task: task to put to sleep
247  *
248  */
249 static void xs_nospace(struct rpc_task *task)
250 {
251         struct rpc_rqst *req = task->tk_rqstp;
252         struct rpc_xprt *xprt = req->rq_xprt;
253
254         dprintk("RPC: %4d xmit incomplete (%u left of %u)\n",
255                         task->tk_pid, req->rq_slen - req->rq_bytes_sent,
256                         req->rq_slen);
257
258         if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
259                 /* Protect against races with write_space */
260                 spin_lock_bh(&xprt->transport_lock);
261
262                 /* Don't race with disconnect */
263                 if (!xprt_connected(xprt))
264                         task->tk_status = -ENOTCONN;
265                 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags))
266                         xprt_wait_for_buffer_space(task);
267
268                 spin_unlock_bh(&xprt->transport_lock);
269         } else
270                 /* Keep holding the socket if it is blocked */
271                 rpc_delay(task, HZ>>4);
272 }
273
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	req->rq_xtime = jiffies;
	status = xs_sendpages(xprt->sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), xdr, req->rq_bytes_sent);

	dprintk("RPC:      xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	/* Everything went out in one go */
	if (likely(status >= (int) req->rq_slen))
		return 0;

	/* A short send is reported as "try again later" */
	if (status > 0)
		status = -EAGAIN;

	if (status == -EAGAIN) {
		xs_nospace(task);
	} else if (status == -ENETUNREACH ||
		   status == -EPIPE ||
		   status == -ECONNREFUSED) {
		/* Passed back as is.  When the server has died, an ICMP
		 * port unreachable message prompts ECONNREFUSED. */
	} else {
		dprintk("RPC:      sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}
328
329 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
330 {
331         u32 reclen = buf->len - sizeof(rpc_fraghdr);
332         rpc_fraghdr *base = buf->head[0].iov_base;
333         *base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
334 }
335
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status, retry = 0;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Keep pushing the record out.  Take care to cope with
	 * writespace callbacks that arrive _after_ sendmsg() has
	 * been called. */
	for (;;) {
		req->rq_xtime = jiffies;
		status = xs_sendpages(xprt->sock, NULL, 0, xdr,
						req->rq_bytes_sent);

		dprintk("RPC:      xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* Account for what was sent; once the whole record is
		 * out, reset the count of bytes sent right away. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial send: retry a bounded number of times */
		status = -EAGAIN;
		if (retry++ > XS_SENDMSG_RETRY)
			break;
	}

	if (status == -EAGAIN) {
		xs_nospace(task);
	} else if (status == -ECONNREFUSED ||
		   status == -ECONNRESET ||
		   status == -ENOTCONN ||
		   status == -EPIPE) {
		/* All of these are reported to the caller as ENOTCONN */
		status = -ENOTCONN;
	} else {
		dprintk("RPC:      sendmsg returned unrecognized error %d\n",
			-status);
		xprt_disconnect(xprt);
	}

	return status;
}
410
411 /**
412  * xs_tcp_release_xprt - clean up after a tcp transmission
413  * @xprt: transport
414  * @task: rpc task
415  *
416  * This cleans up if an error causes us to abort the transmission of a request.
417  * In this case, the socket may need to be reset in order to avoid confusing
418  * the server.
419  */
420 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
421 {
422         struct rpc_rqst *req;
423
424         if (task != xprt->snd_task)
425                 return;
426         if (task == NULL)
427                 goto out_release;
428         req = task->tk_rqstp;
429         if (req->rq_bytes_sent == 0)
430                 goto out_release;
431         if (req->rq_bytes_sent == req->rq_snd_buf.len)
432                 goto out_release;
433         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
434 out_release:
435         xprt_release_xprt(xprt, task);
436 }
437
438 /**
439  * xs_close - close a socket
440  * @xprt: transport
441  *
442  * This is used when all requests are complete; ie, no DRC state remains
443  * on the server we want to save.
444  */
445 static void xs_close(struct rpc_xprt *xprt)
446 {
447         struct socket *sock = xprt->sock;
448         struct sock *sk = xprt->inet;
449
450         if (!sk)
451                 goto clear_close_wait;
452
453         dprintk("RPC:      xs_close xprt %p\n", xprt);
454
455         write_lock_bh(&sk->sk_callback_lock);
456         xprt->inet = NULL;
457         xprt->sock = NULL;
458
459         sk->sk_user_data = NULL;
460         sk->sk_data_ready = xprt->old_data_ready;
461         sk->sk_state_change = xprt->old_state_change;
462         sk->sk_write_space = xprt->old_write_space;
463         write_unlock_bh(&sk->sk_callback_lock);
464
465         sk->sk_no_check = 0;
466
467         sock_release(sock);
468 clear_close_wait:
469         smp_mb__before_clear_bit();
470         clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
471         smp_mb__after_clear_bit();
472 }
473
474 /**
475  * xs_destroy - prepare to shutdown a transport
476  * @xprt: doomed transport
477  *
478  */
479 static void xs_destroy(struct rpc_xprt *xprt)
480 {
481         dprintk("RPC:      xs_destroy xprt %p\n", xprt);
482
483         cancel_delayed_work(&xprt->connect_worker);
484         flush_scheduled_work();
485
486         xprt_disconnect(xprt);
487         xs_close(xprt);
488         kfree(xprt->slot);
489 }
490
491 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
492 {
493         return (struct rpc_xprt *) sk->sk_user_data;
494 }
495
496 /**
497  * xs_udp_data_ready - "data ready" callback for UDP sockets
498  * @sk: socket with data to read
499  * @len: how much data to read
500  *
501  */
502 static void xs_udp_data_ready(struct sock *sk, int len)
503 {
504         struct rpc_task *task;
505         struct rpc_xprt *xprt;
506         struct rpc_rqst *rovr;
507         struct sk_buff *skb;
508         int err, repsize, copied;
509         u32 _xid, *xp;
510
511         read_lock(&sk->sk_callback_lock);
512         dprintk("RPC:      xs_udp_data_ready...\n");
513         if (!(xprt = xprt_from_sock(sk)))
514                 goto out;
515
516         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
517                 goto out;
518
519         if (xprt->shutdown)
520                 goto dropit;
521
522         repsize = skb->len - sizeof(struct udphdr);
523         if (repsize < 4) {
524                 dprintk("RPC:      impossible RPC reply size %d!\n", repsize);
525                 goto dropit;
526         }
527
528         /* Copy the XID from the skb... */
529         xp = skb_header_pointer(skb, sizeof(struct udphdr),
530                                 sizeof(_xid), &_xid);
531         if (xp == NULL)
532                 goto dropit;
533
534         /* Look up and lock the request corresponding to the given XID */
535         spin_lock(&xprt->transport_lock);
536         rovr = xprt_lookup_rqst(xprt, *xp);
537         if (!rovr)
538                 goto out_unlock;
539         task = rovr->rq_task;
540
541         if ((copied = rovr->rq_private_buf.buflen) > repsize)
542                 copied = repsize;
543
544         /* Suck it into the iovec, verify checksum if not done by hw. */
545         if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
546                 goto out_unlock;
547
548         /* Something worked... */
549         dst_confirm(skb->dst);
550
551         xprt_adjust_cwnd(task, copied);
552         xprt_update_rtt(task);
553         xprt_complete_rqst(task, copied);
554
555  out_unlock:
556         spin_unlock(&xprt->transport_lock);
557  dropit:
558         skb_free_datagram(sk, skb);
559  out:
560         read_unlock(&sk->sk_callback_lock);
561 }
562
563 static inline size_t xs_tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
564 {
565         if (len > desc->count)
566                 len = desc->count;
567         if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
568                 dprintk("RPC:      failed to copy %zu bytes from skb. %zu bytes remain\n",
569                                 len, desc->count);
570                 return 0;
571         }
572         desc->offset += len;
573         desc->count -= len;
574         dprintk("RPC:      copied %zu bytes from skb. %zu bytes remain\n",
575                         len, desc->count);
576         return len;
577 }
578
579 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
580 {
581         size_t len, used;
582         char *p;
583
584         p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
585         len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
586         used = xs_tcp_copy_data(desc, p, len);
587         xprt->tcp_offset += used;
588         if (used != len)
589                 return;
590
591         xprt->tcp_reclen = ntohl(xprt->tcp_recm);
592         if (xprt->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
593                 xprt->tcp_flags |= XPRT_LAST_FRAG;
594         else
595                 xprt->tcp_flags &= ~XPRT_LAST_FRAG;
596         xprt->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
597
598         xprt->tcp_flags &= ~XPRT_COPY_RECM;
599         xprt->tcp_offset = 0;
600
601         /* Sanity check of the record length */
602         if (unlikely(xprt->tcp_reclen < 4)) {
603                 dprintk("RPC:      invalid TCP record fragment length\n");
604                 xprt_disconnect(xprt);
605                 return;
606         }
607         dprintk("RPC:      reading TCP record fragment of length %d\n",
608                         xprt->tcp_reclen);
609 }
610
611 static void xs_tcp_check_recm(struct rpc_xprt *xprt)
612 {
613         dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
614                         xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
615         if (xprt->tcp_offset == xprt->tcp_reclen) {
616                 xprt->tcp_flags |= XPRT_COPY_RECM;
617                 xprt->tcp_offset = 0;
618                 if (xprt->tcp_flags & XPRT_LAST_FRAG) {
619                         xprt->tcp_flags &= ~XPRT_COPY_DATA;
620                         xprt->tcp_flags |= XPRT_COPY_XID;
621                         xprt->tcp_copied = 0;
622                 }
623         }
624 }
625
626 static inline void xs_tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
627 {
628         size_t len, used;
629         char *p;
630
631         len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
632         dprintk("RPC:      reading XID (%Zu bytes)\n", len);
633         p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
634         used = xs_tcp_copy_data(desc, p, len);
635         xprt->tcp_offset += used;
636         if (used != len)
637                 return;
638         xprt->tcp_flags &= ~XPRT_COPY_XID;
639         xprt->tcp_flags |= XPRT_COPY_DATA;
640         xprt->tcp_copied = 4;
641         dprintk("RPC:      reading reply for XID %08x\n",
642                                                 ntohl(xprt->tcp_xid));
643         xs_tcp_check_recm(xprt);
644 }
645
646 static inline void xs_tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
647 {
648         struct rpc_rqst *req;
649         struct xdr_buf *rcvbuf;
650         size_t len;
651         ssize_t r;
652
653         /* Find and lock the request corresponding to this xid */
654         spin_lock(&xprt->transport_lock);
655         req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
656         if (!req) {
657                 xprt->tcp_flags &= ~XPRT_COPY_DATA;
658                 dprintk("RPC:      XID %08x request not found!\n",
659                                 ntohl(xprt->tcp_xid));
660                 spin_unlock(&xprt->transport_lock);
661                 return;
662         }
663
664         rcvbuf = &req->rq_private_buf;
665         len = desc->count;
666         if (len > xprt->tcp_reclen - xprt->tcp_offset) {
667                 skb_reader_t my_desc;
668
669                 len = xprt->tcp_reclen - xprt->tcp_offset;
670                 memcpy(&my_desc, desc, sizeof(my_desc));
671                 my_desc.count = len;
672                 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
673                                           &my_desc, xs_tcp_copy_data);
674                 desc->count -= r;
675                 desc->offset += r;
676         } else
677                 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
678                                           desc, xs_tcp_copy_data);
679
680         if (r > 0) {
681                 xprt->tcp_copied += r;
682                 xprt->tcp_offset += r;
683         }
684         if (r != len) {
685                 /* Error when copying to the receive buffer,
686                  * usually because we weren't able to allocate
687                  * additional buffer pages. All we can do now
688                  * is turn off XPRT_COPY_DATA, so the request
689                  * will not receive any additional updates,
690                  * and time out.
691                  * Any remaining data from this record will
692                  * be discarded.
693                  */
694                 xprt->tcp_flags &= ~XPRT_COPY_DATA;
695                 dprintk("RPC:      XID %08x truncated request\n",
696                                 ntohl(xprt->tcp_xid));
697                 dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
698                                 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
699                 goto out;
700         }
701
702         dprintk("RPC:      XID %08x read %Zd bytes\n",
703                         ntohl(xprt->tcp_xid), r);
704         dprintk("RPC:      xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
705                         xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
706
707         if (xprt->tcp_copied == req->rq_private_buf.buflen)
708                 xprt->tcp_flags &= ~XPRT_COPY_DATA;
709         else if (xprt->tcp_offset == xprt->tcp_reclen) {
710                 if (xprt->tcp_flags & XPRT_LAST_FRAG)
711                         xprt->tcp_flags &= ~XPRT_COPY_DATA;
712         }
713
714 out:
715         if (!(xprt->tcp_flags & XPRT_COPY_DATA))
716                 xprt_complete_rqst(req->rq_task, xprt->tcp_copied);
717         spin_unlock(&xprt->transport_lock);
718         xs_tcp_check_recm(xprt);
719 }
720
721 static inline void xs_tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
722 {
723         size_t len;
724
725         len = xprt->tcp_reclen - xprt->tcp_offset;
726         if (len > desc->count)
727                 len = desc->count;
728         desc->count -= len;
729         desc->offset += len;
730         xprt->tcp_offset += len;
731         dprintk("RPC:      discarded %Zu bytes\n", len);
732         xs_tcp_check_recm(xprt);
733 }
734
735 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
736 {
737         struct rpc_xprt *xprt = rd_desc->arg.data;
738         skb_reader_t desc = {
739                 .skb    = skb,
740                 .offset = offset,
741                 .count  = len,
742                 .csum   = 0
743         };
744
745         dprintk("RPC:      xs_tcp_data_recv started\n");
746         do {
747                 /* Read in a new fragment marker if necessary */
748                 /* Can we ever really expect to get completely empty fragments? */
749                 if (xprt->tcp_flags & XPRT_COPY_RECM) {
750                         xs_tcp_read_fraghdr(xprt, &desc);
751                         continue;
752                 }
753                 /* Read in the xid if necessary */
754                 if (xprt->tcp_flags & XPRT_COPY_XID) {
755                         xs_tcp_read_xid(xprt, &desc);
756                         continue;
757                 }
758                 /* Read in the request data */
759                 if (xprt->tcp_flags & XPRT_COPY_DATA) {
760                         xs_tcp_read_request(xprt, &desc);
761                         continue;
762                 }
763                 /* Skip over any trailing bytes on short reads */
764                 xs_tcp_read_discard(xprt, &desc);
765         } while (desc.count);
766         dprintk("RPC:      xs_tcp_data_recv done\n");
767         return len - desc.count;
768 }
769
770 /**
771  * xs_tcp_data_ready - "data ready" callback for TCP sockets
772  * @sk: socket with data to read
773  * @bytes: how much data to read
774  *
775  */
776 static void xs_tcp_data_ready(struct sock *sk, int bytes)
777 {
778         struct rpc_xprt *xprt;
779         read_descriptor_t rd_desc;
780
781         read_lock(&sk->sk_callback_lock);
782         dprintk("RPC:      xs_tcp_data_ready...\n");
783         if (!(xprt = xprt_from_sock(sk)))
784                 goto out;
785         if (xprt->shutdown)
786                 goto out;
787
788         /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
789         rd_desc.arg.data = xprt;
790         rd_desc.count = 65536;
791         tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
792 out:
793         read_unlock(&sk->sk_callback_lock);
794 }
795
796 /**
797  * xs_tcp_state_change - callback to handle TCP socket state changes
798  * @sk: socket whose state has changed
799  *
800  */
801 static void xs_tcp_state_change(struct sock *sk)
802 {
803         struct rpc_xprt *xprt;
804
805         read_lock(&sk->sk_callback_lock);
806         if (!(xprt = xprt_from_sock(sk)))
807                 goto out;
808         dprintk("RPC:      xs_tcp_state_change client %p...\n", xprt);
809         dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
810                                 sk->sk_state, xprt_connected(xprt),
811                                 sock_flag(sk, SOCK_DEAD),
812                                 sock_flag(sk, SOCK_ZAPPED));
813
814         switch (sk->sk_state) {
815         case TCP_ESTABLISHED:
816                 spin_lock_bh(&xprt->transport_lock);
817                 if (!xprt_test_and_set_connected(xprt)) {
818                         /* Reset TCP record info */
819                         xprt->tcp_offset = 0;
820                         xprt->tcp_reclen = 0;
821                         xprt->tcp_copied = 0;
822                         xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
823                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
824                         xprt_wake_pending_tasks(xprt, 0);
825                 }
826                 spin_unlock_bh(&xprt->transport_lock);
827                 break;
828         case TCP_SYN_SENT:
829         case TCP_SYN_RECV:
830                 break;
831         case TCP_CLOSE_WAIT:
832                 /* Try to schedule an autoclose RPC calls */
833                 set_bit(XPRT_CLOSE_WAIT, &xprt->state);
834                 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
835                         schedule_work(&xprt->task_cleanup);
836         default:
837                 xprt_disconnect(xprt);
838         }
839  out:
840         read_unlock(&sk->sk_callback_lock);
841 }
842
843 /**
844  * xs_udp_write_space - callback invoked when socket buffer space
845  *                             becomes available
846  * @sk: socket whose state has changed
847  *
848  * Called when more output buffer space is available for this socket.
849  * We try not to wake our writers until they can make "significant"
850  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
851  * with a bunch of small requests.
852  */
853 static void xs_udp_write_space(struct sock *sk)
854 {
855         read_lock(&sk->sk_callback_lock);
856
857         /* from net/core/sock.c:sock_def_write_space */
858         if (sock_writeable(sk)) {
859                 struct socket *sock;
860                 struct rpc_xprt *xprt;
861
862                 if (unlikely(!(sock = sk->sk_socket)))
863                         goto out;
864                 if (unlikely(!(xprt = xprt_from_sock(sk))))
865                         goto out;
866                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
867                         goto out;
868
869                 xprt_write_space(xprt);
870         }
871
872  out:
873         read_unlock(&sk->sk_callback_lock);
874 }
875
876 /**
877  * xs_tcp_write_space - callback invoked when socket buffer space
878  *                             becomes available
879  * @sk: socket whose state has changed
880  *
881  * Called when more output buffer space is available for this socket.
882  * We try not to wake our writers until they can make "significant"
883  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
884  * with a bunch of small requests.
885  */
886 static void xs_tcp_write_space(struct sock *sk)
887 {
888         read_lock(&sk->sk_callback_lock);
889
890         /* from net/core/stream.c:sk_stream_write_space */
891         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
892                 struct socket *sock;
893                 struct rpc_xprt *xprt;
894
895                 if (unlikely(!(sock = sk->sk_socket)))
896                         goto out;
897                 if (unlikely(!(xprt = xprt_from_sock(sk))))
898                         goto out;
899                 if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
900                         goto out;
901
902                 xprt_write_space(xprt);
903         }
904
905  out:
906         read_unlock(&sk->sk_callback_lock);
907 }
908
909 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
910 {
911         struct sock *sk = xprt->inet;
912
913         if (xprt->rcvsize) {
914                 sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
915                 sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs *  2;
916         }
917         if (xprt->sndsize) {
918                 sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
919                 sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
920                 sk->sk_write_space(sk);
921         }
922 }
923
924 /**
925  * xs_udp_set_buffer_size - set send and receive limits
926  * @xprt: generic transport
927  * @sndsize: requested size of send buffer, in bytes
928  * @rcvsize: requested size of receive buffer, in bytes
929  *
930  * Set socket send and receive buffer size limits.
931  */
932 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
933 {
934         xprt->sndsize = 0;
935         if (sndsize)
936                 xprt->sndsize = sndsize + 1024;
937         xprt->rcvsize = 0;
938         if (rcvsize)
939                 xprt->rcvsize = rcvsize + 1024;
940
941         xs_udp_do_set_buffer_size(xprt);
942 }
943
944 /**
945  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
946  * @task: task that timed out
947  *
948  * Adjust the congestion window after a retransmit timeout has occurred.
949  */
950 static void xs_udp_timer(struct rpc_task *task)
951 {
952         xprt_adjust_cwnd(task, -ETIMEDOUT);
953 }
954
955 static unsigned short xs_get_random_port(void)
956 {
957         unsigned short range = xprt_max_resvport - xprt_min_resvport;
958         unsigned short rand = (unsigned short) net_random() % range;
959         return rand + xprt_min_resvport;
960 }
961
962 /**
963  * xs_set_port - reset the port number in the remote endpoint address
964  * @xprt: generic transport
965  * @port: new port number
966  *
967  */
968 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
969 {
970         dprintk("RPC:      setting port for xprt %p to %u\n", xprt, port);
971         xprt->addr.sin_port = htons(port);
972 }
973
974 static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
975 {
976         struct sockaddr_in myaddr = {
977                 .sin_family = AF_INET,
978         };
979         int err;
980         unsigned short port = xprt->port;
981
982         do {
983                 myaddr.sin_port = htons(port);
984                 err = kernel_bind(sock, (struct sockaddr *) &myaddr,
985                                                 sizeof(myaddr));
986                 if (err == 0) {
987                         xprt->port = port;
988                         dprintk("RPC:      xs_bindresvport bound to port %u\n",
989                                         port);
990                         return 0;
991                 }
992                 if (port <= xprt_min_resvport)
993                         port = xprt_max_resvport;
994                 else
995                         port--;
996         } while (err == -EADDRINUSE && port != xprt->port);
997
998         dprintk("RPC:      can't bind to reserved port (%d).\n", -err);
999         return err;
1000 }
1001
1002 /**
1003  * xs_udp_connect_worker - set up a UDP socket
1004  * @args: RPC transport to connect
1005  *
1006  * Invoked by a work queue tasklet.
1007  */
1008 static void xs_udp_connect_worker(void *args)
1009 {
1010         struct rpc_xprt *xprt = (struct rpc_xprt *) args;
1011         struct socket *sock = xprt->sock;
1012         int err, status = -EIO;
1013
1014         if (xprt->shutdown || xprt->addr.sin_port == 0)
1015                 goto out;
1016
1017         dprintk("RPC:      xs_udp_connect_worker for xprt %p\n", xprt);
1018
1019         /* Start by resetting any existing state */
1020         xs_close(xprt);
1021
1022         if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1023                 dprintk("RPC:      can't create UDP transport socket (%d).\n", -err);
1024                 goto out;
1025         }
1026
1027         if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
1028                 sock_release(sock);
1029                 goto out;
1030         }
1031
1032         if (!xprt->inet) {
1033                 struct sock *sk = sock->sk;
1034
1035                 write_lock_bh(&sk->sk_callback_lock);
1036
1037                 sk->sk_user_data = xprt;
1038                 xprt->old_data_ready = sk->sk_data_ready;
1039                 xprt->old_state_change = sk->sk_state_change;
1040                 xprt->old_write_space = sk->sk_write_space;
1041                 sk->sk_data_ready = xs_udp_data_ready;
1042                 sk->sk_write_space = xs_udp_write_space;
1043                 sk->sk_no_check = UDP_CSUM_NORCV;
1044                 sk->sk_allocation = GFP_ATOMIC;
1045
1046                 xprt_set_connected(xprt);
1047
1048                 /* Reset to new socket */
1049                 xprt->sock = sock;
1050                 xprt->inet = sk;
1051
1052                 write_unlock_bh(&sk->sk_callback_lock);
1053         }
1054         xs_udp_do_set_buffer_size(xprt);
1055         status = 0;
1056 out:
1057         xprt_wake_pending_tasks(xprt, status);
1058         xprt_clear_connecting(xprt);
1059 }
1060
1061 /*
1062  * We need to preserve the port number so the reply cache on the server can
1063  * find our cached RPC replies when we get around to reconnecting.
1064  */
1065 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
1066 {
1067         int result;
1068         struct socket *sock = xprt->sock;
1069         struct sockaddr any;
1070
1071         dprintk("RPC:      disconnecting xprt %p to reuse port\n", xprt);
1072
1073         /*
1074          * Disconnect the transport socket by doing a connect operation
1075          * with AF_UNSPEC.  This should return immediately...
1076          */
1077         memset(&any, 0, sizeof(any));
1078         any.sa_family = AF_UNSPEC;
1079         result = kernel_connect(sock, &any, sizeof(any), 0);
1080         if (result)
1081                 dprintk("RPC:      AF_UNSPEC connect return code %d\n",
1082                                 result);
1083 }
1084
1085 /**
1086  * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
1087  * @args: RPC transport to connect
1088  *
1089  * Invoked by a work queue tasklet.
1090  */
1091 static void xs_tcp_connect_worker(void *args)
1092 {
1093         struct rpc_xprt *xprt = (struct rpc_xprt *)args;
1094         struct socket *sock = xprt->sock;
1095         int err, status = -EIO;
1096
1097         if (xprt->shutdown || xprt->addr.sin_port == 0)
1098                 goto out;
1099
1100         dprintk("RPC:      xs_tcp_connect_worker for xprt %p\n", xprt);
1101
1102         if (!xprt->sock) {
1103                 /* start from scratch */
1104                 if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
1105                         dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
1106                         goto out;
1107                 }
1108
1109                 if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
1110                         sock_release(sock);
1111                         goto out;
1112                 }
1113         } else
1114                 /* "close" the socket, preserving the local port */
1115                 xs_tcp_reuse_connection(xprt);
1116
1117         if (!xprt->inet) {
1118                 struct sock *sk = sock->sk;
1119
1120                 write_lock_bh(&sk->sk_callback_lock);
1121
1122                 sk->sk_user_data = xprt;
1123                 xprt->old_data_ready = sk->sk_data_ready;
1124                 xprt->old_state_change = sk->sk_state_change;
1125                 xprt->old_write_space = sk->sk_write_space;
1126                 sk->sk_data_ready = xs_tcp_data_ready;
1127                 sk->sk_state_change = xs_tcp_state_change;
1128                 sk->sk_write_space = xs_tcp_write_space;
1129                 sk->sk_allocation = GFP_ATOMIC;
1130
1131                 /* socket options */
1132                 sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1133                 sock_reset_flag(sk, SOCK_LINGER);
1134                 tcp_sk(sk)->linger2 = 0;
1135                 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1136
1137                 xprt_clear_connected(xprt);
1138
1139                 /* Reset to new socket */
1140                 xprt->sock = sock;
1141                 xprt->inet = sk;
1142
1143                 write_unlock_bh(&sk->sk_callback_lock);
1144         }
1145
1146         /* Tell the socket layer to start connecting... */
1147         xprt->stat.connect_count++;
1148         xprt->stat.connect_start = jiffies;
1149         status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
1150                         sizeof(xprt->addr), O_NONBLOCK);
1151         dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
1152                         xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
1153         if (status < 0) {
1154                 switch (status) {
1155                         case -EINPROGRESS:
1156                         case -EALREADY:
1157                                 goto out_clear;
1158                         case -ECONNREFUSED:
1159                         case -ECONNRESET:
1160                                 /* retry with existing socket, after a delay */
1161                                 break;
1162                         default:
1163                                 /* get rid of existing socket, and retry */
1164                                 xs_close(xprt);
1165                                 break;
1166                 }
1167         }
1168 out:
1169         xprt_wake_pending_tasks(xprt, status);
1170 out_clear:
1171         xprt_clear_connecting(xprt);
1172 }
1173
1174 /**
1175  * xs_connect - connect a socket to a remote endpoint
1176  * @task: address of RPC task that manages state of connect request
1177  *
1178  * TCP: If the remote end dropped the connection, delay reconnecting.
1179  *
1180  * UDP socket connects are synchronous, but we use a work queue anyway
1181  * to guarantee that even unprivileged user processes can set up a
1182  * socket on a privileged port.
1183  *
1184  * If a UDP socket connect fails, the delay behavior here prevents
1185  * retry floods (hard mounts).
1186  */
1187 static void xs_connect(struct rpc_task *task)
1188 {
1189         struct rpc_xprt *xprt = task->tk_xprt;
1190
1191         if (xprt_test_and_set_connecting(xprt))
1192                 return;
1193
1194         if (xprt->sock != NULL) {
1195                 dprintk("RPC:      xs_connect delayed xprt %p for %lu seconds\n",
1196                                 xprt, xprt->reestablish_timeout / HZ);
1197                 schedule_delayed_work(&xprt->connect_worker,
1198                                         xprt->reestablish_timeout);
1199                 xprt->reestablish_timeout <<= 1;
1200                 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
1201                         xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1202         } else {
1203                 dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
1204                 schedule_work(&xprt->connect_worker);
1205
1206                 /* flush_scheduled_work can sleep... */
1207                 if (!RPC_IS_ASYNC(task))
1208                         flush_scheduled_work();
1209         }
1210 }
1211
1212 /**
1213  * xs_udp_print_stats - display UDP socket-specifc stats
1214  * @xprt: rpc_xprt struct containing statistics
1215  * @seq: output file
1216  *
1217  */
1218 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1219 {
1220         seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
1221                         xprt->port,
1222                         xprt->stat.bind_count,
1223                         xprt->stat.sends,
1224                         xprt->stat.recvs,
1225                         xprt->stat.bad_xids,
1226                         xprt->stat.req_u,
1227                         xprt->stat.bklog_u);
1228 }
1229
1230 /**
1231  * xs_tcp_print_stats - display TCP socket-specifc stats
1232  * @xprt: rpc_xprt struct containing statistics
1233  * @seq: output file
1234  *
1235  */
1236 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1237 {
1238         long idle_time = 0;
1239
1240         if (xprt_connected(xprt))
1241                 idle_time = (long)(jiffies - xprt->last_used) / HZ;
1242
1243         seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1244                         xprt->port,
1245                         xprt->stat.bind_count,
1246                         xprt->stat.connect_count,
1247                         xprt->stat.connect_time,
1248                         idle_time,
1249                         xprt->stat.sends,
1250                         xprt->stat.recvs,
1251                         xprt->stat.bad_xids,
1252                         xprt->stat.req_u,
1253                         xprt->stat.bklog_u);
1254 }
1255
1256 static struct rpc_xprt_ops xs_udp_ops = {
1257         .set_buffer_size        = xs_udp_set_buffer_size,
1258         .reserve_xprt           = xprt_reserve_xprt_cong,
1259         .release_xprt           = xprt_release_xprt_cong,
1260         .set_port               = xs_set_port,
1261         .connect                = xs_connect,
1262         .buf_alloc              = rpc_malloc,
1263         .buf_free               = rpc_free,
1264         .send_request           = xs_udp_send_request,
1265         .set_retrans_timeout    = xprt_set_retrans_timeout_rtt,
1266         .timer                  = xs_udp_timer,
1267         .release_request        = xprt_release_rqst_cong,
1268         .close                  = xs_close,
1269         .destroy                = xs_destroy,
1270         .print_stats            = xs_udp_print_stats,
1271 };
1272
1273 static struct rpc_xprt_ops xs_tcp_ops = {
1274         .reserve_xprt           = xprt_reserve_xprt,
1275         .release_xprt           = xs_tcp_release_xprt,
1276         .set_port               = xs_set_port,
1277         .connect                = xs_connect,
1278         .buf_alloc              = rpc_malloc,
1279         .buf_free               = rpc_free,
1280         .send_request           = xs_tcp_send_request,
1281         .set_retrans_timeout    = xprt_set_retrans_timeout_def,
1282         .close                  = xs_close,
1283         .destroy                = xs_destroy,
1284         .print_stats            = xs_tcp_print_stats,
1285 };
1286
1287 /**
1288  * xs_setup_udp - Set up transport to use a UDP socket
1289  * @xprt: transport to set up
1290  * @to:   timeout parameters
1291  *
1292  */
1293 int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
1294 {
1295         size_t slot_table_size;
1296
1297         dprintk("RPC:      setting up udp-ipv4 transport...\n");
1298
1299         xprt->max_reqs = xprt_udp_slot_table_entries;
1300         slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
1301         xprt->slot = kzalloc(slot_table_size, GFP_KERNEL);
1302         if (xprt->slot == NULL)
1303                 return -ENOMEM;
1304
1305         xprt->prot = IPPROTO_UDP;
1306         xprt->port = xs_get_random_port();
1307         xprt->tsh_size = 0;
1308         xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
1309         /* XXX: header size can vary due to auth type, IPv6, etc. */
1310         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1311
1312         INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);
1313         xprt->bind_timeout = XS_BIND_TO;
1314         xprt->connect_timeout = XS_UDP_CONN_TO;
1315         xprt->reestablish_timeout = XS_UDP_REEST_TO;
1316         xprt->idle_timeout = XS_IDLE_DISC_TO;
1317
1318         xprt->ops = &xs_udp_ops;
1319
1320         if (to)
1321                 xprt->timeout = *to;
1322         else
1323                 xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1324
1325         return 0;
1326 }
1327
1328 /**
1329  * xs_setup_tcp - Set up transport to use a TCP socket
1330  * @xprt: transport to set up
1331  * @to: timeout parameters
1332  *
1333  */
1334 int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
1335 {
1336         size_t slot_table_size;
1337
1338         dprintk("RPC:      setting up tcp-ipv4 transport...\n");
1339
1340         xprt->max_reqs = xprt_tcp_slot_table_entries;
1341         slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
1342         xprt->slot = kzalloc(slot_table_size, GFP_KERNEL);
1343         if (xprt->slot == NULL)
1344                 return -ENOMEM;
1345
1346         xprt->prot = IPPROTO_TCP;
1347         xprt->port = xs_get_random_port();
1348         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1349         xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
1350         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1351
1352         INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
1353         xprt->bind_timeout = XS_BIND_TO;
1354         xprt->connect_timeout = XS_TCP_CONN_TO;
1355         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1356         xprt->idle_timeout = XS_IDLE_DISC_TO;
1357
1358         xprt->ops = &xs_tcp_ops;
1359
1360         if (to)
1361                 xprt->timeout = *to;
1362         else
1363                 xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1364
1365         return 0;
1366 }