Merge tag 'nfs-for-4.16-2' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
authorLinus Torvalds <torvalds@linux-foundation.org>
Fri, 9 Feb 2018 22:55:30 +0000 (14:55 -0800)
committerLinus Torvalds <torvalds@linux-foundation.org>
Fri, 9 Feb 2018 22:55:30 +0000 (14:55 -0800)
Pull more NFS client updates from Trond Myklebust:
 "A few bugfixes and some small sunrpc latency/performance improvements
  before the merge window closes:

  Stable fixes:

   - fix an incorrect calculation of the RDMA send scatter-gather
     element limit

   - fix an Oops when attempting to free resources after RDMA device
     removal

  Bugfixes:

   - SUNRPC: Ensure we always release the TCP socket in a timely fashion
     when the connection is shut down.

   - SUNRPC: Don't call __UDPX_INC_STATS() from a preemptible context

  Latency/Performance:

   - SUNRPC: Queue latency-sensitive socket tasks to the less contended
     xprtiod queue

   - SUNRPC: Make the xprtiod workqueue unbounded.

   - SUNRPC: Make the rpciod workqueue unbounded"

* tag 'nfs-for-4.16-2' of git://git.linux-nfs.org/projects/trondmy/linux-nfs:
  SUNRPC: Don't call __UDPX_INC_STATS() from a preemptible context
  fix parallelism for rpc tasks
  Make the xprtiod workqueue unbounded.
  SUNRPC: Queue latency-sensitive socket tasks to xprtiod
  SUNRPC: Ensure we always close the socket after a connection shuts down
  xprtrdma: Fix BUG after a device removal
  xprtrdma: Fix calculation of ri_max_send_sges

include/linux/sunrpc/sched.h
net/sunrpc/sched.c
net/sunrpc/xprt.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtsock.c

index d96e74e114c06b18bf022da70f13f02c511faf31..592653becd914a4e9dc3f8fdbea47c2939c706f7 100644 (file)
@@ -229,6 +229,9 @@ void                rpc_sleep_on_priority(struct rpc_wait_queue *,
                                        struct rpc_task *,
                                        rpc_action action,
                                        int priority);
+void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
+               struct rpc_wait_queue *queue,
+               struct rpc_task *task);
 void           rpc_wake_up_queued_task(struct rpc_wait_queue *,
                                        struct rpc_task *);
 void           rpc_wake_up(struct rpc_wait_queue *);
index 896691afbb1a805b71a51aa600efa599e784f3c7..d9db2eab3a8df1dec09a0df3a0e5e6310e5fe91d 100644 (file)
@@ -458,6 +458,18 @@ static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct r
        rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
 }
 
+/*
+ * Wake up a task on a specific queue
+ */
+void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
+               struct rpc_wait_queue *queue,
+               struct rpc_task *task)
+{
+       spin_lock_bh(&queue->lock);
+       rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
+       spin_unlock_bh(&queue->lock);
+}
+
 /*
  * Wake up a task on a specific queue
  */
@@ -1092,12 +1104,12 @@ static int rpciod_start(void)
         * Create the rpciod thread and wait for it to start.
         */
        dprintk("RPC:       creating workqueue rpciod\n");
-       wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
+       wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
        if (!wq)
                goto out_failed;
        rpciod_workqueue = wq;
        /* Note: highpri because network receive is latency sensitive */
-       wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+       wq = alloc_workqueue("xprtiod", WQ_UNBOUND|WQ_MEM_RECLAIM|WQ_HIGHPRI, 0);
        if (!wq)
                goto free_rpciod;
        xprtiod_workqueue = wq;
