SUNRPC: Micro-optimise when the task is known not to be sleeping
[sfrench/cifs-2.6.git] / net / sunrpc / clnt.c
index d7ec6132c046ec409db057d6ace332ab620fa88b..498dd6ad5bc5799b62cf3aec3ef846c563fea505 100644 (file)
@@ -66,20 +66,19 @@ static void call_decode(struct rpc_task *task);
 static void    call_bind(struct rpc_task *task);
 static void    call_bind_status(struct rpc_task *task);
 static void    call_transmit(struct rpc_task *task);
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-static void    call_bc_transmit(struct rpc_task *task);
-#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 static void    call_status(struct rpc_task *task);
 static void    call_transmit_status(struct rpc_task *task);
 static void    call_refresh(struct rpc_task *task);
 static void    call_refreshresult(struct rpc_task *task);
-static void    call_timeout(struct rpc_task *task);
 static void    call_connect(struct rpc_task *task);
 static void    call_connect_status(struct rpc_task *task);
 
-static __be32  *rpc_encode_header(struct rpc_task *task);
-static __be32  *rpc_verify_header(struct rpc_task *task);
+static int     rpc_encode_header(struct rpc_task *task,
+                                 struct xdr_stream *xdr);
+static int     rpc_decode_header(struct rpc_task *task,
+                                 struct xdr_stream *xdr);
 static int     rpc_ping(struct rpc_clnt *clnt);
+static void    rpc_check_timeout(struct rpc_task *task);
 
 static void rpc_register_client(struct rpc_clnt *clnt)
 {
@@ -834,9 +833,6 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
-                       if (RPC_IS_QUEUED(rovr))
-                               rpc_wake_up_queued_task(rovr->tk_waitqueue,
-                                                       rovr);
                }
        }
        spin_unlock(&clnt->cl_lock);
@@ -1131,6 +1127,8 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 EXPORT_SYMBOL_GPL(rpc_call_async);
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static void call_bc_encode(struct rpc_task *task);
+
 /**
  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
  * rpc_execute against it
@@ -1152,7 +1150,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
        task = rpc_new_task(&task_setup_data);
        xprt_init_bc_request(req, task);
 
-       task->tk_action = call_bc_transmit;
+       task->tk_action = call_bc_encode;
        atomic_inc(&task->tk_count);
        WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
        rpc_execute(task);
@@ -1162,6 +1160,29 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
+/**
+ * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
+ * @req: RPC request to prepare
+ * @pages: vector of struct page pointers
+ * @base: offset in first page where receive should start, in bytes
+ * @len: expected size of the upper layer data payload, in bytes
+ * @hdrsize: expected size of upper layer reply header, in XDR words
+ *
+ */
+void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
+                            unsigned int base, unsigned int len,
+                            unsigned int hdrsize)
+{
+       /* Subtract one to force an extra word of buffer space for the
+        * payload's XDR pad to fall into the rcv_buf's tail iovec.
+        */
+       hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign - 1;
+
+       xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
+       trace_rpc_reply_pages(req);
+}
+EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
+
 void
 rpc_call_start(struct rpc_task *task)
 {
@@ -1519,6 +1540,7 @@ call_start(struct rpc_task *task)
        clnt->cl_stats->rpccnt++;
        task->tk_action = call_reserve;
        rpc_task_set_transport(task, clnt);
+       call_reserve(task);
 }
 
 /*
@@ -1532,6 +1554,9 @@ call_reserve(struct rpc_task *task)
        task->tk_status  = 0;
        task->tk_action  = call_reserveresult;
        xprt_reserve(task);
+       if (rpc_task_need_resched(task))
+               return;
+        call_reserveresult(task);
 }
 
 static void call_retry_reserve(struct rpc_task *task);
@@ -1554,6 +1579,7 @@ call_reserveresult(struct rpc_task *task)
        if (status >= 0) {
                if (task->tk_rqstp) {
                        task->tk_action = call_refresh;
+                       call_refresh(task);
                        return;
                }
 
@@ -1579,6 +1605,7 @@ call_reserveresult(struct rpc_task *task)
                /* fall through */
        case -EAGAIN:   /* woken up; retry */
                task->tk_action = call_retry_reserve;
