missing bits of "iov_iter: Separate type from direction and use accessor functions"
[sfrench/cifs-2.6.git] / net / sunrpc / xprtsock.c
index 6b7539c0466e85b164512de3845f11dec404c16d..ae77c71c1f640c32fd8d673934be412da9da0189 100644 (file)
 #include <net/checksum.h>
 #include <net/udp.h>
 #include <net/tcp.h>
+#include <linux/bvec.h>
+#include <linux/uio.h>
 
 #include <trace/events/sunrpc.h>
 
 #include "sunrpc.h"
 
-#define RPC_TCP_READ_CHUNK_SZ  (3*512*1024)
-
 static void xs_close(struct rpc_xprt *xprt);
 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
                struct socket *sock);
@@ -129,7 +129,7 @@ static struct ctl_table xs_tunables_table[] = {
                .mode           = 0644,
                .proc_handler   = proc_dointvec_minmax,
                .extra1         = &xprt_min_resvport_limit,
-               .extra2         = &xprt_max_resvport
+               .extra2         = &xprt_max_resvport_limit
        },
        {
                .procname       = "max_resvport",
@@ -137,7 +137,7 @@ static struct ctl_table xs_tunables_table[] = {
                .maxlen         = sizeof(unsigned int),
                .mode           = 0644,
                .proc_handler   = proc_dointvec_minmax,
-               .extra1         = &xprt_min_resvport,
+               .extra1         = &xprt_min_resvport_limit,
                .extra2         = &xprt_max_resvport_limit
        },
        {
@@ -325,6 +325,362 @@ static void xs_free_peer_addresses(struct rpc_xprt *xprt)
                }
 }
 
