Merge branches 'doc.2017.08.17a', 'fixes.2017.08.17a', 'hotplug.2017.07.25b', 'misc...
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 908b309d60d7bc29e82f3a049d928fbd8140dfd2..55bde94b95728bae7eb1e9b4ead94e18dfa0e4e8 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -180,6 +180,8 @@ static void rcu_preempt_ctxt_queue(struct rcu_node *rnp, struct rcu_data *rdp)
        struct task_struct *t = current;
 
        lockdep_assert_held(&rnp->lock);
+       WARN_ON_ONCE(rdp->mynode != rnp);
+       WARN_ON_ONCE(rnp->level != rcu_num_lvls - 1);
 
        /*
         * Decide where to queue the newly blocked task.  In theory,
@@ -261,6 +263,10 @@ static void rcu_preempt_ctxt_queue(struct rcu_node *rnp, struct rcu_data *rdp)
                rnp->gp_tasks = &t->rcu_node_entry;
        if (!rnp->exp_tasks && (blkd_state & RCU_EXP_BLKD))
                rnp->exp_tasks = &t->rcu_node_entry;
+       WARN_ON_ONCE(!(blkd_state & RCU_GP_BLKD) !=
+                    !(rnp->qsmask & rdp->grpmask));
+       WARN_ON_ONCE(!(blkd_state & RCU_EXP_BLKD) !=
+                    !(rnp->expmask & rdp->grpmask));
        raw_spin_unlock_rcu_node(rnp); /* interrupts remain disabled. */
 
        /*
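
The two WARN_ON_ONCE() additions above rely on the `!x != !y` idiom: applying
`!` to each bitmask test collapses the result to 0 or 1, so the inequality is
true only when the two conditions disagree.  A minimal standalone sketch of
the idiom (mask values here are made up for illustration, not kernel values):

    #include <assert.h>
    #include <stdio.h>

    #define RCU_GP_BLKD  0x2   /* hypothetical stand-in bits */
    #define GRPMASK      0x8

    int main(void)
    {
            unsigned long blkd_state = RCU_GP_BLKD; /* task blocks current GP */
            unsigned long qsmask = GRPMASK;         /* CPU still owes a QS    */

            /* True only if the two boolean conditions disagree. */
            int mismatch = !(blkd_state & RCU_GP_BLKD) != !(qsmask & GRPMASK);

            printf("mismatch = %d\n", mismatch);    /* prints 0 */
            assert(!mismatch);
            return 0;
    }
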
@@ -482,6 +488,7 @@ void rcu_read_unlock_special(struct task_struct *t)
                rnp = t->rcu_blocked_node;
                raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
                WARN_ON_ONCE(rnp != t->rcu_blocked_node);
+               WARN_ON_ONCE(rnp->level != rcu_num_lvls - 1);
                empty_norm = !rcu_preempt_blocked_readers_cgp(rnp);
                empty_exp = sync_rcu_preempt_exp_done(rnp);
                smp_mb(); /* ensure expedited fastpath sees end of RCU c-s. */
@@ -495,10 +502,10 @@ void rcu_read_unlock_special(struct task_struct *t)
                if (&t->rcu_node_entry == rnp->exp_tasks)
                        rnp->exp_tasks = np;
                if (IS_ENABLED(CONFIG_RCU_BOOST)) {
-                       if (&t->rcu_node_entry == rnp->boost_tasks)
-                               rnp->boost_tasks = np;
                        /* Snapshot ->boost_mtx ownership w/rnp->lock held. */
                        drop_boost_mutex = rt_mutex_owner(&rnp->boost_mtx) == t;
+                       if (&t->rcu_node_entry == rnp->boost_tasks)
+                               rnp->boost_tasks = np;
                }
 
                /*
@@ -636,10 +643,17 @@ static int rcu_print_task_exp_stall(struct rcu_node *rnp)
  */
 static void rcu_preempt_check_blocked_tasks(struct rcu_node *rnp)
 {
+       struct task_struct *t;
+
        RCU_LOCKDEP_WARN(preemptible(), "rcu_preempt_check_blocked_tasks() invoked with preemption enabled!!!\n");
        WARN_ON_ONCE(rcu_preempt_blocked_readers_cgp(rnp));
-       if (rcu_preempt_has_tasks(rnp))
+       if (rcu_preempt_has_tasks(rnp)) {
                rnp->gp_tasks = rnp->blkd_tasks.next;
+               t = container_of(rnp->gp_tasks, struct task_struct,
+                                rcu_node_entry);
+               trace_rcu_unlock_preempted_task(TPS("rcu_preempt-GPS"),
+                                               rnp->gpnum, t->pid);
+       }
        WARN_ON_ONCE(rnp->qsmask);
 }
 
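The hunk above recovers the blocked task from its embedded list node with
container_of() before tracing it.  For reference, a freestanding sketch of
that pointer arithmetic, using a simplified list_head rather than the kernel's:

    #include <stddef.h>
    #include <stdio.h>

    struct list_head { struct list_head *next, *prev; };

    /* Same arithmetic as the kernel macro: back up from the member. */
    #define container_of(ptr, type, member) \
            ((type *)((char *)(ptr) - offsetof(type, member)))

    struct task {
            int pid;
            struct list_head rcu_node_entry;
    };

    int main(void)
    {
            struct task t = { .pid = 42 };
            struct list_head *node = &t.rcu_node_entry;
            struct task *back = container_of(node, struct task, rcu_node_entry);

            printf("pid = %d\n", back->pid);        /* prints 42 */
            return 0;
    }
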
@@ -1788,22 +1802,61 @@ bool rcu_is_nocb_cpu(int cpu)
 }
 
 /*
- * Kick the leader kthread for this NOCB group.
+ * Kick the leader kthread for this NOCB group.  Caller holds ->nocb_lock
+ * and this function releases it.
  */
-static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+static void __wake_nocb_leader(struct rcu_data *rdp, bool force,
+                              unsigned long flags)
+       __releases(rdp->nocb_lock)
 {
        struct rcu_data *rdp_leader = rdp->nocb_leader;
 
-       if (!READ_ONCE(rdp_leader->nocb_kthread))
+       lockdep_assert_held(&rdp->nocb_lock);
+       if (!READ_ONCE(rdp_leader->nocb_kthread)) {
+               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
                return;
-       if (READ_ONCE(rdp_leader->nocb_leader_sleep) || force) {
+       }
+       if (rdp_leader->nocb_leader_sleep || force) {
                /* Prior smp_mb__after_atomic() orders against prior enqueue. */
                WRITE_ONCE(rdp_leader->nocb_leader_sleep, false);
+               del_timer(&rdp->nocb_timer);
+               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
                smp_mb(); /* ->nocb_leader_sleep before swake_up(). */
                swake_up(&rdp_leader->nocb_wq);
+       } else {
+               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
        }
 }
 
+/*
+ * Kick the leader kthread for this NOCB group, but caller has not
+ * acquired locks.
+ */
+static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+{
+       unsigned long flags;
+
+       raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+       __wake_nocb_leader(rdp, force, flags);
+}
+
+/*
+ * Arrange to wake the leader kthread for this NOCB group at some
+ * future time when it is safe to do so.
+ */
+static void wake_nocb_leader_defer(struct rcu_data *rdp, int waketype,
+                                  const char *reason)
+{
+       unsigned long flags;
+
+       raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+       if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
+               mod_timer(&rdp->nocb_timer, jiffies + 1);
+       WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
+       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, reason);
+       raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+}
+
 /*
  * Does the specified CPU need an RCU callback for the specified flavor
  * of rcu_barrier()?
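
The split above follows the "caller acquires, callee releases" convention:
__wake_nocb_leader() is entered with ->nocb_lock held and drops it on every
return path, which is what the __releases() annotation asks sparse to verify.
A userspace sketch of the same shape (names and locking are illustrative,
not kernel APIs):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    static pthread_mutex_t nocb_lock = PTHREAD_MUTEX_INITIALIZER;
    static bool leader_sleep = true;

    /* Callee: entered with nocb_lock held, releases it on every path. */
    static void __wake_leader_locked(bool force)
    {
            if (!leader_sleep && !force) {
                    pthread_mutex_unlock(&nocb_lock);  /* nothing to do */
                    return;
            }
            leader_sleep = false;                      /* state change under lock */
            pthread_mutex_unlock(&nocb_lock);
            puts("wake leader");                       /* wakeup done lock-free */
    }

    /* Caller-side wrapper for contexts that do not already hold the lock. */
    static void wake_leader(bool force)
    {
            pthread_mutex_lock(&nocb_lock);
            __wake_leader_locked(force);
    }

    int main(void)
    {
            wake_leader(false);
            return 0;
    }
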
@@ -1891,11 +1944,8 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
                        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
                                            TPS("WakeEmpty"));
                } else {
-                       WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE);
-                       /* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
-                       smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
-                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                           TPS("WakeEmptyIsDeferred"));
+                       wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
+                                              TPS("WakeEmptyIsDeferred"));
                }
                rdp->qlen_last_fqs_check = 0;
        } else if (len > rdp->qlen_last_fqs_check + qhimark) {
@@ -1905,11 +1955,8 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
                        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
                                            TPS("WakeOvf"));
                } else {
-                       WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_FORCE);
-                       /* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
-                       smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
-                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                           TPS("WakeOvfIsDeferred"));
+                       wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE_FORCE,
+                                              TPS("WakeOvfIsDeferred"));
                }
                rdp->qlen_last_fqs_check = LONG_MAX / 2;
        } else {
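
Both call sites above now funnel through wake_nocb_leader_defer(), which
records the requested wakeup level and arms the one-jiffy backstop timer only
when no deferral is already pending.  A compilable toy model of that
bookkeeping (the enum and helper are illustrative stand-ins, not the kernel's):

    #include <stdio.h>

    /* Illustrative levels mirroring RCU_NOCB_WAKE_NOT/_WAKE/_WAKE_FORCE. */
    enum defer_level { DEFER_NOT, DEFER_WAKE, DEFER_WAKE_FORCE };

    static enum defer_level nocb_defer_wakeup = DEFER_NOT;
    static int timer_armed;

    static void wake_leader_defer(enum defer_level level, const char *reason)
    {
            if (nocb_defer_wakeup == DEFER_NOT) {
                    timer_armed = 1;   /* stands in for mod_timer(..., jiffies + 1) */
                    printf("backstop timer armed (%s)\n", reason);
            }
            nocb_defer_wakeup = level; /* remember what kind of wake is owed */
    }

    int main(void)
    {
            wake_leader_defer(DEFER_WAKE, "WakeEmptyIsDeferred");
            wake_leader_defer(DEFER_WAKE_FORCE, "WakeOvfIsDeferred"); /* timer already armed */
            printf("deferred level = %d, timer_armed = %d\n",
                   nocb_defer_wakeup, timer_armed);
            return 0;
    }
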
@@ -1961,30 +2008,19 @@ static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
  * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
  * not a no-CBs CPU.
  */
-static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
                                                     struct rcu_data *rdp,
                                                     unsigned long flags)
 {
-       long ql = rsp->orphan_done.len;
-       long qll = rsp->orphan_done.len_lazy;
-
-       /* If this is not a no-CBs CPU, tell the caller to do it the old way. */
+       RCU_LOCKDEP_WARN(!irqs_disabled(), "rcu_nocb_adopt_orphan_cbs() invoked with irqs enabled!!!");
        if (!rcu_is_nocb_cpu(smp_processor_id()))
-               return false;
-
-       /* First, enqueue the donelist, if any.  This preserves CB ordering. */
-       if (rsp->orphan_done.head) {
-               __call_rcu_nocb_enqueue(rdp, rcu_cblist_head(&rsp->orphan_done),
-                                       rcu_cblist_tail(&rsp->orphan_done),
-                                       ql, qll, flags);
-       }
-       if (rsp->orphan_pend.head) {
-               __call_rcu_nocb_enqueue(rdp, rcu_cblist_head(&rsp->orphan_pend),
-                                       rcu_cblist_tail(&rsp->orphan_pend),
-                                       ql, qll, flags);
-       }
-       rcu_cblist_init(&rsp->orphan_done);
-       rcu_cblist_init(&rsp->orphan_pend);
+               return false; /* Not NOCBs CPU, caller must migrate CBs. */
+       __call_rcu_nocb_enqueue(my_rdp, rcu_segcblist_head(&rdp->cblist),
+                               rcu_segcblist_tail(&rdp->cblist),
+                               rcu_segcblist_n_cbs(&rdp->cblist),
+                               rcu_segcblist_n_lazy_cbs(&rdp->cblist), flags);
+       rcu_segcblist_init(&rdp->cblist);
+       rcu_segcblist_disable(&rdp->cblist);
        return true;
 }
 
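The rewritten rcu_nocb_adopt_orphan_cbs() splices the outgoing CPU's entire
callback list onto the current no-CBs queue and then reinitializes the source
list.  The tail-pointer splice below is a simplified, self-contained model of
that move, using a plain singly linked list instead of rcu_segcblist:

    #include <stdio.h>

    struct cb { struct cb *next; int id; };
    struct cblist { struct cb *head; struct cb **tail; long len; };

    static void cblist_init(struct cblist *l)
    {
            l->head = NULL;
            l->tail = &l->head;
            l->len = 0;
    }

    static void cblist_enqueue(struct cblist *l, struct cb *c)
    {
            c->next = NULL;
            *l->tail = c;
            l->tail = &c->next;
            l->len++;
    }

    /* Move every callback from @src to @dst, preserving order. */
    static void cblist_adopt(struct cblist *dst, struct cblist *src)
    {
            if (!src->head)
                    return;
            *dst->tail = src->head;
            dst->tail = src->tail;
            dst->len += src->len;
            cblist_init(src);       /* source CPU now owns nothing */
    }

    int main(void)
    {
            struct cblist mine, theirs;
            struct cb a = { .id = 1 }, b = { .id = 2 };

            cblist_init(&mine);
            cblist_init(&theirs);
            cblist_enqueue(&theirs, &a);
            cblist_enqueue(&theirs, &b);
            cblist_adopt(&mine, &theirs);
            printf("adopted %ld callbacks, first id %d\n", mine.len, mine.head->id);
            return 0;
    }
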
@@ -2031,6 +2067,7 @@ static void rcu_nocb_wait_gp(struct rcu_data *rdp)
 static void nocb_leader_wait(struct rcu_data *my_rdp)
 {
        bool firsttime = true;
+       unsigned long flags;
        bool gotcbs;
        struct rcu_data *rdp;
        struct rcu_head **tail;
@@ -2039,13 +2076,17 @@ wait_again:
 
        /* Wait for callbacks to appear. */
        if (!rcu_nocb_poll) {
-               trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Sleep");
+               trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, TPS("Sleep"));
                swait_event_interruptible(my_rdp->nocb_wq,
                                !READ_ONCE(my_rdp->nocb_leader_sleep));
-               /* Memory barrier handled by smp_mb() calls below and repoll. */
+               raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
+               my_rdp->nocb_leader_sleep = true;
+               WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+               del_timer(&my_rdp->nocb_timer);
+               raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
        } else if (firsttime) {
                firsttime = false; /* Don't drown trace log with "Poll"! */
-               trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Poll");
+               trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, TPS("Poll"));
        }
 
        /*
@@ -2054,7 +2095,7 @@ wait_again:
         * nocb_gp_head, where they await a grace period.
         */
        gotcbs = false;
-       smp_mb(); /* wakeup before ->nocb_head reads. */
+       smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */
        for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
                rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head);
                if (!rdp->nocb_gp_head)
@@ -2066,56 +2107,41 @@ wait_again:
                gotcbs = true;
        }
 
-       /*
-        * If there were no callbacks, sleep a bit, rescan after a
-        * memory barrier, and go retry.
-        */
+       /* No callbacks?  Sleep a bit if polling, and go retry.  */
        if (unlikely(!gotcbs)) {
-               if (!rcu_nocb_poll)
-                       trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
-                                           "WokeEmpty");
                WARN_ON(signal_pending(current));
-               schedule_timeout_interruptible(1);
-
-               /* Rescan in case we were a victim of memory ordering. */
-               my_rdp->nocb_leader_sleep = true;
-               smp_mb();  /* Ensure _sleep true before scan. */
-               for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
-                       if (READ_ONCE(rdp->nocb_head)) {
-                               /* Found CB, so short-circuit next wait. */
-                               my_rdp->nocb_leader_sleep = false;
-                               break;
-                       }
+               if (rcu_nocb_poll) {
+                       schedule_timeout_interruptible(1);
+               } else {
+                       trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
+                                           TPS("WokeEmpty"));
+               }
                goto wait_again;
        }
 
        /* Wait for one grace period. */
        rcu_nocb_wait_gp(my_rdp);
 
-       /*
-        * We left ->nocb_leader_sleep unset to reduce cache thrashing.
-        * We set it now, but recheck for new callbacks while
-        * traversing our follower list.
-        */
-       my_rdp->nocb_leader_sleep = true;
-       smp_mb(); /* Ensure _sleep true before scan of ->nocb_head. */
-
        /* Each pass through the following loop wakes a follower, if needed. */
        for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
-               if (READ_ONCE(rdp->nocb_head))
+               if (!rcu_nocb_poll &&
+                   READ_ONCE(rdp->nocb_head) &&
+                   READ_ONCE(my_rdp->nocb_leader_sleep)) {
+                       raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
                        my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/
+                       raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
+               }
                if (!rdp->nocb_gp_head)
                        continue; /* No CBs, so no need to wake follower. */
 
                /* Append callbacks to follower's "done" list. */
-               tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
+               raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+               tail = rdp->nocb_follower_tail;
+               rdp->nocb_follower_tail = rdp->nocb_gp_tail;
                *tail = rdp->nocb_gp_head;
-               smp_mb__after_atomic(); /* Store *tail before wakeup. */
+               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
                if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
-                       /*
-                        * List was empty, wake up the follower.
-                        * Memory barriers supplied by atomic_long_add().
-                        */
+                       /* List was empty, so wake up the follower.  */
                        swake_up(&rdp->nocb_wq);
                }
        }
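
In the loop above the leader appends under ->nocb_lock and then tests whether
the saved tail still pointed at the list head, i.e. whether the follower's
done-list was empty before the splice, and only then wakes the follower.
A small model of that emptiness test (simplified types, locking omitted,
not kernel code):

    #include <stdio.h>

    struct cb { struct cb *next; };

    struct follower {
            struct cb *done_head;
            struct cb **done_tail;
    };

    /* Returns 1 if the follower should be woken (list was empty before). */
    static int append_batch(struct follower *f, struct cb *batch_head,
                            struct cb **batch_tail)
    {
            struct cb **old_tail = f->done_tail;    /* snapshot before splice */

            f->done_tail = batch_tail;
            *old_tail = batch_head;
            return old_tail == &f->done_head;       /* was empty => wake */
    }

    int main(void)
    {
            struct follower f = { .done_head = NULL, .done_tail = &f.done_head };
            struct cb one = { .next = NULL };

            if (append_batch(&f, &one, &one.next))
                    puts("follower was idle: wake it");
            return 0;
    }
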
@@ -2131,28 +2157,16 @@ wait_again:
  */
 static void nocb_follower_wait(struct rcu_data *rdp)
 {
-       bool firsttime = true;
-
        for (;;) {
-               if (!rcu_nocb_poll) {
-                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                           "FollowerSleep");
-                       swait_event_interruptible(rdp->nocb_wq,
-                                                READ_ONCE(rdp->nocb_follower_head));
-               } else if (firsttime) {
-                       /* Don't drown trace log with "Poll"! */
-                       firsttime = false;
-                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "Poll");
-               }
+               trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("FollowerSleep"));
+               swait_event_interruptible(rdp->nocb_wq,
+                                        READ_ONCE(rdp->nocb_follower_head));
                if (smp_load_acquire(&rdp->nocb_follower_head)) {
                        /* ^^^ Ensure CB invocation follows _head test. */
                        return;
                }
-               if (!rcu_nocb_poll)
-                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                           "WokeEmpty");
                WARN_ON(signal_pending(current));
-               schedule_timeout_interruptible(1);
+               trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WokeEmpty"));
        }
 }
 
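The smp_load_acquire() retained above guarantees that once the follower sees a
non-NULL ->nocb_follower_head, its later reads of the callbacks themselves
cannot be reordered before that test.  A generic acquire/release illustration
in C11 atomics (the kernel's actual ordering on the publish side comes from
the leader's lock release, so this is an analogy, not the exact pairing):

    #include <stdatomic.h>
    #include <stdio.h>

    struct cb { int payload; };

    static struct cb slot;
    static _Atomic(struct cb *) head;

    static void producer(void)
    {
            slot.payload = 42;                          /* initialize first...   */
            atomic_store_explicit(&head, &slot,
                                  memory_order_release); /* ...then publish       */
    }

    static void consumer(void)
    {
            struct cb *c = atomic_load_explicit(&head, memory_order_acquire);

            if (c)                                      /* acquire orders the read below */
                    printf("payload = %d\n", c->payload);
    }

    int main(void)
    {
            producer();
            consumer();
            return 0;
    }
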
@@ -2165,6 +2179,7 @@ static void nocb_follower_wait(struct rcu_data *rdp)
 static int rcu_nocb_kthread(void *arg)
 {
        int c, cl;
+       unsigned long flags;
        struct rcu_head *list;
        struct rcu_head *next;
        struct rcu_head **tail;
@@ -2179,11 +2194,14 @@ static int rcu_nocb_kthread(void *arg)
                        nocb_follower_wait(rdp);
 
                /* Pull the ready-to-invoke callbacks onto local list. */
-               list = READ_ONCE(rdp->nocb_follower_head);
+               raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+               list = rdp->nocb_follower_head;
+               rdp->nocb_follower_head = NULL;
+               tail = rdp->nocb_follower_tail;
+               rdp->nocb_follower_tail = &rdp->nocb_follower_head;
+               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
                BUG_ON(!list);
-               trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
-               WRITE_ONCE(rdp->nocb_follower_head, NULL);
-               tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
+               trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WokeNonEmpty"));
 
                /* Each pass through the following loop invokes a callback. */
                trace_rcu_batch_start(rdp->rsp->name,
@@ -2226,18 +2244,39 @@ static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
 }
 
 /* Do a deferred wakeup of rcu_nocb_kthread(). */
-static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
 {
+       unsigned long flags;
        int ndw;
 
-       if (!rcu_nocb_need_deferred_wakeup(rdp))
+       raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+       if (!rcu_nocb_need_deferred_wakeup(rdp)) {
+               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
                return;
+       }
        ndw = READ_ONCE(rdp->nocb_defer_wakeup);
        WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-       wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE);
+       __wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWake"));
 }
 
+/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
+static void do_nocb_deferred_wakeup_timer(unsigned long x)
+{
+       do_nocb_deferred_wakeup_common((struct rcu_data *)x);
+}
+
+/*
+ * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
+ * This means we do an inexact common-case check.  Note that if
+ * we miss, ->nocb_timer will eventually clean things up.
+ */
+static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+       if (rcu_nocb_need_deferred_wakeup(rdp))
+               do_nocb_deferred_wakeup_common(rdp);
+}
+
 void __init rcu_init_nohz(void)
 {
        int cpu;
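
do_nocb_deferred_wakeup() above is the fast path: it peeks at
->nocb_defer_wakeup without the lock and calls the common routine only when a
wakeup appears to be pending; the common routine rechecks under ->nocb_lock,
and the backstop timer catches anything the unlocked peek misses.  A userspace
sketch of that check/recheck split (hypothetical names, not kernel APIs):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t nocb_lock = PTHREAD_MUTEX_INITIALIZER;
    static int defer_pending;               /* written under nocb_lock */

    /* Slow path: authoritative check and handoff, all under the lock. */
    static void deferred_wakeup_common(void)
    {
            pthread_mutex_lock(&nocb_lock);
            if (!defer_pending) {
                    pthread_mutex_unlock(&nocb_lock);
                    return;
            }
            defer_pending = 0;
            pthread_mutex_unlock(&nocb_lock);
            puts("deferred wakeup issued");
    }

    /* Fast path: lockless peek; a missed race is cleaned up by a timer. */
    static void deferred_wakeup_fastpath(void)
    {
            if (defer_pending)
                    deferred_wakeup_common();
    }

    int main(void)
    {
            defer_pending = 1;
            deferred_wakeup_fastpath();
            return 0;
    }
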
@@ -2287,6 +2326,9 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
        rdp->nocb_tail = &rdp->nocb_head;
        init_swait_queue_head(&rdp->nocb_wq);
        rdp->nocb_follower_tail = &rdp->nocb_follower_head;
+       raw_spin_lock_init(&rdp->nocb_lock);
+       setup_timer(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer,
+                   (unsigned long)rdp);
 }
 
 /*
@@ -2459,7 +2501,7 @@ static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
        return false;
 }
 
-static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp,
                                                     struct rcu_data *rdp,
                                                     unsigned long flags)
 {