+               call_retry_reserve(task);
                return;
        case -EIO:      /* probably a shutdown */
                break;
@@ -1601,6 +1628,9 @@ call_retry_reserve(struct rpc_task *task)
        task->tk_status  = 0;
        task->tk_action  = call_reserveresult;
        xprt_retry_reserve(task);
+       if (rpc_task_need_resched(task))
+               return;
+       call_reserveresult(task);
 }
 
 /*
@@ -1615,6 +1645,9 @@ call_refresh(struct rpc_task *task)
        task->tk_status = 0;
        task->tk_client->cl_stats->rpcauthrefresh++;
        rpcauth_refreshcred(task);
+       if (rpc_task_need_resched(task))
+               return;
+       call_refreshresult(task);
 }
 
 /*
@@ -1633,6 +1666,7 @@ call_refreshresult(struct rpc_task *task)
        case 0:
                if (rpcauth_uptodatecred(task)) {
                        task->tk_action = call_allocate;
+                       call_allocate(task);
                        return;
                }
                /* Use rate-limiting and a max number of retries if refresh
@@ -1651,6 +1685,7 @@ call_refreshresult(struct rpc_task *task)
                task->tk_cred_retry--;
                dprintk("RPC: %5u %s: retry refresh creds\n",
                                task->tk_pid, __func__);
+               call_refresh(task);
                return;
        }
        dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
@@ -1665,7 +1700,7 @@ call_refreshresult(struct rpc_task *task)
 static void
 call_allocate(struct rpc_task *task)
 {
-       unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
+       const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
@@ -1676,8 +1711,10 @@ call_allocate(struct rpc_task *task)
        task->tk_status = 0;
        task->tk_action = call_encode;
 
-       if (req->rq_buffer)
+       if (req->rq_buffer) {
+               call_encode(task);
                return;
+       }
 
        if (proc->p_proc != 0) {
                BUG_ON(proc->p_arglen == 0);
@@ -1690,15 +1727,20 @@ call_allocate(struct rpc_task *task)
         * and reply headers, and convert both values
         * to byte sizes.
         */
-       req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
+       req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
+                          proc->p_arglen;
        req->rq_callsize <<= 2;
-       req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
+       req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + proc->p_replen;
        req->rq_rcvsize <<= 2;
 
        status = xprt->ops->buf_alloc(task);
        xprt_inject_disconnect(xprt);
-       if (status == 0)
+       if (status == 0) {
+               if (rpc_task_need_resched(task))
+                       return;
+               call_encode(task);
                return;
+       }
        if (status != -ENOMEM) {
                rpc_exit(task, status);
                return;
@@ -1728,10 +1770,7 @@ static void
 rpc_xdr_encode(struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
-       kxdreproc_t     encode;
-       __be32          *p;
-
-       dprint_status(task);
+       struct xdr_stream xdr;
 
        xdr_buf_init(&req->rq_snd_buf,
                     req->rq_buffer,
@@ -1740,18 +1779,13 @@ rpc_xdr_encode(struct rpc_task *task)
                     req->rq_rbuffer,
                     req->rq_rcvsize);
 
-       p = rpc_encode_header(task);
-       if (p == NULL)
+       req->rq_snd_buf.head[0].iov_len = 0;
+       xdr_init_encode(&xdr, &req->rq_snd_buf,
+                       req->rq_snd_buf.head[0].iov_base, req);
+       if (rpc_encode_header(task, &xdr))
                return;
 
-       encode = task->tk_msg.rpc_proc->p_encode;
-       if (encode == NULL)
-               return;
-
-       task->tk_status = rpcauth_wrap_req(task, encode, req, p,
-                       task->tk_msg.rpc_argp);
-       if (task->tk_status == 0)
-               xprt_request_prepare(req);
+       task->tk_status = rpcauth_wrap_req(task, &xdr);
 }
 
 /*
@@ -1762,6 +1796,7 @@ call_encode(struct rpc_task *task)
 {
        if (!rpc_task_need_encode(task))
                goto out;
+       dprint_status(task);
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        rpc_xdr_encode(task);
        /* Did the encode result in an error condition? */
@@ -1779,6 +1814,8 @@ call_encode(struct rpc_task *task)
                        rpc_exit(task, task->tk_status);
                }
                return;