+/*
+ * Make sure the page section of @buf is backed by real pages before we
+ * receive into it.  Buffers without XDRBUF_SPARSE_PAGES are left alone.
+ * If an allocation fails, the buffer is truncated at the last page that
+ * could be allocated.  Returns the number of bytes (at most @want) that
+ * may now safely be received into the pages of @buf.
+ */
+static size_t
+xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
+{
+       size_t i,n;
+
+       if (!(buf->flags & XDRBUF_SPARSE_PAGES))
+               return want;
+       if (want > buf->page_len)
+               want = buf->page_len;
+       /* Number of pages needed to cover page_base + want bytes */
+       n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
+       for (i = 0; i < n; i++) {
+               if (buf->pages[i])
+                       continue;
+               /* Keep the bio_vec view in sync with the page array */
+               buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
+               if (!buf->pages[i]) {
+                       buf->page_len = (i * PAGE_SIZE) - buf->page_base;
+                       return buf->page_len;
+               }
+       }
+       return want;
+}
+
+/*
+ * Receive into @msg after first skipping @seek bytes of the iterator,
+ * so that a partially-filled buffer resumes where the previous call
+ * stopped.  A positive return value includes the skipped bytes, i.e.
+ * it is the total offset now consumed, not just the bytes received
+ * by this call.
+ */
+static ssize_t
+xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
+{
+       ssize_t ret;
+       if (seek != 0)
+               iov_iter_advance(&msg->msg_iter, seek);
+       ret = sock_recvmsg(sock, msg, flags);
+       return ret > 0 ? ret + seek : ret;
+}
+
+/* Receive up to @count bytes into a single kvec, resuming at @seek. */
+static ssize_t
+xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
+               struct kvec *kvec, size_t count, size_t seek)
+{
+       iov_iter_kvec(&msg->msg_iter, READ, kvec, 1, count);
+       return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+/*
+ * Receive up to @count bytes into an array of @nr bio_vecs, resuming
+ * at @seek.  Used for the page section of an xdr_buf.
+ */
+static ssize_t
+xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
+               struct bio_vec *bvec, unsigned long nr, size_t count,
+               size_t seek)
+{
+       iov_iter_bvec(&msg->msg_iter, READ, bvec, nr, count);
+       return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+/*
+ * Throw away @count bytes from the stream.  A zeroed kvec combined
+ * with MSG_TRUNC lets the socket layer consume the data without
+ * copying it anywhere.
+ */
+static ssize_t
+xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
+               size_t count)
+{
+       struct kvec kvec = { 0 };
+       return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, 0);
+}
+
+/*
+ * Receive stream data into the head kvec, the page vector and the tail
+ * kvec of @buf, in that order, resuming @seek bytes into the buffer.
+ * At most @count bytes in total are accepted; data that extends past
+ * the end of the buffer is reported by setting MSG_TRUNC and returning
+ * -EMSGSIZE.  On return, *read holds the number of bytes newly
+ * consumed by this call.
+ */
+static ssize_t
+xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
+               struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
+{
+       size_t want, seek_init = seek, offset = 0;
+       ssize_t ret;
+
+       /* Head kvec */
+       if (seek < buf->head[0].iov_len) {
+               want = min_t(size_t, count, buf->head[0].iov_len);
+               ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
+               if (ret <= 0)
+                       goto sock_err;
+               offset += ret;
+               if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+                       goto out;
+               if (ret != want)
+                       goto eagain;
+               seek = 0;
+       } else {
+               seek -= buf->head[0].iov_len;
+               offset += buf->head[0].iov_len;
+       }
+       /* Page section: allocate sparse pages first if required */
+       if (seek < buf->page_len) {
+               want = xs_alloc_sparse_pages(buf,
+                               min_t(size_t, count - offset, buf->page_len),
+                               GFP_NOWAIT);
+               ret = xs_read_bvec(sock, msg, flags, buf->bvec,
+                               xdr_buf_pagecount(buf),
+                               want + buf->page_base,
+                               seek + buf->page_base);
+               if (ret <= 0)
+                       goto sock_err;
+               /* ret includes page_base; strip it for the byte count */
+               offset += ret - buf->page_base;
+               if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+                       goto out;
+               if (ret != want)
+                       goto eagain;
+               seek = 0;
+       } else {
+               seek -= buf->page_len;
+               offset += buf->page_len;
+       }
+       /* Tail kvec */
+       if (seek < buf->tail[0].iov_len) {
+               want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+               ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
+               if (ret <= 0)
+                       goto sock_err;
+               offset += ret;
+               if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+                       goto out;
+               if (ret != want)
+                       goto eagain;
+       } else
+               offset += buf->tail[0].iov_len;
+       /* More data pending than the buffer can hold */
+       ret = -EMSGSIZE;
+       msg->msg_flags |= MSG_TRUNC;
+out:
+       /* Report only the bytes newly consumed by this call */
+       *read = offset - seek_init;
+       return ret;
+eagain:
+       ret = -EAGAIN;
+       goto out;
+sock_err:
+       /* Fold the skipped resume offset back in so *read works out to 0 */
+       offset += seek;
+       goto out;
+}
+
+/*
+ * First pass over a request: copy the transport->recv.offset header
+ * bytes already gathered at &transport->recv.xid into the head of
+ * @buf (when the head is large enough to hold them) and account for
+ * them in recv.copied.  Subsequent calls (recv.copied != 0) are no-ops.
+ */
+static void
+xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
+{
+       if (!transport->recv.copied) {
+               if (buf->head[0].iov_len >= transport->recv.offset)
+                       memcpy(buf->head[0].iov_base,
+                                       &transport->recv.xid,
+                                       transport->recv.offset);
+               transport->recv.copied = transport->recv.offset;
+       }
+}
+
+/* Does the current fragment header carry the "last fragment" bit? */
+static bool
+xs_read_stream_request_done(struct sock_xprt *transport)
+{
+       return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
+}
+
+/*
+ * Receive the payload of the current record fragment into @req's
+ * private receive buffer, resuming at recv.copied.  Returns the total
+ * number of bytes copied so far when the fragment has been fully
+ * received (setting MSG_EOR if it was the record's last fragment) or
+ * when the buffer overflowed (-EMSGSIZE from below), -ESHUTDOWN if the
+ * peer closed the connection, another negative errno on error, or
+ * -EAGAIN when more data is still expected.
+ */
+static ssize_t
+xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
+               int flags, struct rpc_rqst *req)
+{
+       struct xdr_buf *buf = &req->rq_private_buf;
+       size_t want, read;
+       ssize_t ret;
+
+       xs_read_header(transport, buf);
+
+       /* Remaining bytes of the current fragment */
+       want = transport->recv.len - transport->recv.offset;
+       ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
+                       transport->recv.copied + want, transport->recv.copied,
+                       &read);
+       transport->recv.offset += read;
+       transport->recv.copied += read;
+       if (transport->recv.offset == transport->recv.len) {
+               if (xs_read_stream_request_done(transport))
+                       msg->msg_flags |= MSG_EOR;
+               return transport->recv.copied;
+       }
+
+       switch (ret) {
+       case -EMSGSIZE:
+               /* Buffer full: report what we have, caller discards the rest */
+               return transport->recv.copied;
+       case 0:
+               /* Orderly shutdown by the peer mid-record */
+               return -ESHUTDOWN;
+       default:
+               if (ret < 0)
+                       return ret;
+       }
+       return -EAGAIN;
+}
+
+/*
+ * Size of the record header still to be read: continuation fragments
+ * carry only the 4-byte fragment marker, while the first fragment of a
+ * record is read together with the XID and call-direction words
+ * (3 * 4 bytes in total).
+ */
+static size_t
+xs_read_stream_headersize(bool isfrag)
+{
+       if (isfrag)
+               return sizeof(__be32);
+       return 3 * sizeof(__be32);
+}
+
+/*
+ * Read @want header bytes into the receive state, resuming at @seek.
+ * NOTE(review): the kvec starts at recv.fraghdr but may span up to
+ * three words -- this assumes recv.fraghdr, recv.xid and recv.calldir
+ * are laid out contiguously in struct sock_xprt; verify against the
+ * structure definition.
+ */
+static ssize_t
+xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
+               int flags, size_t want, size_t seek)
+{
+       struct kvec kvec = {
+               .iov_base = &transport->recv.fraghdr,
+               .iov_len = want,
+       };
+       return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
+}
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/*
+ * Receive a backchannel RPC call from the server: look up the
+ * preallocated backchannel slot for recv.xid, receive the payload into
+ * it, and complete the request once the message has been fully read
+ * (MSG_EOR) or truncated (MSG_TRUNC).
+ */
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+       struct rpc_xprt *xprt = &transport->xprt;
+       struct rpc_rqst *req;
+       ssize_t ret;
+
+       /* Look up and lock the request corresponding to the given XID */
+       req = xprt_lookup_bc_request(xprt, transport->recv.xid);
+       if (!req) {
+               printk(KERN_WARNING "Callback slot table overflowed\n");
+               return -ESHUTDOWN;
+       }
+
+       ret = xs_read_stream_request(transport, msg, flags, req);
+       if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+               xprt_complete_bc_request(req, ret);
+
+       return ret;
+}
+#else /* CONFIG_SUNRPC_BACKCHANNEL */
+/* Without backchannel support an incoming call cannot be handled */
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+       return -ESHUTDOWN;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+/*
+ * Receive a reply to an outstanding RPC request.  The request matching
+ * recv.xid is looked up and pinned under xprt->queue_lock; an unknown
+ * XID marks the message MSG_TRUNC so the caller discards the rest of
+ * it.  The request is completed once fully received or truncated.
+ */
+static ssize_t
+xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+       struct rpc_xprt *xprt = &transport->xprt;
+       struct rpc_rqst *req;
+       ssize_t ret = 0;
+
+       /* Look up and lock the request corresponding to the given XID */
+       spin_lock(&xprt->queue_lock);
+       req = xprt_lookup_rqst(xprt, transport->recv.xid);
+       if (!req) {
+               msg->msg_flags |= MSG_TRUNC;
+               goto out;
+       }
+       /* Pin so the request cannot vanish while we drop the lock to read */
+       xprt_pin_rqst(req);
+       spin_unlock(&xprt->queue_lock);
+
+       ret = xs_read_stream_request(transport, msg, flags, req);
+
+       spin_lock(&xprt->queue_lock);
+       if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+               xprt_complete_rqst(req->rq_task, ret);
+       xprt_unpin_rqst(req);
+out:
+       spin_unlock(&xprt->queue_lock);
+       return ret;
+}
+
+/*
+ * Read and process one chunk of a stream record:
+ *  1) if no fragment is in progress (recv.len == 0), read the record
+ *     marker -- plus XID and call direction for a record's first
+ *     fragment,
+ *  2) dispatch the payload to the call or reply handler,
+ *  3) discard whatever could not be matched or did not fit,
+ *  4) once the fragment is complete, reset the per-fragment state.
+ * Returns the number of bytes consumed, or a negative errno; a return
+ * of 0 or -ESHUTDOWN from below forces a transport disconnect.
+ */
+static ssize_t
+xs_read_stream(struct sock_xprt *transport, int flags)
+{
+       struct msghdr msg = { 0 };
+       size_t want, read = 0;
+       ssize_t ret = 0;
+
+       if (transport->recv.len == 0) {
+               want = xs_read_stream_headersize(transport->recv.copied != 0);
+               ret = xs_read_stream_header(transport, &msg, flags, want,
+                               transport->recv.offset);
+               if (ret <= 0)
+                       goto out_err;
+               transport->recv.offset = ret;
+               if (ret != want) {
+                       ret = -EAGAIN;
+                       goto out_err;
+               }
+               transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
+                       RPC_FRAGMENT_SIZE_MASK;
+               /* recv.offset now counts bytes past the fragment marker */
+               transport->recv.offset -= sizeof(transport->recv.fraghdr);
+               read = ret;
+       }
+
+       switch (be32_to_cpu(transport->recv.calldir)) {
+       case RPC_CALL:
+               ret = xs_read_stream_call(transport, &msg, flags);
+               break;
+       case RPC_REPLY:
+               ret = xs_read_stream_reply(transport, &msg, flags);
+       }
+       if (msg.msg_flags & MSG_TRUNC) {
+               /* Poison the state so the remainder of the record is skipped */
+               transport->recv.calldir = cpu_to_be32(-1);
+               transport->recv.copied = -1;
+       }
+       if (ret < 0)
+               goto out_err;
+       read += ret;
+       /* Discard any part of the fragment that was not consumed above */
+       if (transport->recv.offset < transport->recv.len) {
+               ret = xs_read_discard(transport->sock, &msg, flags,
+                               transport->recv.len - transport->recv.offset);
+               if (ret <= 0)
+                       goto out_err;
+               transport->recv.offset += ret;
+               read += ret;
+               if (transport->recv.offset != transport->recv.len)
+                       return -EAGAIN;
+       }
+       if (xs_read_stream_request_done(transport)) {
+               trace_xs_stream_read_request(transport);
+               transport->recv.copied = 0;
+       }
+       transport->recv.offset = 0;
+       transport->recv.len = 0;
+       return read;
+out_err:
+       switch (ret) {
+       case 0:
+       case -ESHUTDOWN:
+               xprt_force_disconnect(&transport->xprt);
+               return -ESHUTDOWN;
+       }
+       return ret;
+}
+
+/*
+ * Pull everything currently available off the stream socket.  Runs
+ * under recv_mutex, clears XPRT_SOCK_DATA_READY up front, and loops
+ * until xs_read_stream() reports no more data or an error, yielding
+ * the CPU between iterations.
+ */
+static void xs_stream_data_receive(struct sock_xprt *transport)
+{
+       size_t read = 0;
+       ssize_t ret = 0;
+
+       mutex_lock(&transport->recv_mutex);
+       if (transport->sock == NULL)
+               goto out;
+       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+       for (;;) {
+               ret = xs_read_stream(transport, MSG_DONTWAIT);
+               if (ret <= 0)
+                       break;
+               read += ret;
+               cond_resched();
+       }
+out:
+       mutex_unlock(&transport->recv_mutex);
+       trace_xs_stream_read_data(&transport->xprt, ret, read);
+}
+
+/* Workqueue entry point: run the stream receive path for @work's transport. */
+static void xs_stream_data_receive_workfn(struct work_struct *work)
+{
+       struct sock_xprt *transport =
+               container_of(work, struct sock_xprt, recv_worker);
+       xs_stream_data_receive(transport);
+}
+
+/*
+ * Reset the per-connection transmit/receive bookkeeping and record the
+ * start of a new connection attempt in the transport statistics.
+ */
+static void
+xs_stream_reset_connect(struct sock_xprt *transport)
+{
+       transport->recv.offset = 0;
+       transport->recv.len = 0;
+       transport->recv.copied = 0;
+       transport->xmit.offset = 0;
+       transport->xprt.stat.connect_count++;
+       transport->xprt.stat.connect_start = jiffies;
+}
+
 #define XS_SENDMSG_FLAGS       (MSG_DONTWAIT | MSG_NOSIGNAL)
 
 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