index 2436fd1125fc608067dcc7a7bc52b9ebb690ae48..8f0ad4f268da58e2380361e72e9adcace18548dd 100644 (file)
@@ -517,7 +517,8 @@ void xprt_write_space(struct rpc_xprt *xprt)
        if (xprt->snd_task) {
                dprintk("RPC:       write space: waking waiting task on "
                                "xprt %p\n", xprt);
-               rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
+               rpc_wake_up_queued_task_on_wq(xprtiod_workqueue,
+                               &xprt->pending, xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
 }
index 162e5dd82466b7d00e32e969547e56732b8c76e2..f0855a959a278dac5c8e6459cd94df244f87ee2c 100644 (file)
@@ -143,7 +143,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
        if (xdr->page_len) {
                remaining = xdr->page_len;
                offset = offset_in_page(xdr->page_base);
-               count = 0;
+               count = RPCRDMA_MIN_SEND_SGES;
                while (remaining) {
                        remaining -= min_t(unsigned int,
                                           PAGE_SIZE - offset, remaining);
index f4eb63e8e6892516e04eee8b6ebf1be487b9a689..e6f84a6434a049e41d83092a0764668a21416d4c 100644 (file)
@@ -505,7 +505,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
                return -ENOMEM;
        }
-       ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;
+       ia->ri_max_send_sges = max_sge;
 
        if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
                dprintk("RPC:       %s: insufficient wqe's available\n",
@@ -1502,6 +1502,9 @@ __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
 static void
 rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
 {
+       if (!rb)
+               return;
+
        if (!rpcrdma_regbuf_is_mapped(rb))
                return;
 
@@ -1517,9 +1520,6 @@ rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
 void
 rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
 {
-       if (!rb)
-               return;
-
        rpcrdma_dma_unmap_regbuf(rb);
        kfree(rb);
 }
index 18803021f242ebecb5cde14ae892009e65edabf2..a6b8c1f8f92a815db5fcce32c78b4355dc308c03 100644 (file)
@@ -807,13 +807,6 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
        smp_mb__after_atomic();
 }
 
-static void xs_sock_mark_closed(struct rpc_xprt *xprt)
-{
-       xs_sock_reset_connection_flags(xprt);
-       /* Mark transport as closed and wake up all pending tasks */
-       xprt_disconnect_done(xprt);
-}
-
 /**
  * xs_error_report - callback to handle TCP socket state errors
  * @sk: socket
@@ -833,9 +826,6 @@ static void xs_error_report(struct sock *sk)
        err = -sk->sk_err;
        if (err == 0)
                goto out;
-       /* Is this a reset event? */
-       if (sk->sk_state == TCP_CLOSE)
-               xs_sock_mark_closed(xprt);
        dprintk("RPC:       xs_error_report client %p, error=%d...\n",
                        xprt, -err);
        trace_rpc_socket_error(xprt, sk->sk_socket, err);
@@ -1078,18 +1068,18 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 
        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-               __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
                spin_lock(&xprt->recv_lock);
+               __UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
                goto out_unpin;
        }
 
-       __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 
        spin_lock_bh(&xprt->transport_lock);
        xprt_adjust_cwnd(xprt, task, copied);
        spin_unlock_bh(&xprt->transport_lock);
        spin_lock(&xprt->recv_lock);
        xprt_complete_rqst(task, copied);
+       __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 out_unpin:
        xprt_unpin_rqst(rovr);
  out_unlock:
@@ -1655,9 +1645,11 @@ static void xs_tcp_state_change(struct sock *sk)
                if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
                                        &transport->sock_state))
                        xprt_clear_connecting(xprt);
+               clear_bit(XPRT_CLOSING, &xprt->state);
                if (sk->sk_err)
                        xprt_wake_pending_tasks(xprt, -sk->sk_err);
-               xs_sock_mark_closed(xprt);
+               /* Trigger the socket release */
+               xs_tcp_force_close(xprt);
        }
  out:
        read_unlock_bh(&sk->sk_callback_lock);
@@ -2265,14 +2257,19 @@ static void xs_tcp_shutdown(struct rpc_xprt *xprt)
 {
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct socket *sock = transport->sock;
+       int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE;
 
        if (sock == NULL)
                return;
-       if (xprt_connected(xprt)) {
+       switch (skst) {
+       default:
                kernel_sock_shutdown(sock, SHUT_RDWR);
                trace_rpc_socket_shutdown(xprt, sock);
-       } else
+               break;
+       case TCP_CLOSE:
+       case TCP_TIME_WAIT:
                xs_reset_transport(transport);
+       }
 }
 
 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,