+       } else {
+               xprt_request_prepare(task->tk_rqstp);
        }
 
        /* Add task to reply queue before transmission to avoid races */
@@ -1787,6 +1824,25 @@ call_encode(struct rpc_task *task)
        xprt_request_enqueue_transmit(task);
 out:
        task->tk_action = call_bind;
+       call_bind(task);
+}
+
+/*
+ * Helpers to check if the task was already transmitted, and
+ * to take action when that is the case.
+ */
+static bool
+rpc_task_transmitted(struct rpc_task *task)
+{
+       return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+}
+
+static void
+rpc_task_handle_transmitted(struct rpc_task *task)
+{
+       xprt_end_transmit(task);
+       task->tk_action = call_transmit_status;
+       call_transmit_status(task);
 }
 
 /*
@@ -1797,14 +1853,22 @@ call_bind(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 
-       dprint_status(task);
+       if (rpc_task_transmitted(task)) {
+               rpc_task_handle_transmitted(task);
+               return;
+       }
 
-       task->tk_action = call_connect;
-       if (!xprt_bound(xprt)) {
-               task->tk_action = call_bind_status;
-               task->tk_timeout = xprt->bind_timeout;
-               xprt->ops->rpcbind(task);
+       if (xprt_bound(xprt)) {
+               task->tk_action = call_connect;
+               call_connect(task);
+               return;
        }
+
+       dprint_status(task);
+
+       task->tk_action = call_bind_status;
+       task->tk_timeout = xprt->bind_timeout;
+       xprt->ops->rpcbind(task);
 }
 
 /*
@@ -1815,10 +1879,16 @@ call_bind_status(struct rpc_task *task)
 {
        int status = -EIO;
 
+       if (rpc_task_transmitted(task)) {
+               rpc_task_handle_transmitted(task);
+               return;
+       }
+
        if (task->tk_status >= 0) {
                dprint_status(task);
                task->tk_status = 0;
                task->tk_action = call_connect;
+               call_connect(task);
                return;
        }
 
@@ -1882,7 +1952,8 @@ call_bind_status(struct rpc_task *task)
 
 retry_timeout:
        task->tk_status = 0;
-       task->tk_action = call_timeout;
+       task->tk_action = call_encode;
+       rpc_check_timeout(task);
 }
 
 /*
@@ -1893,21 +1964,29 @@ call_connect(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 
+       if (rpc_task_transmitted(task)) {
+               rpc_task_handle_transmitted(task);
+               return;
+       }
+
+       if (xprt_connected(xprt)) {
+               task->tk_action = call_transmit;
+               call_transmit(task);
+               return;
+       }
+
        dprintk("RPC: %5u call_connect xprt %p %s connected\n",
                        task->tk_pid, xprt,
                        (xprt_connected(xprt) ? "is" : "is not"));
 
-       task->tk_action = call_transmit;
-       if (!xprt_connected(xprt)) {
-               task->tk_action = call_connect_status;
-               if (task->tk_status < 0)
-                       return;
-               if (task->tk_flags & RPC_TASK_NOCONNECT) {
-                       rpc_exit(task, -ENOTCONN);
-                       return;
-               }
-               xprt_connect(task);
+       task->tk_action = call_connect_status;
+       if (task->tk_status < 0)
+               return;
+       if (task->tk_flags & RPC_TASK_NOCONNECT) {
+               rpc_exit(task, -ENOTCONN);
+               return;
        }
+       xprt_connect(task);
 }
 
 /*
@@ -1919,10 +1998,8 @@ call_connect_status(struct rpc_task *task)
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;
 
-       /* Check if the task was already transmitted */
-       if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
-               xprt_end_transmit(task);
-               task->tk_action = call_transmit_status;
+       if (rpc_task_transmitted(task)) {
+               rpc_task_handle_transmitted(task);
                return;
        }
 
@@ -1937,8 +2014,7 @@ call_connect_status(struct rpc_task *task)
                        break;
                if (clnt->cl_autobind) {
                        rpc_force_rebind(clnt);
-                       task->tk_action = call_bind;
-                       return;
+                       goto out_retry;
                }
                /* fall through */
        case -ECONNRESET:
@@ -1958,16 +2034,20 @@ call_connect_status(struct rpc_task *task)
                /* fall through */
        case -ENOTCONN:
        case -EAGAIN:
-               /* Check for timeouts before looping back to call_bind */
        case -ETIMEDOUT:
-               task->tk_action = call_timeout;
-               return;
+               goto out_retry;
        case 0:
                clnt->cl_stats->netreconn++;
                task->tk_action = call_transmit;
+               call_transmit(task);
                return;
        }
        rpc_exit(task, status);
+       return;
+out_retry:
+       /* Check for timeouts before looping back to call_bind */
+       task->tk_action = call_bind;
+       rpc_check_timeout(task);
 }
 
 /*
@@ -1976,16 +2056,28 @@ call_connect_status(struct rpc_task *task)
 static void
 call_transmit(struct rpc_task *task)
 {
+       if (rpc_task_transmitted(task)) {
+               rpc_task_handle_transmitted(task);
+               return;
+       }
+
        dprint_status(task);
 
+       task->tk_action = call_transmit_status;
+       if (!xprt_prepare_transmit(task))
+               return;
        task->tk_status = 0;
        if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
-               if (!xprt_prepare_transmit(task))
+               if (!xprt_connected(task->tk_xprt)) {
+                       task->tk_status = -ENOTCONN;
                        return;
+               }
                xprt_transmit(task);
        }
-       task->tk_action = call_transmit_status;
        xprt_end_transmit(task);
+       if (rpc_task_need_resched(task))
+               return;
+       call_transmit_status(task);
 }
 
 /*
@@ -2000,8 +2092,12 @@ call_transmit_status(struct rpc_task *task)
         * Common case: success.  Force the compiler to put this
         * test first.
         */
-       if (task->tk_status == 0) {
-               xprt_request_wait_receive(task);
+       if (rpc_task_transmitted(task)) {
+               if (task->tk_status == 0)
+                       xprt_request_wait_receive(task);
+               if (rpc_task_need_resched(task))
+                       return;
+               call_status(task);
                return;
        }
 
@@ -2038,7 +2134,7 @@ call_transmit_status(struct rpc_task *task)
                                trace_xprt_ping(task->tk_xprt,
                                                task->tk_status);
                        rpc_exit(task, task->tk_status);
-                       break;
+                       return;
                }
                /* fall through */
        case -ECONNRESET:
@@ -2046,11 +2142,25 @@ call_transmit_status(struct rpc_task *task)
        case -EADDRINUSE:
        case -ENOTCONN:
        case -EPIPE:
+               task->tk_action = call_bind;
+               task->tk_status = 0;
                break;
        }
+       rpc_check_timeout(task);
 }
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static void call_bc_transmit(struct rpc_task *task);
+static void call_bc_transmit_status(struct rpc_task *task);
+
+static void
+call_bc_encode(struct rpc_task *task)
+{
+       xprt_request_enqueue_transmit(task);
+       task->tk_action = call_bc_transmit;
+       call_bc_transmit(task);
+}
+
 /*
  * 5b. Send the backchannel RPC reply.  On error, drop the reply.  In
  * addition, disconnect on connectivity errors.
@@ -2058,26 +2168,23 @@ call_transmit_status(struct rpc_task *task)
 static void
 call_bc_transmit(struct rpc_task *task)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
-
-       if (rpc_task_need_encode(task))
-               xprt_request_enqueue_transmit(task);
-       if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
-               goto out_wakeup;
-
-       if (!xprt_prepare_transmit(task))
-               goto out_retry;
-
-       if (task->tk_status < 0) {
-               printk(KERN_NOTICE "RPC: Could not send backchannel reply "
-                       "error: %d\n", task->tk_status);
-               goto out_done;
+       task->tk_action = call_bc_transmit_status;
+       if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
+               if (!xprt_prepare_transmit(task))
+                       return;
+               task->tk_status = 0;
+               xprt_transmit(task);
        }
+       xprt_end_transmit(task);
+}
 
-       xprt_transmit(task);
+static void
+call_bc_transmit_status(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
 
-       xprt_end_transmit(task);
        dprint_status(task);
+
        switch (task->tk_status) {
        case 0:
                /* Success */
@@ -2091,8 +2198,14 @@ call_bc_transmit(struct rpc_task *task)
        case -ENOTCONN:
        case -EPIPE:
                break;