@@ -440,28 +796,21 @@ out:
        return err;
 }
 
-static void xs_nospace_callback(struct rpc_task *task)
-{
-       struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
-
-       transport->inet->sk_write_pending--;
-}
-
 /**
- * xs_nospace - place task on wait queue if transmit was incomplete
- * @task: task to put to sleep
+ * xs_nospace - handle transmit was incomplete
+ * @req: pointer to RPC request
  *
  */
-static int xs_nospace(struct rpc_task *task)
+static int xs_nospace(struct rpc_rqst *req)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct sock *sk = transport->inet;
        int ret = -EAGAIN;
 
        dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
-                       task->tk_pid, req->rq_slen - req->rq_bytes_sent,
+                       req->rq_task->tk_pid,
+                       req->rq_slen - transport->xmit.offset,
                        req->rq_slen);
 
        /* Protect against races with write_space */
@@ -471,7 +820,7 @@ static int xs_nospace(struct rpc_task *task)
        if (xprt_connected(xprt)) {
                /* wait for more buffer space */
                sk->sk_write_pending++;
-               xprt_wait_for_buffer_space(task, xs_nospace_callback);
+               xprt_wait_for_buffer_space(xprt);
        } else
                ret = -ENOTCONN;
 
@@ -491,6 +840,22 @@ static int xs_nospace(struct rpc_task *task)
        return ret;
 }
 
+/*
+ * Pre-allocate the bio_vec array covering the receive buffer's pages
+ * so the receive path can build its iterator without allocating.  Any
+ * allocation failure is reported through the task's tk_status.
+ */
+static void
+xs_stream_prepare_request(struct rpc_rqst *req)
+{
+       req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
+}
+
+/*
+ * Determine if the previous message in the stream was aborted before it
+ * could complete transmission: some bytes have left the socket
+ * (xmit.offset != 0) yet this request has not sent anything, so the
+ * stream is mid-way through a different, abandoned record.
+ */
+static bool
+xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
+{
+       return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
+}
+
 /*
  * Construct a stream transport record marker in @buf.
  */
@@ -503,7 +868,7 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
 
 /**
  * xs_local_send_request - write an RPC request to an AF_LOCAL socket
- * @task: RPC task that manages the state of an RPC request
+ * @req: pointer to RPC request
  *
  * Return values:
  *        0:   The request has been sent
@@ -512,9 +877,8 @@ static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
  * ENOTCONN:   Caller needs to invoke connect logic then call again
  *    other:   Some other error occured, the request was not sent
  */
