Merge tag 'nfsd-4.15' of git://linux-nfs.org/~bfields/linux
[sfrench/cifs-2.6.git] / net / sunrpc / svc_xprt.c
index 71de77bd44236dee6bd7ea1e86b8e317aee65060..e8e0831229cfcce48b2d6802493e80a429aa108b 100644 (file)
@@ -250,9 +250,9 @@ void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new)
        svc_xprt_received(new);
 }
 
-int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
-                   struct net *net, const int family,
-                   const unsigned short port, int flags)
+static int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
+                           struct net *net, const int family,
+                           const unsigned short port, int flags)
 {
        struct svc_xprt_class *xcl;
 
@@ -380,7 +380,6 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
        struct svc_pool *pool;
        struct svc_rqst *rqstp = NULL;
        int cpu;
-       bool queued = false;
 
        if (!svc_xprt_has_something_to_do(xprt))
                goto out;
@@ -401,58 +400,25 @@ void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 
        atomic_long_inc(&pool->sp_stats.packets);
 
-redo_search:
+       dprintk("svc: transport %p put into queue\n", xprt);
+       spin_lock_bh(&pool->sp_lock);
+       list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
+       pool->sp_stats.sockets_queued++;
+       spin_unlock_bh(&pool->sp_lock);
+
        /* find a thread for this xprt */
        rcu_read_lock();
        list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
-               /* Do a lockless check first */
-               if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+               if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
                        continue;
-
-               /*
-                * Once the xprt has been queued, it can only be dequeued by
-                * the task that intends to service it. All we can do at that
-                * point is to try to wake this thread back up so that it can
-                * do so.
-                */
-               if (!queued) {
-                       spin_lock_bh(&rqstp->rq_lock);
-                       if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
-                               /* already busy, move on... */
-                               spin_unlock_bh(&rqstp->rq_lock);
-                               continue;
-                       }
-
-                       /* this one will do */
-                       rqstp->rq_xprt = xprt;
-                       svc_xprt_get(xprt);
-                       spin_unlock_bh(&rqstp->rq_lock);
-               }
-               rcu_read_unlock();
-
                atomic_long_inc(&pool->sp_stats.threads_woken);
                wake_up_process(rqstp->rq_task);
-               put_cpu();
-               goto out;
-       }
-       rcu_read_unlock();
-
-       /*
-        * We didn't find an idle thread to use, so we need to queue the xprt.
-        * Do so and then search again. If we find one, we can't hook this one
-        * up to it directly but we can wake the thread up in the hopes that it
-        * will pick it up once it searches for a xprt to service.
-        */
-       if (!queued) {
-               queued = true;
-               dprintk("svc: transport %p put into queue\n", xprt);
-               spin_lock_bh(&pool->sp_lock);
-               list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
-               pool->sp_stats.sockets_queued++;
-               spin_unlock_bh(&pool->sp_lock);
-               goto redo_search;
+               goto out_unlock;
        }
+       set_bit(SP_CONGESTED, &pool->sp_flags);
        rqstp = NULL;
+out_unlock:
+       rcu_read_unlock();
        put_cpu();
 out:
        trace_svc_xprt_do_enqueue(xprt, rqstp);
@@ -721,38 +687,25 @@ rqst_should_sleep(struct svc_rqst *rqstp)
 
 static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
-       struct svc_xprt *xprt;
        struct svc_pool         *pool = rqstp->rq_pool;
        long                    time_left = 0;
 
        /* rq_xprt should be clear on entry */
        WARN_ON_ONCE(rqstp->rq_xprt);
 
-       /* Normally we will wait up to 5 seconds for any required
-        * cache information to be provided.
-        */
-       rqstp->rq_chandle.thread_wait = 5*HZ;
-
-       xprt = svc_xprt_dequeue(pool);
-       if (xprt) {
-               rqstp->rq_xprt = xprt;
-
-               /* As there is a shortage of threads and this request
-                * had to be queued, don't allow the thread to wait so
-                * long for cache updates.
-                */
-               rqstp->rq_chandle.thread_wait = 1*HZ;
-               clear_bit(SP_TASK_PENDING, &pool->sp_flags);
-               return xprt;
-       }
+       rqstp->rq_xprt = svc_xprt_dequeue(pool);
+       if (rqstp->rq_xprt)
+               goto out_found;
 
        /*
         * We have to be able to interrupt this wait
         * to bring down the daemons ...
         */
        set_current_state(TASK_INTERRUPTIBLE);
+       smp_mb__before_atomic();
+       clear_bit(SP_CONGESTED, &pool->sp_flags);
        clear_bit(RQ_BUSY, &rqstp->rq_flags);
-       smp_mb();
+       smp_mb__after_atomic();
 
        if (likely(rqst_should_sleep(rqstp)))
                time_left = schedule_timeout(timeout);
@@ -761,13 +714,11 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 
        try_to_freeze();
 
-       spin_lock_bh(&rqstp->rq_lock);
        set_bit(RQ_BUSY, &rqstp->rq_flags);
-       spin_unlock_bh(&rqstp->rq_lock);
-
-       xprt = rqstp->rq_xprt;
-       if (xprt != NULL)
-               return xprt;
+       smp_mb__after_atomic();
+       rqstp->rq_xprt = svc_xprt_dequeue(pool);
+       if (rqstp->rq_xprt)
+               goto out_found;
 
        if (!time_left)
                atomic_long_inc(&pool->sp_stats.threads_timedout);
@@ -775,6 +726,15 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
        if (signalled() || kthread_should_stop())
                return ERR_PTR(-EINTR);
        return ERR_PTR(-EAGAIN);
+out_found:
+       /* Normally we will wait up to 5 seconds for any required
+        * cache information to be provided.
+        */
+       if (!test_bit(SP_CONGESTED, &pool->sp_flags))
+               rqstp->rq_chandle.thread_wait = 5*HZ;
+       else
+               rqstp->rq_chandle.thread_wait = 1*HZ;
+       return rqstp->rq_xprt;
 }
 
 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)