+       case -ENOBUFS:
+               rpc_delay(task, HZ>>2);
+               /* fall through */
+       case -EBADSLT:
        case -EAGAIN:
-               goto out_retry;
+               task->tk_status = 0;
+               task->tk_action = call_bc_transmit;
+               return;
        case -ETIMEDOUT:
                /*
                 * Problem reaching the server.  Disconnect and let the
@@ -2111,18 +2224,11 @@ call_bc_transmit(struct rpc_task *task)
                 * We were unable to reply and will have to drop the
                 * request.  The server should reconnect and retransmit.
                 */
-               WARN_ON_ONCE(task->tk_status == -EAGAIN);
                printk(KERN_NOTICE "RPC: Could not send backchannel reply "
                        "error: %d\n", task->tk_status);
                break;
        }
-out_wakeup:
-       rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
-out_done:
        task->tk_action = rpc_exit_task;
-       return;
-out_retry:
-       task->tk_status = 0;
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
@@ -2143,6 +2249,7 @@ call_status(struct rpc_task *task)
        status = task->tk_status;
        if (status >= 0) {
                task->tk_action = call_decode;
+               call_decode(task);
                return;
        }
 
@@ -2154,10 +2261,8 @@ call_status(struct rpc_task *task)
        case -EHOSTUNREACH:
        case -ENETUNREACH:
        case -EPERM:
-               if (RPC_IS_SOFTCONN(task)) {
-                       rpc_exit(task, status);
-                       break;
-               }
+               if (RPC_IS_SOFTCONN(task))
+                       goto out_exit;
                /*
                 * Delay any retries for 3 seconds, then handle as if it
                 * were a timeout.
@@ -2165,7 +2270,6 @@ call_status(struct rpc_task *task)
                rpc_delay(task, 3*HZ);
                /* fall through */
        case -ETIMEDOUT:
-               task->tk_action = call_timeout;
                break;
        case -ECONNREFUSED:
        case -ECONNRESET:
@@ -2178,34 +2282,30 @@ call_status(struct rpc_task *task)
        case -EPIPE:
        case -ENOTCONN:
        case -EAGAIN:
-               task->tk_action = call_encode;
                break;
        case -EIO:
                /* shutdown or soft timeout */
-               rpc_exit(task, status);
-               break;
+               goto out_exit;
        default:
                if (clnt->cl_chatty)
                        printk("%s: RPC call returned error %d\n",
                               clnt->cl_program->name, -status);
-               rpc_exit(task, status);
+               goto out_exit;
        }
+       task->tk_action = call_encode;
+       rpc_check_timeout(task);
+       return;
+out_exit:
+       rpc_exit(task, status);
 }
 
-/*
- * 6a. Handle RPC timeout
- *     We do not release the request slot, so we keep using the
- *     same XID for all retransmits.
- */
 static void
-call_timeout(struct rpc_task *task)
+rpc_check_timeout(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
 
-       if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
-               dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
-               goto retry;
-       }
+       if (xprt_adjust_timeout(task->tk_rqstp) == 0)
+               return;
 
        dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
        task->tk_timeouts++;
@@ -2241,10 +2341,6 @@ call_timeout(struct rpc_task *task)
         * event? RFC2203 requires the server to drop all such requests.
         */
        rpcauth_invalcred(task);