-static int xs_local_send_request(struct rpc_task *task)
+static int xs_local_send_request(struct rpc_rqst *req)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport =
                                container_of(xprt, struct sock_xprt, xprt);
@@ -522,25 +886,34 @@ static int xs_local_send_request(struct rpc_task *task)
        int status;
        int sent = 0;
 
+       /* Close the stream if the previous transmission was incomplete */
+       if (xs_send_request_was_aborted(transport, req)) {
+               xs_close(xprt);
+               return -ENOTCONN;
+       }
+
        xs_encode_stream_record_marker(&req->rq_snd_buf);
 
        xs_pktdump("packet data:",
                        req->rq_svec->iov_base, req->rq_svec->iov_len);
 
        req->rq_xtime = ktime_get();
-       status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent,
+       status = xs_sendpages(transport->sock, NULL, 0, xdr,
+                             transport->xmit.offset,
                              true, &sent);
        dprintk("RPC:       %s(%u) = %d\n",
-                       __func__, xdr->len - req->rq_bytes_sent, status);
+                       __func__, xdr->len - transport->xmit.offset, status);
 
        if (status == -EAGAIN && sock_writeable(transport->inet))
                status = -ENOBUFS;
 
        if (likely(sent > 0) || status == 0) {
-               req->rq_bytes_sent += sent;
-               req->rq_xmit_bytes_sent += sent;
+               transport->xmit.offset += sent;
+               req->rq_bytes_sent = transport->xmit.offset;
                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+                       req->rq_xmit_bytes_sent += transport->xmit.offset;
                        req->rq_bytes_sent = 0;
+                       transport->xmit.offset = 0;
                        return 0;
                }
                status = -EAGAIN;
@@ -550,7 +923,7 @@ static int xs_local_send_request(struct rpc_task *task)
        case -ENOBUFS:
                break;
        case -EAGAIN:
-               status = xs_nospace(task);
+               status = xs_nospace(req);
                break;
        default:
                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
@@ -566,7 +939,7 @@ static int xs_local_send_request(struct rpc_task *task)
 
 /**
  * xs_udp_send_request - write an RPC request to a UDP socket
- * @task: address of RPC task that manages the state of an RPC request
+ * @req: pointer to RPC request
  *
  * Return values:
  *        0:   The request has been sent
@@ -575,9 +948,8 @@ static int xs_local_send_request(struct rpc_task *task)
  * ENOTCONN:   Caller needs to invoke connect logic then call again
  *    other:   Some other error occurred, the request was not sent
  */
-static int xs_udp_send_request(struct rpc_task *task)
+static int xs_udp_send_request(struct rpc_rqst *req)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *xdr = &req->rq_snd_buf;
@@ -590,12 +962,16 @@ static int xs_udp_send_request(struct rpc_task *task)
 
        if (!xprt_bound(xprt))
                return -ENOTCONN;
+
+       if (!xprt_request_get_cong(xprt, req))
+               return -EBADSLT;
+
        req->rq_xtime = ktime_get();
        status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
-                             xdr, req->rq_bytes_sent, true, &sent);
+                             xdr, 0, true, &sent);
 
        dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
-                       xdr->len - req->rq_bytes_sent, status);
+                       xdr->len, status);
 
        /* firewall is blocking us, don't return -EAGAIN or we end up looping */
        if (status == -EPERM)
@@ -619,7 +995,7 @@ process_status:
                /* Should we call xs_close() here? */
                break;
        case -EAGAIN:
-               status = xs_nospace(task);
+               status = xs_nospace(req);
                break;
        case -ENETUNREACH:
        case -ENOBUFS:
@@ -639,7 +1015,7 @@ process_status:
 
 /**
  * xs_tcp_send_request - write an RPC request to a TCP socket
- * @task: address of RPC task that manages the state of an RPC request
+ * @req: pointer to RPC request
  *
  * Return values:
  *        0:   The request has been sent
@@ -651,9 +1027,8 @@ process_status:
  * XXX: In the case of soft timeouts, should we eventually give up
  *     if sendmsg is not able to make progress?
  */
-static int xs_tcp_send_request(struct rpc_task *task)
+static int xs_tcp_send_request(struct rpc_rqst *req)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *xdr = &req->rq_snd_buf;
@@ -662,6 +1037,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
        int status;
        int sent;
 
+       /* Close the stream if the previous transmission was incomplete */
+       if (xs_send_request_was_aborted(transport, req)) {
+               if (transport->sock != NULL)
+                       kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+               return -ENOTCONN;
+       }
+
        xs_encode_stream_record_marker(&req->rq_snd_buf);
 
        xs_pktdump("packet data:",
@@ -671,7 +1053,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
         * completes while the socket holds a reference to the pages,
         * then we may end up resending corrupted data.
         */
-       if (task->tk_flags & RPC_TASK_SENT)
+       if (req->rq_task->tk_flags & RPC_TASK_SENT)
                zerocopy = false;
 
        if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
@@ -684,17 +1066,20 @@ static int xs_tcp_send_request(struct rpc_task *task)
        while (1) {
                sent = 0;
                status = xs_sendpages(transport->sock, NULL, 0, xdr,
-                                     req->rq_bytes_sent, zerocopy, &sent);
+                                     transport->xmit.offset,
+                                     zerocopy, &sent);
 
                dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
-                               xdr->len - req->rq_bytes_sent, status);
+                               xdr->len - transport->xmit.offset, status);
 
                /* If we've sent the entire packet, immediately
                 * reset the count of bytes sent. */
-               req->rq_bytes_sent += sent;
-               req->rq_xmit_bytes_sent += sent;
+               transport->xmit.offset += sent;
+               req->rq_bytes_sent = transport->xmit.offset;
                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
+                       req->rq_xmit_bytes_sent += transport->xmit.offset;
                        req->rq_bytes_sent = 0;
+                       transport->xmit.offset = 0;
                        return 0;
                }
 
@@ -732,7 +1117,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
                /* Should we call xs_close() here? */
                break;
        case -EAGAIN:
-               status = xs_nospace(task);
+               status = xs_nospace(req);
                break;
        case -ECONNRESET:
        case -ECONNREFUSED:
@@ -749,35 +1134,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
        return status;
 }
 