-
-retry:
-       task->tk_action = call_encode;
-       task->tk_status = 0;
 }
 
 /*
@@ -2255,12 +2351,11 @@ call_decode(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
-       kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
-       __be32          *p;
+       struct xdr_stream xdr;
 
        dprint_status(task);
 
-       if (!decode) {
+       if (!task->tk_msg.rpc_proc->p_decode) {
                task->tk_action = rpc_exit_task;
                return;
        }
@@ -2285,223 +2380,195 @@ call_decode(struct rpc_task *task)
        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
                                sizeof(req->rq_rcv_buf)) != 0);
 
-       if (req->rq_rcv_buf.len < 12) {
-               if (!RPC_IS_SOFT(task)) {
-                       task->tk_action = call_encode;
-                       goto out_retry;
-               }
-               dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
-                               clnt->cl_program->name, task->tk_status);
-               task->tk_action = call_timeout;
+       if (req->rq_rcv_buf.len < 12)
                goto out_retry;
-       }
 
-       p = rpc_verify_header(task);
-       if (IS_ERR(p)) {
-               if (p == ERR_PTR(-EAGAIN))
-                       goto out_retry;
+       xdr_init_decode(&xdr, &req->rq_rcv_buf,
+                       req->rq_rcv_buf.head[0].iov_base, req);
+       switch (rpc_decode_header(task, &xdr)) {
+       case 0:
+               task->tk_action = rpc_exit_task;
+               task->tk_status = rpcauth_unwrap_resp(task, &xdr);
+               dprintk("RPC: %5u %s result %d\n",
+                       task->tk_pid, __func__, task->tk_status);
                return;
-       }
-       task->tk_action = rpc_exit_task;
-
-       task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
-                                             task->tk_msg.rpc_resp);
-
-       dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
-                       task->tk_status);
-       return;
+       case -EAGAIN:
 out_retry:
-       task->tk_status = 0;
-       /* Note: rpc_verify_header() may have freed the RPC slot */
-       if (task->tk_rqstp == req) {
-               xdr_free_bvec(&req->rq_rcv_buf);
-               req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
-               if (task->tk_client->cl_discrtry)
-                       xprt_conditional_disconnect(req->rq_xprt,
-                                       req->rq_connect_cookie);
+               task->tk_status = 0;
+               /* Note: rpc_decode_header() may have freed the RPC slot */
+               if (task->tk_rqstp == req) {
+                       xdr_free_bvec(&req->rq_rcv_buf);
+                       req->rq_reply_bytes_recvd = 0;
+                       req->rq_rcv_buf.len = 0;
+                       if (task->tk_client->cl_discrtry)
+                               xprt_conditional_disconnect(req->rq_xprt,
+                                                           req->rq_connect_cookie);
+               }
+               task->tk_action = call_encode;
+               rpc_check_timeout(task);
        }
 }
 
-static __be32 *
-rpc_encode_header(struct rpc_task *task)
+static int
+rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
 {
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
-       __be32          *p = req->rq_svec[0].iov_base;
-
-       /* FIXME: check buffer size? */
-
-       p = xprt_skip_transport_header(req->rq_xprt, p);
-       *p++ = req->rq_xid;             /* XID */
-       *p++ = htonl(RPC_CALL);         /* CALL */
-       *p++ = htonl(RPC_VERSION);      /* RPC version */
-       *p++ = htonl(clnt->cl_prog);    /* program number */
-       *p++ = htonl(clnt->cl_vers);    /* program version */
-       *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
-       p = rpcauth_marshcred(task, p);
-       if (p)
-               req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
-       return p;
+       __be32 *p;
+       int error;
+
+       error = -EMSGSIZE;
+       p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
+       if (!p)
+               goto out_fail;
+       *p++ = req->rq_xid;
+       *p++ = rpc_call;
+       *p++ = cpu_to_be32(RPC_VERSION);
+       *p++ = cpu_to_be32(clnt->cl_prog);
+       *p++ = cpu_to_be32(clnt->cl_vers);
+       *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
+
+       error = rpcauth_marshcred(task, xdr);
+       if (error < 0)
+               goto out_fail;
+       return 0;
+out_fail:
+       trace_rpc_bad_callhdr(task);
+       rpc_exit(task, error);
+       return error;
 }
 
-static __be32 *
-rpc_verify_header(struct rpc_task *task)
+static noinline int
+rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
 {
        struct rpc_clnt *clnt = task->tk_client;
-       struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
-       int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
-       __be32  *p = iov->iov_base;
-       u32 n;
        int error = -EACCES;
+       __be32 *p;
 
-       if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
-               /* RFC-1014 says that the representation of XDR data must be a
-                * multiple of four bytes
-                * - if it isn't pointer subtraction in the NFS client may give
-                *   undefined results
-                */
-               dprintk("RPC: %5u %s: XDR representation not a multiple of"
-                      " 4 bytes: 0x%x\n", task->tk_pid, __func__,
-                      task->tk_rqstp->rq_rcv_buf.len);
-               error = -EIO;
-               goto out_err;
-       }
-       if ((len -= 3) < 0)
-               goto out_overflow;
-
-       p += 1; /* skip XID */
-       if ((n = ntohl(*p++)) != RPC_REPLY) {
-               dprintk("RPC: %5u %s: not an RPC reply: %x\n",
-                       task->tk_pid, __func__, n);
-               error = -EIO;
-               goto out_garbage;
-       }
-
-       if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
-               if (--len < 0)
-                       goto out_overflow;
-               switch ((n = ntohl(*p++))) {
-               case RPC_AUTH_ERROR:
-                       break;
-               case RPC_MISMATCH:
-                       dprintk("RPC: %5u %s: RPC call version mismatch!\n",
-                               task->tk_pid, __func__);
-                       error = -EPROTONOSUPPORT;
-                       goto out_err;
-               default:
-                       dprintk("RPC: %5u %s: RPC call rejected, "
-                               "unknown error: %x\n",
-                               task->tk_pid, __func__, n);
-                       error = -EIO;
-                       goto out_err;
-               }
-               if (--len < 0)
-                       goto out_overflow;
-               switch ((n = ntohl(*p++))) {
-               case RPC_AUTH_REJECTEDCRED:
-               case RPC_AUTH_REJECTEDVERF:
-               case RPCSEC_GSS_CREDPROBLEM:
-               case RPCSEC_GSS_CTXPROBLEM:
-                       if (!task->tk_cred_retry)
-                               break;
-                       task->tk_cred_retry--;
-                       dprintk("RPC: %5u %s: retry stale creds\n",
-                                       task->tk_pid, __func__);
-                       rpcauth_invalcred(task);
-                       /* Ensure we obtain a new XID! */
-                       xprt_release(task);
-                       task->tk_action = call_reserve;
-                       goto out_retry;
-               case RPC_AUTH_BADCRED:
-               case RPC_AUTH_BADVERF:
-                       /* possibly garbled cred/verf? */
-                       if (!task->tk_garb_retry)
-                               break;
-                       task->tk_garb_retry--;
-                       dprintk("RPC: %5u %s: retry garbled creds\n",
-                                       task->tk_pid, __func__);
-                       task->tk_action = call_encode;
-                       goto out_retry;
-               case RPC_AUTH_TOOWEAK:
-                       printk(KERN_NOTICE "RPC: server %s requires stronger "
-                              "authentication.\n",
-                              task->tk_xprt->servername);
-                       break;
-               default:
-                       dprintk("RPC: %5u %s: unknown auth error: %x\n",
-                                       task->tk_pid, __func__, n);
-                       error = -EIO;
-               }
-               dprintk("RPC: %5u %s: call rejected %d\n",
-                               task->tk_pid, __func__, n);
-               goto out_err;
-       }
-       p = rpcauth_checkverf(task, p);
-       if (IS_ERR(p)) {
-               error = PTR_ERR(p);
-               dprintk("RPC: %5u %s: auth check failed with %d\n",
-                               task->tk_pid, __func__, error);
-               goto out_garbage;               /* bad verifier, retry */
-       }
-       len = p - (__be32 *)iov->iov_base - 1;
-       if (len < 0)
-               goto out_overflow;
-       switch ((n = ntohl(*p++))) {
-       case RPC_SUCCESS:
-               return p;
-       case RPC_PROG_UNAVAIL:
-               dprintk("RPC: %5u %s: program %u is unsupported "
-                               "by server %s\n", task->tk_pid, __func__,
-                               (unsigned int)clnt->cl_prog,
-                               task->tk_xprt->servername);
+       /* RFC-1014 says that the representation of XDR data must be a
+        * multiple of four bytes
+        * - if it isn't pointer subtraction in the NFS client may give
+        *   undefined results
+        */
+       if (task->tk_rqstp->rq_rcv_buf.len & 3)
+               goto out_badlen;
+
+       p = xdr_inline_decode(xdr, 3 * sizeof(*p));
+       if (!p)
+               goto out_unparsable;
+       p++;    /* skip XID */
+       if (*p++ != rpc_reply)
+               goto out_unparsable;
+       if (*p++ != rpc_msg_accepted)
+               goto out_msg_denied;
+
+       error = rpcauth_checkverf(task, xdr);
+       if (error)
+               goto out_verifier;
+
+       p = xdr_inline_decode(xdr, sizeof(*p));
+       if (!p)
+               goto out_unparsable;
+       switch (*p) {
+       case rpc_success:
+               return 0;
+       case rpc_prog_unavail:
+               trace_rpc__prog_unavail(task);
                error = -EPFNOSUPPORT;
                goto out_err;
-       case RPC_PROG_MISMATCH:
-               dprintk("RPC: %5u %s: program %u, version %u unsupported "
-                               "by server %s\n", task->tk_pid, __func__,
-                               (unsigned int)clnt->cl_prog,
-                               (unsigned int)clnt->cl_vers,
-                               task->tk_xprt->servername);
+       case rpc_prog_mismatch:
+               trace_rpc__prog_mismatch(task);
                error = -EPROTONOSUPPORT;
                goto out_err;
-       case RPC_PROC_UNAVAIL:
-               dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
-                               "version %u on server %s\n",
-                               task->tk_pid, __func__,
-                               rpc_proc_name(task),
-                               clnt->cl_prog, clnt->cl_vers,
-                               task->tk_xprt->servername);
+       case rpc_proc_unavail:
+               trace_rpc__proc_unavail(task);
                error = -EOPNOTSUPP;
                goto out_err;
-       case RPC_GARBAGE_ARGS:
-               dprintk("RPC: %5u %s: server saw garbage\n",
-                               task->tk_pid, __func__);
-               break;                  /* retry */
+       case rpc_garbage_args:
+               trace_rpc__garbage_args(task);
+               break;
        default:
-               dprintk("RPC: %5u %s: server accept status: %x\n",
-                               task->tk_pid, __func__, n);
-               /* Also retry */
+               trace_rpc__unparsable(task);
        }
 
 out_garbage:
        clnt->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