-/**
- * xs_tcp_release_xprt - clean up after a tcp transmission
- * @xprt: transport
- * @task: rpc task
- *
- * This cleans up if an error causes us to abort the transmission of a request.
- * In this case, the socket may need to be reset in order to avoid confusing
- * the server.
- */
-static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-       struct rpc_rqst *req;
-
-       if (task != xprt->snd_task)
-               return;
-       if (task == NULL)
-               goto out_release;
-       req = task->tk_rqstp;
-       if (req == NULL)
-               goto out_release;
-       if (req->rq_bytes_sent == 0)
-               goto out_release;
-       if (req->rq_bytes_sent == req->rq_snd_buf.len)
-               goto out_release;
-       set_bit(XPRT_CLOSE_WAIT, &xprt->state);
-out_release:
-       xprt_release_xprt(xprt, task);
-}
-
 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 {
        transport->old_data_ready = sk->sk_data_ready;
@@ -921,114 +1277,6 @@ static void xs_destroy(struct rpc_xprt *xprt)
        module_put(THIS_MODULE);
 }
 
-static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
-{
-       struct xdr_skb_reader desc = {
-               .skb            = skb,
-               .offset         = sizeof(rpc_fraghdr),
-               .count          = skb->len - sizeof(rpc_fraghdr),
-       };
-
-       if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
-               return -1;
-       if (desc.count)
-               return -1;
-       return 0;
-}
-
-/**
- * xs_local_data_read_skb
- * @xprt: transport
- * @sk: socket
- * @skb: skbuff
- *
- * Currently this assumes we can read the whole reply in a single gulp.
- */
-static void xs_local_data_read_skb(struct rpc_xprt *xprt,
-               struct sock *sk,
-               struct sk_buff *skb)
-{
-       struct rpc_task *task;
-       struct rpc_rqst *rovr;
-       int repsize, copied;
-       u32 _xid;
-       __be32 *xp;
-
-       repsize = skb->len - sizeof(rpc_fraghdr);
-       if (repsize < 4) {
-               dprintk("RPC:       impossible RPC reply size %d\n", repsize);
-               return;
-       }
-
-       /* Copy the XID from the skb... */
-       xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
-       if (xp == NULL)
-               return;
-
-       /* Look up and lock the request corresponding to the given XID */
-       spin_lock(&xprt->recv_lock);
-       rovr = xprt_lookup_rqst(xprt, *xp);
-       if (!rovr)
-               goto out_unlock;
-       xprt_pin_rqst(rovr);
-       spin_unlock(&xprt->recv_lock);
-       task = rovr->rq_task;
-
-       copied = rovr->rq_private_buf.buflen;
-       if (copied > repsize)
-               copied = repsize;
-
-       if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-               dprintk("RPC:       sk_buff copy failed\n");
-               spin_lock(&xprt->recv_lock);
-               goto out_unpin;
-       }
-
-       spin_lock(&xprt->recv_lock);
-       xprt_complete_rqst(task, copied);
-out_unpin:
-       xprt_unpin_rqst(rovr);
- out_unlock:
-       spin_unlock(&xprt->recv_lock);
-}
-
-static void xs_local_data_receive(struct sock_xprt *transport)
-{
-       struct sk_buff *skb;
-       struct sock *sk;
-       int err;
-
-restart:
-       mutex_lock(&transport->recv_mutex);
-       sk = transport->inet;
-       if (sk == NULL)
-               goto out;
-       for (;;) {
-               skb = skb_recv_datagram(sk, 0, 1, &err);
-               if (skb != NULL) {
-                       xs_local_data_read_skb(&transport->xprt, sk, skb);
-                       skb_free_datagram(sk, skb);
-                       continue;
-               }
-               if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-                       break;
-               if (need_resched()) {
-                       mutex_unlock(&transport->recv_mutex);
-                       cond_resched();
-                       goto restart;
-               }
-       }
-out:
-       mutex_unlock(&transport->recv_mutex);
-}
-
-static void xs_local_data_receive_workfn(struct work_struct *work)
-{
-       struct sock_xprt *transport =
-               container_of(work, struct sock_xprt, recv_worker);
-       xs_local_data_receive(transport);
-}
-
 /**
  * xs_udp_data_read_skb - receive callback for UDP sockets
  * @xprt: transport
@@ -1058,13 +1306,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
                return;
 
        /* Look up and lock the request corresponding to the given XID */
-       spin_lock(&xprt->recv_lock);
+       spin_lock(&xprt->queue_lock);
        rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
        xprt_pin_rqst(rovr);
        xprt_update_rtt(rovr->rq_task);
-       spin_unlock(&xprt->recv_lock);
+       spin_unlock(&xprt->queue_lock);
        task = rovr->rq_task;
 
        if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1072,7 +1320,7 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 
        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-               spin_lock(&xprt->recv_lock);
+               spin_lock(&xprt->queue_lock);
                __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
                goto out_unpin;
        }
@@ -1081,13 +1329,13 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
        spin_lock_bh(&xprt->transport_lock);
        xprt_adjust_cwnd(xprt, task, copied);
        spin_unlock_bh(&xprt->transport_lock);
-       spin_lock(&xprt->recv_lock);
+       spin_lock(&xprt->queue_lock);
        xprt_complete_rqst(task, copied);
        __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 out_unpin:
        xprt_unpin_rqst(rovr);
  out_unlock:
-       spin_unlock(&xprt->recv_lock);
+       spin_unlock(&xprt->queue_lock);
 }
 
 static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1096,25 +1344,18 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
        struct sock *sk;
        int err;
 
-restart:
        mutex_lock(&transport->recv_mutex);
        sk = transport->inet;
        if (sk == NULL)
                goto out;
+       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
        for (;;) {
                skb = skb_recv_udp(sk, 0, 1, &err);
-               if (skb != NULL) {
-                       xs_udp_data_read_skb(&transport->xprt, sk, skb);
-                       consume_skb(skb);
-                       continue;
-               }
-               if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+               if (skb == NULL)
                        break;
-               if (need_resched()) {
-                       mutex_unlock(&transport->recv_mutex);
-                       cond_resched();
-                       goto restart;
-               }
+               xs_udp_data_read_skb(&transport->xprt, sk, skb);
+               consume_skb(skb);
+               cond_resched();
        }
 out:
        mutex_unlock(&transport->recv_mutex);
@@ -1163,263 +1404,7 @@ static void xs_tcp_force_close(struct rpc_xprt *xprt)
        xprt_force_disconnect(xprt);
 }
 
-static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-       size_t len, used;
-       char *p;
-
-       p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
-       len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
-       used = xdr_skb_read_bits(desc, p, len);
-       transport->tcp_offset += used;
-       if (used != len)
-               return;
-
-       transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
-       if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
-               transport->tcp_flags |= TCP_RCV_LAST_FRAG;
-       else
-               transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
-       transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
-
-       transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
-       transport->tcp_offset = 0;
-
-       /* Sanity check of the record length */
-       if (unlikely(transport->tcp_reclen < 8)) {
-               dprintk("RPC:       invalid TCP record fragment length\n");
-               xs_tcp_force_close(xprt);
-               return;
-       }
-       dprintk("RPC:       reading TCP record fragment of length %d\n",
-                       transport->tcp_reclen);
-}
-
-static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
-{
-       if (transport->tcp_offset == transport->tcp_reclen) {
-               transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
-               transport->tcp_offset = 0;
-               if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
-                       transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-                       transport->tcp_flags |= TCP_RCV_COPY_XID;
-                       transport->tcp_copied = 0;
-               }
-       }
-}
-
-static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-       size_t len, used;
-       char *p;
-
-       len = sizeof(transport->tcp_xid) - transport->tcp_offset;
-       dprintk("RPC:       reading XID (%zu bytes)\n", len);
-       p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
-       used = xdr_skb_read_bits(desc, p, len);
-       transport->tcp_offset += used;
-       if (used != len)
-               return;
-       transport->tcp_flags &= ~TCP_RCV_COPY_XID;
-       transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
-       transport->tcp_copied = 4;
-       dprintk("RPC:       reading %s XID %08x\n",
-                       (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
-                                                             : "request with",
-                       ntohl(transport->tcp_xid));
-       xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
-                                      struct xdr_skb_reader *desc)
-{
-       size_t len, used;
-       u32 offset;
-       char *p;
-
-       /*
-        * We want transport->tcp_offset to be 8 at the end of this routine
-        * (4 bytes for the xid and 4 bytes for the call/reply flag).
-        * When this function is called for the first time,
-        * transport->tcp_offset is 4 (after having already read the xid).
-        */
-       offset = transport->tcp_offset - sizeof(transport->tcp_xid);
-       len = sizeof(transport->tcp_calldir) - offset;
-       dprintk("RPC:       reading CALL/REPLY flag (%zu bytes)\n", len);
-       p = ((char *) &transport->tcp_calldir) + offset;
-       used = xdr_skb_read_bits(desc, p, len);
-       transport->tcp_offset += used;
-       if (used != len)
-               return;
-       transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
-       /*
-        * We don't yet have the XDR buffer, so we will write the calldir
-        * out after we get the buffer from the 'struct rpc_rqst'
-        */
-       switch (ntohl(transport->tcp_calldir)) {
-       case RPC_REPLY:
-               transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
-               transport->tcp_flags |= TCP_RCV_COPY_DATA;
-               transport->tcp_flags |= TCP_RPC_REPLY;
-               break;
-       case RPC_CALL:
-               transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
-               transport->tcp_flags |= TCP_RCV_COPY_DATA;
-               transport->tcp_flags &= ~TCP_RPC_REPLY;
-               break;
-       default:
-               dprintk("RPC:       invalid request message type\n");
-               xs_tcp_force_close(&transport->xprt);
-       }
-       xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
-                                    struct xdr_skb_reader *desc,
-                                    struct rpc_rqst *req)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-       struct xdr_buf *rcvbuf;
-       size_t len;
-       ssize_t r;
-
-       rcvbuf = &req->rq_private_buf;
-
-       if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
-               /*
-                * Save the RPC direction in the XDR buffer
-                */
-               memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
-                       &transport->tcp_calldir,
-                       sizeof(transport->tcp_calldir));
-               transport->tcp_copied += sizeof(transport->tcp_calldir);
-               transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
-       }
-
-       len = desc->count;
-       if (len > transport->tcp_reclen - transport->tcp_offset)
-               desc->count = transport->tcp_reclen - transport->tcp_offset;
-       r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
-                                         desc, xdr_skb_read_bits);
-
-       if (desc->count) {
-               /* Error when copying to the receive buffer,
-                * usually because we weren't able to allocate
-                * additional buffer pages. All we can do now
-                * is turn off TCP_RCV_COPY_DATA, so the request
-                * will not receive any additional updates,
-                * and time out.
-                * Any remaining data from this record will
-                * be discarded.
-                */
-               transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-               dprintk("RPC:       XID %08x truncated request\n",
-                               ntohl(transport->tcp_xid));
-               dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
-                               "tcp_offset = %u, tcp_reclen = %u\n",
-                               xprt, transport->tcp_copied,
-                               transport->tcp_offset, transport->tcp_reclen);
-               return;
-       }
-
-       transport->tcp_copied += r;
-       transport->tcp_offset += r;
-       desc->count = len - r;
-
-       dprintk("RPC:       XID %08x read %zd bytes\n",
-                       ntohl(transport->tcp_xid), r);
-       dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
-                       "tcp_reclen = %u\n", xprt, transport->tcp_copied,
-                       transport->tcp_offset, transport->tcp_reclen);
-
-       if (transport->tcp_copied == req->rq_private_buf.buflen)
-               transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-       else if (transport->tcp_offset == transport->tcp_reclen) {
-               if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
-                       transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-       }
-}
-
-/*
- * Finds the request corresponding to the RPC xid and invokes the common
- * tcp read code to read the data.
- */
-static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
-                                   struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-       struct rpc_rqst *req;
-
-       dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
-
-       /* Find and lock the request corresponding to this xid */
-       spin_lock(&xprt->recv_lock);
-       req = xprt_lookup_rqst(xprt, transport->tcp_xid);
-       if (!req) {
-               dprintk("RPC:       XID %08x request not found!\n",
-                               ntohl(transport->tcp_xid));
-               spin_unlock(&xprt->recv_lock);
-               return -1;
-       }
-       xprt_pin_rqst(req);
-       spin_unlock(&xprt->recv_lock);
-
-       xs_tcp_read_common(xprt, desc, req);
-
-       spin_lock(&xprt->recv_lock);
-       if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
-               xprt_complete_rqst(req->rq_task, transport->tcp_copied);
-       xprt_unpin_rqst(req);
-       spin_unlock(&xprt->recv_lock);
-       return 0;
-}
-
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-/*
- * Obtains an rpc_rqst previously allocated and invokes the common
- * tcp read code to read the data.  The result is placed in the callback
- * queue.
- * If we're unable to obtain the rpc_rqst we schedule the closing of the
- * connection and return -1.
- */
-static int xs_tcp_read_callback(struct rpc_xprt *xprt,
-                                      struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-       struct rpc_rqst *req;
-
-       /* Look up the request corresponding to the given XID */
-       req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
-       if (req == NULL) {
-               printk(KERN_WARNING "Callback slot table overflowed\n");
-               xprt_force_disconnect(xprt);
-               return -1;
-       }
-
-       dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
-       xs_tcp_read_common(xprt, desc, req);
-
-       if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
-               xprt_complete_bc_request(req, transport->tcp_copied);
-
-       return 0;
-}
-
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-                                       struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-
-       return (transport->tcp_flags & TCP_RPC_REPLY) ?
-               xs_tcp_read_reply(xprt, desc) :
-               xs_tcp_read_callback(xprt, desc);
-}
-
 static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
 {
        int ret;
@@ -1435,145 +1420,8 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
 {
        return PAGE_SIZE;
 }
-#else
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-                                       struct xdr_skb_reader *desc)
-{
-       return xs_tcp_read_reply(xprt, desc);
-}
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-/*
- * Read data off the transport.  This can be either an RPC_CALL or an
- * RPC_REPLY.  Relay the processing to helper functions.
- */
-static void xs_tcp_read_data(struct rpc_xprt *xprt,
-                                   struct xdr_skb_reader *desc)
-{
-       struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-
-       if (_xs_tcp_read_data(xprt, desc) == 0)
-               xs_tcp_check_fraghdr(transport);
-       else {
-               /*
-                * The transport_lock protects the request handling.
-                * There's no need to hold it to update the tcp_flags.
-                */
-               transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
-       }
-}
-
-static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-       size_t len;
-
-       len = transport->tcp_reclen - transport->tcp_offset;
-       if (len > desc->count)
-               len = desc->count;
-       desc->count -= len;
-       desc->offset += len;
-       transport->tcp_offset += len;
-       dprintk("RPC:       discarded %zu bytes\n", len);
-       xs_tcp_check_fraghdr(transport);
-}
-
-static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
-{
-       struct rpc_xprt *xprt = rd_desc->arg.data;
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-       struct xdr_skb_reader desc = {
-               .skb    = skb,
-               .offset = offset,
-               .count  = len,
-       };
-       size_t ret;
-
-       dprintk("RPC:       xs_tcp_data_recv started\n");
-       do {
-               trace_xs_tcp_data_recv(transport);
-               /* Read in a new fragment marker if necessary */
-               /* Can we ever really expect to get completely empty fragments? */
-               if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
-                       xs_tcp_read_fraghdr(xprt, &desc);
-                       continue;
-               }
-               /* Read in the xid if necessary */
-               if (transport->tcp_flags & TCP_RCV_COPY_XID) {
-                       xs_tcp_read_xid(transport, &desc);
-                       continue;
-               }
-               /* Read in the call/reply flag */
-               if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
-                       xs_tcp_read_calldir(transport, &desc);
-                       continue;
-               }
-               /* Read in the request data */
-               if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
-                       xs_tcp_read_data(xprt, &desc);
-                       continue;
-               }
-               /* Skip over any trailing bytes on short reads */
-               xs_tcp_read_discard(transport, &desc);
-       } while (desc.count);
-       ret = len - desc.count;
-       if (ret < rd_desc->count)
-               rd_desc->count -= ret;
-       else
-               rd_desc->count = 0;
-       trace_xs_tcp_data_recv(transport);
-       dprintk("RPC:       xs_tcp_data_recv done\n");
-       return ret;
-}
-
-static void xs_tcp_data_receive(struct sock_xprt *transport)
-{
-       struct rpc_xprt *xprt = &transport->xprt;
-       struct sock *sk;
-       read_descriptor_t rd_desc = {
-               .arg.data = xprt,
-       };
-       unsigned long total = 0;
-       int read = 0;
-
-restart:
-       mutex_lock(&transport->recv_mutex);
-       sk = transport->inet;
-       if (sk == NULL)
-               goto out;
-
-       /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
-       for (;;) {
-               rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
-               lock_sock(sk);
-               read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-               if (rd_desc.count != 0 || read < 0) {
-                       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
-                       release_sock(sk);
-                       break;
-               }
-               release_sock(sk);
-               total += read;
-               if (need_resched()) {
-                       mutex_unlock(&transport->recv_mutex);
-                       cond_resched();
-                       goto restart;
-               }
-       }
-       if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-               queue_work(xprtiod_workqueue, &transport->recv_worker);
-out:
-       mutex_unlock(&transport->recv_mutex);
-       trace_xs_tcp_data_ready(xprt, read, total);
-}
-
-static void xs_tcp_data_receive_workfn(struct work_struct *work)
-{
-       struct sock_xprt *transport =
-               container_of(work, struct sock_xprt, recv_worker);
-       xs_tcp_data_receive(transport);
-}
-
 /**
  * xs_tcp_state_change - callback to handle TCP socket state changes
  * @sk: socket whose state has changed
@@ -1600,17 +1448,13 @@ static void xs_tcp_state_change(struct sock *sk)
        case TCP_ESTABLISHED:
                spin_lock(&xprt->transport_lock);
                if (!xprt_test_and_set_connected(xprt)) {
-
-                       /* Reset TCP record info */
-                       transport->tcp_offset = 0;
-                       transport->tcp_reclen = 0;
-                       transport->tcp_copied = 0;
-                       transport->tcp_flags =
-                               TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
                        xprt->connect_cookie++;
                        clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
                        xprt_clear_connecting(xprt);
 