-               dprintk("RPC: %5u %s: retrying\n",
-                               task->tk_pid, __func__);
                task->tk_action = call_encode;
-out_retry:
-               return ERR_PTR(-EAGAIN);
+               return -EAGAIN;
        }
 out_err:
        rpc_exit(task, error);
-       dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
-                       __func__, error);
-       return ERR_PTR(error);
-out_overflow:
-       dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
-                       __func__);
+       return error;
+
+out_badlen:
+       trace_rpc__unparsable(task);
+       error = -EIO;
+       goto out_err;
+
+out_unparsable:
+       trace_rpc__unparsable(task);
+       error = -EIO;
+       goto out_garbage;
+
+out_verifier:
+       trace_rpc_bad_verifier(task);
        goto out_garbage;
+
+out_msg_denied:
+       p = xdr_inline_decode(xdr, sizeof(*p));
+       if (!p)
+               goto out_unparsable;
+       switch (*p++) {
+       case rpc_auth_error:
+               break;
+       case rpc_mismatch:
+               trace_rpc__mismatch(task);
+               error = -EPROTONOSUPPORT;
+               goto out_err;
+       default:
+               trace_rpc__unparsable(task);
+               error = -EIO;
+               goto out_err;
+       }
+
+       p = xdr_inline_decode(xdr, sizeof(*p));
+       if (!p)
+               goto out_unparsable;
+       switch (*p++) {
+       case rpc_autherr_rejectedcred:
+       case rpc_autherr_rejectedverf:
+       case rpcsec_gsserr_credproblem:
+       case rpcsec_gsserr_ctxproblem:
+               if (!task->tk_cred_retry)
+                       break;
+               task->tk_cred_retry--;
+               trace_rpc__stale_creds(task);
+               rpcauth_invalcred(task);
+               /* Ensure we obtain a new XID! */
+               xprt_release(task);
+               task->tk_action = call_reserve;
+               return -EAGAIN;
+       case rpc_autherr_badcred:
+       case rpc_autherr_badverf:
+               /* possibly garbled cred/verf? */
+               if (!task->tk_garb_retry)
+                       break;
+               task->tk_garb_retry--;
+               trace_rpc__bad_creds(task);
+               task->tk_action = call_encode;
+               return -EAGAIN;
+       case rpc_autherr_tooweak:
+               trace_rpc__auth_tooweak(task);
+               pr_warn("RPC: server %s requires stronger authentication.\n",
+                       task->tk_xprt->servername);
+               break;
+       default:
+               trace_rpc__unparsable(task);
+               error = -EIO;
+       }
+       goto out_err;
 }
 
 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,