+                       xprt->stat.connect_count++;
+                       xprt->stat.connect_time += (long)jiffies -
+                                                  xprt->stat.connect_start;
                        xprt_wake_pending_tasks(xprt, -EAGAIN);
                }
                spin_unlock(&xprt->transport_lock);
@@ -1675,7 +1519,8 @@ static void xs_write_space(struct sock *sk)
        if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
                goto out;
 
-       xprt_write_space(xprt);
+       if (xprt_write_space(xprt))
+               sk->sk_write_pending--;
 out:
        rcu_read_unlock();
 }
@@ -1773,11 +1618,17 @@ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
        spin_unlock_bh(&xprt->transport_lock);
 }
 
-static unsigned short xs_get_random_port(void)
+static int xs_get_random_port(void)
 {
-       unsigned short range = xprt_max_resvport - xprt_min_resvport + 1;
-       unsigned short rand = (unsigned short) prandom_u32() % range;
-       return rand + xprt_min_resvport;
+       unsigned short min = xprt_min_resvport, max = xprt_max_resvport;
+       unsigned short range;
+       unsigned short rand;
+
+       if (max < min)
+               return -EADDRINUSE;
+       range = max - min + 1;
+       rand = (unsigned short) prandom_u32() % range;
+       return rand + min;
 }
 
 /**
@@ -1833,9 +1684,9 @@ static void xs_set_srcport(struct sock_xprt *transport, struct socket *sock)
                transport->srcport = xs_sock_getport(sock);
 }
 
-static unsigned short xs_get_srcport(struct sock_xprt *transport)
+static int xs_get_srcport(struct sock_xprt *transport)
 {
-       unsigned short port = transport->srcport;
+       int port = transport->srcport;
 
        if (port == 0 && transport->xprt.resvport)
                port = xs_get_random_port();
@@ -1856,7 +1707,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
 {
        struct sockaddr_storage myaddr;
        int err, nloop = 0;
-       unsigned short port = xs_get_srcport(transport);
+       int port = xs_get_srcport(transport);
        unsigned short last;
 
        /*
@@ -1874,8 +1725,8 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
         * transport->xprt.resvport == 1) xs_get_srcport above will
         * ensure that port is non-zero and we will bind as needed.
         */
-       if (port == 0)
-               return 0;
+       if (port <= 0)
+               return port;
 
        memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
        do {
@@ -2028,9 +1879,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
                write_unlock_bh(&sk->sk_callback_lock);
        }
 
-       /* Tell the socket layer to start connecting... */
-       xprt->stat.connect_count++;
-       xprt->stat.connect_start = jiffies;
+       xs_stream_reset_connect(transport);
+
        return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
 }
 
@@ -2062,6 +1912,9 @@ static int xs_local_setup_socket(struct sock_xprt *transport)
        case 0:
                dprintk("RPC:       xprt %p connected to %s\n",
                                xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
+               xprt->stat.connect_count++;
+               xprt->stat.connect_time += (long)jiffies -
+                                          xprt->stat.connect_start;
                xprt_set_connected(xprt);
        case -ENOBUFS:
                break;
@@ -2386,9 +2239,10 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
        xs_set_memalloc(xprt);
 
+       /* Reset TCP record info */
+       xs_stream_reset_connect(transport);
+
        /* Tell the socket layer to start connecting... */
-       xprt->stat.connect_count++;
-       xprt->stat.connect_start = jiffies;
        set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
        ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
        switch (ret) {
@@ -2561,7 +2415,7 @@ static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
                        "%llu %llu %lu %llu %llu\n",
                        xprt->stat.bind_count,
                        xprt->stat.connect_count,
-                       xprt->stat.connect_time,
+                       xprt->stat.connect_time / HZ,
                        idle_time,
                        xprt->stat.sends,
                        xprt->stat.recvs,
@@ -2616,7 +2470,7 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
                        transport->srcport,
                        xprt->stat.bind_count,
                        xprt->stat.connect_count,
-                       xprt->stat.connect_time,
+                       xprt->stat.connect_time / HZ,
                        idle_time,
                        xprt->stat.sends,
                        xprt->stat.recvs,
@@ -2704,9 +2558,8 @@ static int bc_sendto(struct rpc_rqst *req)
 /*
  * The send routine. Borrows from svc_send
  */
-static int bc_send_request(struct rpc_task *task)
+static int bc_send_request(struct rpc_rqst *req)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
        struct svc_xprt *xprt;
        int len;
 
@@ -2720,12 +2573,7 @@ static int bc_send_request(struct rpc_task *task)
         * Grab the mutex to serialize data as the connection is shared
         * with the fore channel
         */
-       if (!mutex_trylock(&xprt->xpt_mutex)) {
-               rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
-               if (!mutex_trylock(&xprt->xpt_mutex))
-                       return -EAGAIN;
-               rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
-       }
+       mutex_lock(&xprt->xpt_mutex);
        if (test_bit(XPT_DEAD, &xprt->xpt_flags))
                len = -ENOTCONN;
        else
@@ -2761,7 +2609,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
 
 static const struct rpc_xprt_ops xs_local_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
-       .release_xprt           = xs_tcp_release_xprt,
+       .release_xprt           = xprt_release_xprt,
        .alloc_slot             = xprt_alloc_slot,
        .free_slot              = xprt_free_slot,
        .rpcbind                = xs_local_rpcbind,
@@ -2769,6 +2617,7 @@ static const struct rpc_xprt_ops xs_local_ops = {
        .connect                = xs_local_connect,
        .buf_alloc              = rpc_malloc,
        .buf_free               = rpc_free,
+       .prepare_request        = xs_stream_prepare_request,
        .send_request           = xs_local_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
        .close                  = xs_close,
@@ -2803,14 +2652,15 @@ static const struct rpc_xprt_ops xs_udp_ops = {
 
 static const struct rpc_xprt_ops xs_tcp_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
-       .release_xprt           = xs_tcp_release_xprt,
-       .alloc_slot             = xprt_lock_and_alloc_slot,
+       .release_xprt           = xprt_release_xprt,
+       .alloc_slot             = xprt_alloc_slot,
        .free_slot              = xprt_free_slot,
        .rpcbind                = rpcb_getport_async,
        .set_port               = xs_set_port,
        .connect                = xs_connect,
        .buf_alloc              = rpc_malloc,
        .buf_free               = rpc_free,
+       .prepare_request        = xs_stream_prepare_request,
        .send_request           = xs_tcp_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
        .close                  = xs_tcp_shutdown,
@@ -2952,9 +2802,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
        xprt->ops = &xs_local_ops;
        xprt->timeout = &xs_local_default_timeout;
 
-       INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
-       INIT_DELAYED_WORK(&transport->connect_worker,
-                       xs_dummy_setup_socket);
+       INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+       INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
 
        switch (sun->sun_family) {
        case AF_LOCAL:
@@ -3106,7 +2955,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
        xprt->connect_timeout = xprt->timeout->to_initval *
                (xprt->timeout->to_retries + 1);
 
-       INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
+       INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
        INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
 
        switch (addr->sa_family) {
@@ -3317,12 +3166,8 @@ static int param_set_uint_minmax(const char *val,
 
 static int param_set_portnr(const char *val, const struct kernel_param *kp)
 {
-       if (kp->arg == &xprt_min_resvport)
-               return param_set_uint_minmax(val, kp,
-                       RPC_MIN_RESVPORT,
-                       xprt_max_resvport);
        return param_set_uint_minmax(val, kp,
-                       xprt_min_resvport,
+                       RPC_MIN_RESVPORT,
                        RPC_MAX_RESVPORT);
 }