Merge branch 'core-rcu-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git...
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a6dcf3bd244f0b4b94ab3670a7c0ffe1670a2af0..e354e475e645ff4f0fab186913a274e931da0db3 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -98,8 +98,8 @@ struct rcu_state sname##_state = { \
        .gpnum = 0UL - 300UL, \
        .completed = 0UL - 300UL, \
        .orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
-       .orphan_nxttail = &sname##_state.orphan_nxtlist, \
-       .orphan_donetail = &sname##_state.orphan_donelist, \
+       .orphan_pend = RCU_CBLIST_INITIALIZER(sname##_state.orphan_pend), \
+       .orphan_done = RCU_CBLIST_INITIALIZER(sname##_state.orphan_done), \
        .barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
        .name = RCU_STATE_NAME(sname), \
        .abbr = sabbr, \
@@ -124,7 +124,7 @@ static int rcu_fanout_leaf = RCU_FANOUT_LEAF;
 module_param(rcu_fanout_leaf, int, 0444);
 int rcu_num_lvls __read_mostly = RCU_NUM_LVLS;
 /* Number of rcu_nodes at specified level. */
-static int num_rcu_lvl[] = NUM_RCU_LVL_INIT;
+int num_rcu_lvl[] = NUM_RCU_LVL_INIT;
 int rcu_num_nodes __read_mostly = NUM_RCU_NODES; /* Total # rcu_nodes in use. */
 /* panic() on RCU Stall sysctl. */
 int sysctl_panic_on_rcu_stall __read_mostly;
@@ -200,7 +200,7 @@ static const int gp_cleanup_delay;
 
 /*
  * Number of grace periods between delays, normalized by the duration of
- * the delay.  The longer the the delay, the more the grace periods between
+ * the delay.  The longer the delay, the more grace periods between
  * each delay.  The reason for this normalization is that it means that,
  * for non-zero delays, the overall slowdown of grace periods is constant
  * regardless of the duration of the delay.  This arrangement balances
@@ -273,11 +273,19 @@ void rcu_bh_qs(void)
        }
 }
 
-static DEFINE_PER_CPU(int, rcu_sched_qs_mask);
+/*
+ * Steal a bit from the bottom of ->dynticks for idle entry/exit
+ * control.  Initially this is for TLB flushing.
+ */
+#define RCU_DYNTICK_CTRL_MASK 0x1
+#define RCU_DYNTICK_CTRL_CTR  (RCU_DYNTICK_CTRL_MASK + 1)
+#ifndef rcu_eqs_special_exit
+#define rcu_eqs_special_exit() do { } while (0)
+#endif
 
 static DEFINE_PER_CPU(struct rcu_dynticks, rcu_dynticks) = {
        .dynticks_nesting = DYNTICK_TASK_EXIT_IDLE,
-       .dynticks = ATOMIC_INIT(1),
+       .dynticks = ATOMIC_INIT(RCU_DYNTICK_CTRL_CTR),
 #ifdef CONFIG_NO_HZ_FULL_SYSIDLE
        .dynticks_idle_nesting = DYNTICK_TASK_NEST_VALUE,
        .dynticks_idle = ATOMIC_INIT(1),
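The hunk above reserves the bottom bit of ->dynticks as a special-action flag and makes every idle transition advance the counter by RCU_DYNTICK_CTRL_CTR instead of 1.  A minimal user-space sketch of that encoding (illustrative only; plain ints stand in for the kernel's atomic_t and the helper name is made up):

#include <stdbool.h>
#include <stdio.h>

#define RCU_DYNTICK_CTRL_MASK 0x1                          /* special-action flag */
#define RCU_DYNTICK_CTRL_CTR  (RCU_DYNTICK_CTRL_MASK + 1)  /* counter increment */

/* Mirrors rcu_dynticks_curr_cpu_in_eqs(): EQS means the counter bit is clear. */
static bool in_eqs(int dynticks)
{
	return !(dynticks & RCU_DYNTICK_CTRL_CTR);
}

int main(void)
{
	int dynticks = RCU_DYNTICK_CTRL_CTR;	/* boot value: online, not idle */

	printf("idle? %d\n", in_eqs(dynticks));	/* 0 */
	dynticks += RCU_DYNTICK_CTRL_CTR;	/* rcu_dynticks_eqs_enter() */
	printf("idle? %d\n", in_eqs(dynticks));	/* 1 */
	dynticks |= RCU_DYNTICK_CTRL_MASK;	/* rcu_eqs_special_set(), see below */
	dynticks += RCU_DYNTICK_CTRL_CTR;	/* rcu_dynticks_eqs_exit() */
	printf("special? %d\n", dynticks & RCU_DYNTICK_CTRL_MASK);	/* 1 */
	return 0;
}

On the real exit path the flag is cleared with atomic_andnot() and rcu_eqs_special_exit() is invoked, as the rcu_dynticks_eqs_exit() hunk below shows.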
@@ -305,15 +313,20 @@ bool rcu_irq_enter_disabled(void)
 static void rcu_dynticks_eqs_enter(void)
 {
        struct rcu_dynticks *rdtp = this_cpu_ptr(&rcu_dynticks);
-       int special;
+       int seq;
 
        /*
-        * CPUs seeing atomic_inc_return() must see prior RCU read-side
+        * CPUs seeing atomic_add_return() must see prior RCU read-side
         * critical sections, and we also must force ordering with the
         * next idle sojourn.
         */
-       special = atomic_inc_return(&rdtp->dynticks);
-       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && special & 0x1);
+       seq = atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdtp->dynticks);
+       /* Better be in an extended quiescent state! */
+       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
+                    (seq & RCU_DYNTICK_CTRL_CTR));
+       /* Better not have special action (TLB flush) pending! */
+       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
+                    (seq & RCU_DYNTICK_CTRL_MASK));
 }
 
 /*
@@ -323,15 +336,22 @@ static void rcu_dynticks_eqs_enter(void)
 static void rcu_dynticks_eqs_exit(void)
 {
        struct rcu_dynticks *rdtp = this_cpu_ptr(&rcu_dynticks);
-       int special;
+       int seq;
 
        /*
-        * CPUs seeing atomic_inc_return() must see prior idle sojourns,
+        * CPUs seeing atomic_add_return() must see prior idle sojourns,
         * and we also must force ordering with the next RCU read-side
         * critical section.
         */
-       special = atomic_inc_return(&rdtp->dynticks);
-       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && !(special & 0x1));
+       seq = atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdtp->dynticks);
+       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
+                    !(seq & RCU_DYNTICK_CTRL_CTR));
+       if (seq & RCU_DYNTICK_CTRL_MASK) {
+               atomic_andnot(RCU_DYNTICK_CTRL_MASK, &rdtp->dynticks);
+               smp_mb__after_atomic(); /* _exit after clearing mask. */
+               /* Prefer duplicate flushes to losing a flush. */
+               rcu_eqs_special_exit();
+       }
 }
 
 /*
@@ -348,9 +368,9 @@ static void rcu_dynticks_eqs_online(void)
 {
        struct rcu_dynticks *rdtp = this_cpu_ptr(&rcu_dynticks);
 
-       if (atomic_read(&rdtp->dynticks) & 0x1)
+       if (atomic_read(&rdtp->dynticks) & RCU_DYNTICK_CTRL_CTR)
                return;
-       atomic_add(0x1, &rdtp->dynticks);
+       atomic_add(RCU_DYNTICK_CTRL_CTR, &rdtp->dynticks);
 }
 
 /*
@@ -362,7 +382,7 @@ bool rcu_dynticks_curr_cpu_in_eqs(void)
 {
        struct rcu_dynticks *rdtp = this_cpu_ptr(&rcu_dynticks);
 
-       return !(atomic_read(&rdtp->dynticks) & 0x1);
+       return !(atomic_read(&rdtp->dynticks) & RCU_DYNTICK_CTRL_CTR);
 }
 
 /*
@@ -373,7 +393,7 @@ int rcu_dynticks_snap(struct rcu_dynticks *rdtp)
 {
        int snap = atomic_add_return(0, &rdtp->dynticks);
 
-       return snap;
+       return snap & ~RCU_DYNTICK_CTRL_MASK;
 }
 
 /*
@@ -382,7 +402,7 @@ int rcu_dynticks_snap(struct rcu_dynticks *rdtp)
  */
 static bool rcu_dynticks_in_eqs(int snap)
 {
-       return !(snap & 0x1);
+       return !(snap & RCU_DYNTICK_CTRL_CTR);
 }
 
 /*
@@ -402,14 +422,34 @@ static bool rcu_dynticks_in_eqs_since(struct rcu_dynticks *rdtp, int snap)
 static void rcu_dynticks_momentary_idle(void)
 {
        struct rcu_dynticks *rdtp = this_cpu_ptr(&rcu_dynticks);
-       int special = atomic_add_return(2, &rdtp->dynticks);
+       int special = atomic_add_return(2 * RCU_DYNTICK_CTRL_CTR,
+                                       &rdtp->dynticks);
 
        /* It is illegal to call this from idle state. */
-       WARN_ON_ONCE(!(special & 0x1));
+       WARN_ON_ONCE(!(special & RCU_DYNTICK_CTRL_CTR));
 }
 
-DEFINE_PER_CPU_SHARED_ALIGNED(unsigned long, rcu_qs_ctr);
-EXPORT_PER_CPU_SYMBOL_GPL(rcu_qs_ctr);
+/*
+ * Set the special (bottom) bit of the specified CPU so that it
+ * will take special action (such as flushing its TLB) on the
+ * next exit from an extended quiescent state.  Returns true if
+ * the bit was successfully set, or false if the CPU was not in
+ * an extended quiescent state.
+ */
+bool rcu_eqs_special_set(int cpu)
+{
+       int old;
+       int new;
+       struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
+
+       do {
+               old = atomic_read(&rdtp->dynticks);
+               if (old & RCU_DYNTICK_CTRL_CTR)
+                       return false;
+               new = old | RCU_DYNTICK_CTRL_MASK;
+       } while (atomic_cmpxchg(&rdtp->dynticks, old, new) != old);
+       return true;
+}
 
 /*
  * Let the RCU core know that this CPU has gone through the scheduler,
@@ -418,44 +458,14 @@ EXPORT_PER_CPU_SYMBOL_GPL(rcu_qs_ctr);
  * memory barriers to let the RCU core know about it, regardless of what
  * this CPU might (or might not) do in the near future.
  *
- * We inform the RCU core by emulating a zero-duration dyntick-idle
- * period, which we in turn do by incrementing the ->dynticks counter
- * by two.
+ * We inform the RCU core by emulating a zero-duration dyntick-idle period.
  *
  * The caller must have disabled interrupts.
  */
 static void rcu_momentary_dyntick_idle(void)
 {
-       struct rcu_data *rdp;
-       int resched_mask;
-       struct rcu_state *rsp;
-
-       /*
-        * Yes, we can lose flag-setting operations.  This is OK, because
-        * the flag will be set again after some delay.
-        */
-       resched_mask = raw_cpu_read(rcu_sched_qs_mask);
-       raw_cpu_write(rcu_sched_qs_mask, 0);
-
-       /* Find the flavor that needs a quiescent state. */
-       for_each_rcu_flavor(rsp) {
-               rdp = raw_cpu_ptr(rsp->rda);
-               if (!(resched_mask & rsp->flavor_mask))
-                       continue;
-               smp_mb(); /* rcu_sched_qs_mask before cond_resched_completed. */
-               if (READ_ONCE(rdp->mynode->completed) !=
-                   READ_ONCE(rdp->cond_resched_completed))
-                       continue;
-
-               /*
-                * Pretend to be momentarily idle for the quiescent state.
-                * This allows the grace-period kthread to record the
-                * quiescent state, with no need for this CPU to do anything
-                * further.
-                */
-               rcu_dynticks_momentary_idle();
-               break;
-       }
+       raw_cpu_write(rcu_dynticks.rcu_need_heavy_qs, false);
+       rcu_dynticks_momentary_idle();
 }
 
 /*
@@ -463,14 +473,22 @@ static void rcu_momentary_dyntick_idle(void)
  * and requires special handling for preemptible RCU.
  * The caller must have disabled interrupts.
  */
-void rcu_note_context_switch(void)
+void rcu_note_context_switch(bool preempt)
 {
        barrier(); /* Avoid RCU read-side critical sections leaking down. */
        trace_rcu_utilization(TPS("Start context switch"));
        rcu_sched_qs();
        rcu_preempt_note_context_switch();
-       if (unlikely(raw_cpu_read(rcu_sched_qs_mask)))
+       /* Load rcu_urgent_qs before other flags. */
+       if (!smp_load_acquire(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs)))
+               goto out;
+       this_cpu_write(rcu_dynticks.rcu_urgent_qs, false);
+       if (unlikely(raw_cpu_read(rcu_dynticks.rcu_need_heavy_qs)))
                rcu_momentary_dyntick_idle();
+       this_cpu_inc(rcu_dynticks.rcu_qs_ctr);
+       if (!preempt)
+               rcu_note_voluntary_context_switch_lite(current);
+out:
        trace_rcu_utilization(TPS("End context switch"));
        barrier(); /* Avoid RCU read-side critical sections leaking up. */
 }
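The reworked path above checks the cheap rcu_urgent_qs hint first and only then, behind an acquire load, looks at rcu_need_heavy_qs; the setter in rcu_implicit_dynticks_qs() (later in this patch) stores them in the opposite order, finishing with a release store.  A compressed sketch of that pairing using C11 atomics (illustrative; the per-CPU plumbing and the momentary-idle work itself are omitted):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Stand-ins for the per-CPU rcu_dynticks fields. */
static atomic_bool urgent_qs;		/* cheap hint: "report a QS soon" */
static atomic_bool need_heavy_qs;	/* costly request: momentary dyntick idle */

/* Requester side, cf. rcu_implicit_dynticks_qs(). */
static void request_heavy_qs(void)
{
	atomic_store_explicit(&need_heavy_qs, true, memory_order_relaxed);
	/* Publish need_heavy_qs before urgent_qs, like smp_store_release(). */
	atomic_store_explicit(&urgent_qs, true, memory_order_release);
}

/* Context-switch side, cf. rcu_note_context_switch(). */
static bool note_context_switch(void)
{
	/* Pairs with the release store above, like smp_load_acquire(). */
	if (!atomic_load_explicit(&urgent_qs, memory_order_acquire))
		return false;		/* common case: nothing was requested */
	atomic_store_explicit(&urgent_qs, false, memory_order_relaxed);
	/* Guaranteed to see need_heavy_qs if urgent_qs was seen set. */
	return atomic_exchange_explicit(&need_heavy_qs, false,
					memory_order_relaxed);
}

int main(void)
{
	request_heavy_qs();
	printf("heavy QS honored: %d\n", note_context_switch());	/* 1 */
	return 0;
}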
@@ -493,29 +511,26 @@ void rcu_all_qs(void)
 {
        unsigned long flags;
 
+       if (!raw_cpu_read(rcu_dynticks.rcu_urgent_qs))
+               return;
+       preempt_disable();
+       /* Load rcu_urgent_qs before other flags. */
+       if (!smp_load_acquire(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs))) {
+               preempt_enable();
+               return;
+       }
+       this_cpu_write(rcu_dynticks.rcu_urgent_qs, false);
        barrier(); /* Avoid RCU read-side critical sections leaking down. */
-       if (unlikely(raw_cpu_read(rcu_sched_qs_mask))) {
+       if (unlikely(raw_cpu_read(rcu_dynticks.rcu_need_heavy_qs))) {
                local_irq_save(flags);
                rcu_momentary_dyntick_idle();
                local_irq_restore(flags);
        }
-       if (unlikely(raw_cpu_read(rcu_sched_data.cpu_no_qs.b.exp))) {
-               /*
-                * Yes, we just checked a per-CPU variable with preemption
-                * enabled, so we might be migrated to some other CPU at
-                * this point.  That is OK because in that case, the
-                * migration will supply the needed quiescent state.
-                * We might end up needlessly disabling preemption and
-                * invoking rcu_sched_qs() on the destination CPU, but
-                * the probability and cost are both quite low, so this
-                * should not be a problem in practice.
-                */
-               preempt_disable();
+       if (unlikely(raw_cpu_read(rcu_sched_data.cpu_no_qs.b.exp)))
                rcu_sched_qs();
-               preempt_enable();
-       }
-       this_cpu_inc(rcu_qs_ctr);
+       this_cpu_inc(rcu_dynticks.rcu_qs_ctr);
        barrier(); /* Avoid RCU read-side critical sections leaking up. */
+       preempt_enable();
 }
 EXPORT_SYMBOL_GPL(rcu_all_qs);
 
@@ -704,15 +719,11 @@ void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
        default:
                break;
        }
-       if (rsp != NULL) {
-               *flags = READ_ONCE(rsp->gp_flags);
-               *gpnum = READ_ONCE(rsp->gpnum);
-               *completed = READ_ONCE(rsp->completed);
+       if (rsp == NULL)
                return;
-       }
-       *flags = 0;
-       *gpnum = 0;
-       *completed = 0;
+       *flags = READ_ONCE(rsp->gp_flags);
+       *gpnum = READ_ONCE(rsp->gpnum);
+       *completed = READ_ONCE(rsp->completed);
 }
 EXPORT_SYMBOL_GPL(rcutorture_get_gp_data);
 
@@ -727,16 +738,6 @@ void rcutorture_record_progress(unsigned long vernum)
 }
 EXPORT_SYMBOL_GPL(rcutorture_record_progress);
 
-/*
- * Does the CPU have callbacks ready to be invoked?
- */
-static int
-cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
-{
-       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
-              rdp->nxttail[RCU_NEXT_TAIL] != NULL;
-}
-
 /*
  * Return the root node of the specified rcu_state structure.
  */
@@ -767,21 +768,17 @@ static int rcu_future_needs_gp(struct rcu_state *rsp)
 static bool
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-       int i;
-
        if (rcu_gp_in_progress(rsp))
                return false;  /* No, a grace period is already in progress. */
        if (rcu_future_needs_gp(rsp))
                return true;  /* Yes, a no-CBs CPU needs one. */
-       if (!rdp->nxttail[RCU_NEXT_TAIL])
+       if (!rcu_segcblist_is_enabled(&rdp->cblist))
                return false;  /* No, this is a no-CBs (or offline) CPU. */
-       if (*rdp->nxttail[RCU_NEXT_READY_TAIL])
+       if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
                return true;  /* Yes, CPU has newly registered callbacks. */
-       for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
-               if (rdp->nxttail[i - 1] != rdp->nxttail[i] &&
-                   ULONG_CMP_LT(READ_ONCE(rsp->completed),
-                                rdp->nxtcompleted[i]))
-                       return true;  /* Yes, CBs for future grace period. */
+       if (rcu_segcblist_future_gp_needed(&rdp->cblist,
+                                          READ_ONCE(rsp->completed)))
+               return true;  /* Yes, CBs for future grace period. */
        return false; /* No grace period needed. */
 }
 
@@ -1162,6 +1159,24 @@ bool notrace rcu_is_watching(void)
 }
 EXPORT_SYMBOL_GPL(rcu_is_watching);
 
+/*
+ * If a holdout task is actually running, request an urgent quiescent
+ * state from its CPU.  This is unsynchronized, so migrations can cause
+ * the request to go to the wrong CPU.  Which is OK, all that will happen
+ * is that the CPU's next context switch will be a bit slower and next
+ * time around this task will generate another request.
+ */
+void rcu_request_urgent_qs_task(struct task_struct *t)
+{
+       int cpu;
+
+       barrier();
+       cpu = task_cpu(t);
+       if (!task_curr(t))
+               return; /* This task is not running on that CPU. */
+       smp_store_release(per_cpu_ptr(&rcu_dynticks.rcu_urgent_qs, cpu), true);
+}
+
 #if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU)
 
 /*
@@ -1247,7 +1262,8 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp,
                                    bool *isidle, unsigned long *maxj)
 {
        unsigned long jtsq;
-       int *rcrmp;
+       bool *rnhqp;
+       bool *ruqp;
        unsigned long rjtsc;
        struct rcu_node *rnp;
 
@@ -1283,11 +1299,15 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp,
         * might not be the case for nohz_full CPUs looping in the kernel.
         */
        rnp = rdp->mynode;
+       ruqp = per_cpu_ptr(&rcu_dynticks.rcu_urgent_qs, rdp->cpu);
        if (time_after(jiffies, rdp->rsp->gp_start + jtsq) &&
-           READ_ONCE(rdp->rcu_qs_ctr_snap) != per_cpu(rcu_qs_ctr, rdp->cpu) &&
+           READ_ONCE(rdp->rcu_qs_ctr_snap) != per_cpu(rcu_dynticks.rcu_qs_ctr, rdp->cpu) &&
            READ_ONCE(rdp->gpnum) == rnp->gpnum && !rdp->gpwrap) {
                trace_rcu_fqs(rdp->rsp->name, rdp->gpnum, rdp->cpu, TPS("rqc"));
                return 1;
+       } else {
+               /* Load rcu_qs_ctr before store to rcu_urgent_qs. */
+               smp_store_release(ruqp, true);
        }
 
        /* Check for the CPU being offline. */
@@ -1304,7 +1324,7 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp,
         * in-kernel CPU-bound tasks cannot advance grace periods.
         * So if the grace period is old enough, make the CPU pay attention.
         * Note that the unsynchronized assignments to the per-CPU
-        * rcu_sched_qs_mask variable are safe.  Yes, setting of
+        * rcu_need_heavy_qs variable are safe.  Yes, setting of
         * bits can be lost, but they will be set again on the next
         * force-quiescent-state pass.  So lost bit sets do not result
         * in incorrect behavior, merely in a grace period lasting
@@ -1318,16 +1338,13 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp,
         * is set too high, we override with half of the RCU CPU stall
         * warning delay.
         */
-       rcrmp = &per_cpu(rcu_sched_qs_mask, rdp->cpu);
-       if (time_after(jiffies, rdp->rsp->gp_start + jtsq) ||
-           time_after(jiffies, rdp->rsp->jiffies_resched)) {
-               if (!(READ_ONCE(*rcrmp) & rdp->rsp->flavor_mask)) {
-                       WRITE_ONCE(rdp->cond_resched_completed,
-                                  READ_ONCE(rdp->mynode->completed));
-                       smp_mb(); /* ->cond_resched_completed before *rcrmp. */
-                       WRITE_ONCE(*rcrmp,
-                                  READ_ONCE(*rcrmp) + rdp->rsp->flavor_mask);
-               }
+       rnhqp = &per_cpu(rcu_dynticks.rcu_need_heavy_qs, rdp->cpu);
+       if (!READ_ONCE(*rnhqp) &&
+           (time_after(jiffies, rdp->rsp->gp_start + jtsq) ||
+            time_after(jiffies, rdp->rsp->jiffies_resched))) {
+               WRITE_ONCE(*rnhqp, true);
+               /* Store rcu_need_heavy_qs before rcu_urgent_qs. */
+               smp_store_release(ruqp, true);
                rdp->rsp->jiffies_resched += 5; /* Re-enable beating. */
        }
 
@@ -1487,7 +1504,8 @@ static void print_other_cpu_stall(struct rcu_state *rsp, unsigned long gpnum)
 
        print_cpu_stall_info_end();
        for_each_possible_cpu(cpu)
-               totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+               totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+                                                           cpu)->cblist);
        pr_cont("(detected by %d, t=%ld jiffies, g=%ld, c=%ld, q=%lu)\n",
               smp_processor_id(), (long)(jiffies - rsp->gp_start),
               (long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1541,7 +1559,8 @@ static void print_cpu_stall(struct rcu_state *rsp)
        print_cpu_stall_info(rsp, smp_processor_id());
        print_cpu_stall_info_end();
        for_each_possible_cpu(cpu)
-               totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+               totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+                                                           cpu)->cblist);
        pr_cont(" (t=%lu jiffies g=%ld c=%ld q=%lu)\n",
                jiffies - rsp->gp_start,
                (long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1643,30 +1662,6 @@ void rcu_cpu_stall_reset(void)
                WRITE_ONCE(rsp->jiffies_stall, jiffies + ULONG_MAX / 2);
 }
 
-/*
- * Initialize the specified rcu_data structure's default callback list
- * to empty.  The default callback list is the one that is not used by
- * no-callbacks CPUs.
- */
-static void init_default_callback_list(struct rcu_data *rdp)
-{
-       int i;
-
-       rdp->nxtlist = NULL;
-       for (i = 0; i < RCU_NEXT_SIZE; i++)
-               rdp->nxttail[i] = &rdp->nxtlist;
-}
-
-/*
- * Initialize the specified rcu_data structure's callback list to empty.
- */
-static void init_callback_list(struct rcu_data *rdp)
-{
-       if (init_nocb_callback_list(rdp))
-               return;
-       init_default_callback_list(rdp);
-}
-
 /*
  * Determine the value that ->completed will have at the end of the
  * next subsequent grace period.  This is used to tag callbacks so that
@@ -1721,7 +1716,6 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
                    unsigned long *c_out)
 {
        unsigned long c;
-       int i;
        bool ret = false;
        struct rcu_node *rnp_root = rcu_get_root(rdp->rsp);
 
@@ -1767,13 +1761,11 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
        /*
         * Get a new grace-period number.  If there really is no grace
         * period in progress, it will be smaller than the one we obtained
-        * earlier.  Adjust callbacks as needed.  Note that even no-CBs
-        * CPUs have a ->nxtcompleted[] array, so no no-CBs checks needed.
+        * earlier.  Adjust callbacks as needed.
         */
        c = rcu_cbs_completed(rdp->rsp, rnp_root);
-       for (i = RCU_DONE_TAIL; i < RCU_NEXT_TAIL; i++)
-               if (ULONG_CMP_LT(c, rdp->nxtcompleted[i]))
-                       rdp->nxtcompleted[i] = c;
+       if (!rcu_is_nocb_cpu(rdp->cpu))
+               (void)rcu_segcblist_accelerate(&rdp->cblist, c);
 
        /*
         * If the needed grace period is already
@@ -1805,9 +1797,7 @@ out:
 
 /*
  * Clean up any old requests for the just-ended grace period.  Also return
- * whether any additional grace periods have been requested.  Also invoke
- * rcu_nocb_gp_cleanup() in order to wake up any no-callbacks kthreads
- * waiting for this grace period to complete.
+ * whether any additional grace periods have been requested.
  */
 static int rcu_future_gp_cleanup(struct rcu_state *rsp, struct rcu_node *rnp)
 {
@@ -1853,57 +1843,27 @@ static void rcu_gp_kthread_wake(struct rcu_state *rsp)
 static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
                               struct rcu_data *rdp)
 {
-       unsigned long c;
-       int i;
-       bool ret;
-
-       /* If the CPU has no callbacks, nothing to do. */
-       if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
-               return false;
-
-       /*
-        * Starting from the sublist containing the callbacks most
-        * recently assigned a ->completed number and working down, find the
-        * first sublist that is not assignable to an upcoming grace period.
-        * Such a sublist has something in it (first two tests) and has
-        * a ->completed number assigned that will complete sooner than
-        * the ->completed number for newly arrived callbacks (last test).
-        *
-        * The key point is that any later sublist can be assigned the
-        * same ->completed number as the newly arrived callbacks, which
-        * means that the callbacks in any of these later sublist can be
-        * grouped into a single sublist, whether or not they have already
-        * been assigned a ->completed number.
-        */
-       c = rcu_cbs_completed(rsp, rnp);
-       for (i = RCU_NEXT_TAIL - 1; i > RCU_DONE_TAIL; i--)
-               if (rdp->nxttail[i] != rdp->nxttail[i - 1] &&
-                   !ULONG_CMP_GE(rdp->nxtcompleted[i], c))
-                       break;
+       bool ret = false;
 
-       /*
-        * If there are no sublist for unassigned callbacks, leave.
-        * At the same time, advance "i" one sublist, so that "i" will
-        * index into the sublist where all the remaining callbacks should
-        * be grouped into.
-        */
-       if (++i >= RCU_NEXT_TAIL)
+       /* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+       if (!rcu_segcblist_pend_cbs(&rdp->cblist))
                return false;
 
        /*
-        * Assign all subsequent callbacks' ->completed number to the next
-        * full grace period and group them all in the sublist initially
-        * indexed by "i".
+        * Callbacks are often registered with incomplete grace-period
+        * information.  Something about the fact that getting exact
+        * information requires acquiring a global lock...  RCU therefore
+        * makes a conservative estimate of the grace period number at which
+        * a given callback will become ready to invoke.  The following
+        * code checks this estimate and improves it when possible, thus
+        * accelerating callback invocation to an earlier grace-period
+        * number.
         */
-       for (; i <= RCU_NEXT_TAIL; i++) {
-               rdp->nxttail[i] = rdp->nxttail[RCU_NEXT_TAIL];
-               rdp->nxtcompleted[i] = c;
-       }
-       /* Record any needed additional grace periods. */
-       ret = rcu_start_future_gp(rnp, rdp, NULL);
+       if (rcu_segcblist_accelerate(&rdp->cblist, rcu_cbs_completed(rsp, rnp)))
+               ret = rcu_start_future_gp(rnp, rdp, NULL);
 
        /* Trace depending on how much we were able to accelerate. */
-       if (!*rdp->nxttail[RCU_WAIT_TAIL])
+       if (rcu_segcblist_restempty(&rdp->cblist, RCU_WAIT_TAIL))
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccWaitCB"));
        else
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccReadyCB"));
@@ -1923,32 +1883,15 @@ static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 static bool rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
                            struct rcu_data *rdp)
 {
-       int i, j;
-
-       /* If the CPU has no callbacks, nothing to do. */
-       if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
+       /* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+       if (!rcu_segcblist_pend_cbs(&rdp->cblist))
                return false;
 
        /*
         * Find all callbacks whose ->completed numbers indicate that they
         * are ready to invoke, and put them into the RCU_DONE_TAIL sublist.
         */
-       for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
-               if (ULONG_CMP_LT(rnp->completed, rdp->nxtcompleted[i]))
-                       break;
-               rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[i];
-       }
-       /* Clean up any sublist tail pointers that were misordered above. */
-       for (j = RCU_WAIT_TAIL; j < i; j++)
-               rdp->nxttail[j] = rdp->nxttail[RCU_DONE_TAIL];
-
-       /* Copy down callbacks to fill in empty sublists. */
-       for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
-               if (rdp->nxttail[j] == rdp->nxttail[RCU_NEXT_TAIL])
-                       break;
-               rdp->nxttail[j] = rdp->nxttail[i];
-               rdp->nxtcompleted[j] = rdp->nxtcompleted[i];
-       }
+       rcu_segcblist_advance(&rdp->cblist, rnp->completed);
 
        /* Classify any remaining callbacks. */
        return rcu_accelerate_cbs(rsp, rnp, rdp);
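The open-coded ->nxttail[]/->nxtcompleted[] juggling deleted above is what rcu_segcblist_accelerate() and rcu_segcblist_advance() now hide.  As a rough mental model only (this is not the real rcu_segcblist API), callbacks sit in DONE/WAIT/NEXT_READY/NEXT buckets, each non-DONE bucket tagged with the ->completed value it awaits; advancing drains buckets whose tag has been reached and accelerating retags trailing buckets with the earliest legal grace-period number:

#include <stdio.h>

enum { SEG_DONE, SEG_WAIT, SEG_NEXT_READY, SEG_NEXT, SEG_MAX };

/* Toy model: per-segment callback counts plus the GP number each awaits. */
struct toy_segcblist {
	unsigned long ncbs[SEG_MAX];
	unsigned long gp[SEG_MAX];	/* meaningful for SEG_WAIT..SEG_NEXT */
};

/* cf. rcu_segcblist_advance(): everything whose GP completed becomes DONE. */
static void toy_advance(struct toy_segcblist *scl, unsigned long completed)
{
	for (int i = SEG_WAIT; i < SEG_MAX; i++) {
		if (scl->gp[i] > completed)	/* toy: ignores counter wrap */
			break;
		scl->ncbs[SEG_DONE] += scl->ncbs[i];
		scl->ncbs[i] = 0;
	}
}

/* cf. rcu_segcblist_accelerate(): newly queued CBs get GP number "c". */
static void toy_accelerate(struct toy_segcblist *scl, unsigned long c)
{
	scl->gp[SEG_NEXT] = c;
	if (!scl->ncbs[SEG_NEXT_READY] || scl->gp[SEG_NEXT_READY] == c) {
		scl->gp[SEG_NEXT_READY] = c;	/* merge into the earlier bucket */
		scl->ncbs[SEG_NEXT_READY] += scl->ncbs[SEG_NEXT];
		scl->ncbs[SEG_NEXT] = 0;
	}
}

int main(void)
{
	struct toy_segcblist scl = {
		.ncbs = { 0, 2, 0, 3 },		/* 2 waiting on GP 5, 3 brand new */
		.gp   = { 0, 5, 0, 0 },
	};

	toy_accelerate(&scl, 6);	/* new callbacks can ride GP 6 */
	toy_advance(&scl, 5);		/* GP 5 has just completed */
	printf("done=%lu ready-later=%lu\n",
	       scl.ncbs[SEG_DONE], scl.ncbs[SEG_NEXT_READY]);	/* done=2 ready-later=3 */
	return 0;
}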
@@ -1993,7 +1936,7 @@ static bool __note_gp_changes(struct rcu_state *rsp, struct rcu_node *rnp,
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("cpustart"));
                need_gp = !!(rnp->qsmask & rdp->grpmask);
                rdp->cpu_no_qs.b.norm = need_gp;
-               rdp->rcu_qs_ctr_snap = __this_cpu_read(rcu_qs_ctr);
+               rdp->rcu_qs_ctr_snap = __this_cpu_read(rcu_dynticks.rcu_qs_ctr);
                rdp->core_needs_qs = need_gp;
                zero_cpu_stall_ticks(rdp);
                WRITE_ONCE(rdp->gpwrap, false);
@@ -2591,7 +2534,7 @@ rcu_report_qs_rdp(int cpu, struct rcu_state *rsp, struct rcu_data *rdp)
                 * within the current grace period.
                 */
                rdp->cpu_no_qs.b.norm = true;   /* need qs for new gp. */
-               rdp->rcu_qs_ctr_snap = __this_cpu_read(rcu_qs_ctr);
+               rdp->rcu_qs_ctr_snap = __this_cpu_read(rcu_dynticks.rcu_qs_ctr);
                raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
                return;
        }
@@ -2665,13 +2608,8 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
         * because _rcu_barrier() excludes CPU-hotplug operations, so it
         * cannot be running now.  Thus no memory barrier is required.
         */
-       if (rdp->nxtlist != NULL) {
-               rsp->qlen_lazy += rdp->qlen_lazy;
-               rsp->qlen += rdp->qlen;
-               rdp->n_cbs_orphaned += rdp->qlen;
-               rdp->qlen_lazy = 0;
-               WRITE_ONCE(rdp->qlen, 0);
-       }
+       rdp->n_cbs_orphaned += rcu_segcblist_n_cbs(&rdp->cblist);
+       rcu_segcblist_extract_count(&rdp->cblist, &rsp->orphan_done);
 
        /*
         * Next, move those callbacks still needing a grace period to
@@ -2679,31 +2617,18 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
         * Some of the callbacks might have gone partway through a grace
         * period, but that is too bad.  They get to start over because we
         * cannot assume that grace periods are synchronized across CPUs.
-        * We don't bother updating the ->nxttail[] array yet, instead
-        * we just reset the whole thing later on.
         */
-       if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
-               *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
-               rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
-               *rdp->nxttail[RCU_DONE_TAIL] = NULL;
-       }
+       rcu_segcblist_extract_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
 
        /*
         * Then move the ready-to-invoke callbacks to the orphanage,
         * where some other CPU will pick them up.  These will not be
         * required to pass though another grace period: They are done.
         */
-       if (rdp->nxtlist != NULL) {
-               *rsp->orphan_donetail = rdp->nxtlist;
-               rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
-       }
+       rcu_segcblist_extract_done_cbs(&rdp->cblist, &rsp->orphan_done);
 
-       /*
-        * Finally, initialize the rcu_data structure's list to empty and
-        * disallow further callbacks on this CPU.
-        */
-       init_callback_list(rdp);
-       rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+       /* Finally, disallow further callbacks on this CPU.  */
+       rcu_segcblist_disable(&rdp->cblist);
 }
 
 /*
@@ -2712,7 +2637,6 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
  */
 static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 {
-       int i;
        struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
 
        /* No-CBs CPUs are handled specially. */
@@ -2721,13 +2645,10 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
                return;
 
        /* Do the accounting first. */
-       rdp->qlen_lazy += rsp->qlen_lazy;
-       rdp->qlen += rsp->qlen;
-       rdp->n_cbs_adopted += rsp->qlen;
-       if (rsp->qlen_lazy != rsp->qlen)
+       rdp->n_cbs_adopted += rsp->orphan_done.len;
+       if (rsp->orphan_done.len_lazy != rsp->orphan_done.len)
                rcu_idle_count_callbacks_posted();
-       rsp->qlen_lazy = 0;
-       rsp->qlen = 0;
+       rcu_segcblist_insert_count(&rdp->cblist, &rsp->orphan_done);
 
        /*
         * We do not need a memory barrier here because the only way we
@@ -2735,24 +2656,13 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
         * we are the task doing the rcu_barrier().
         */
 
-       /* First adopt the ready-to-invoke callbacks. */
-       if (rsp->orphan_donelist != NULL) {
-               *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
-               *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
-               for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
-                       if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-                               rdp->nxttail[i] = rsp->orphan_donetail;
-               rsp->orphan_donelist = NULL;
-               rsp->orphan_donetail = &rsp->orphan_donelist;
-       }
-
-       /* And then adopt the callbacks that still need a grace period. */
-       if (rsp->orphan_nxtlist != NULL) {
-               *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
-               rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
-               rsp->orphan_nxtlist = NULL;
-               rsp->orphan_nxttail = &rsp->orphan_nxtlist;
-       }
+       /* First adopt the ready-to-invoke (done) callbacks, then the pending ones. */
+       rcu_segcblist_insert_done_cbs(&rdp->cblist, &rsp->orphan_done);
+       WARN_ON_ONCE(rsp->orphan_done.head);
+       rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
+       WARN_ON_ONCE(rsp->orphan_pend.head);
+       WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) !=
+                    !rcu_segcblist_n_cbs(&rdp->cblist));
 }
 
 /*
@@ -2760,14 +2670,14 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
  */
 static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
 {
-       RCU_TRACE(unsigned long mask);
-       RCU_TRACE(struct rcu_data *rdp = this_cpu_ptr(rsp->rda));
-       RCU_TRACE(struct rcu_node *rnp = rdp->mynode);
+       RCU_TRACE(unsigned long mask;)
+       RCU_TRACE(struct rcu_data *rdp = this_cpu_ptr(rsp->rda);)
+       RCU_TRACE(struct rcu_node *rnp = rdp->mynode;)
 
        if (!IS_ENABLED(CONFIG_HOTPLUG_CPU))
                return;
 
-       RCU_TRACE(mask = rdp->grpmask);
+       RCU_TRACE(mask = rdp->grpmask;)
        trace_rcu_grace_period(rsp->name,
                               rnp->gpnum + 1 - !!(rnp->qsmask & mask),
                               TPS("cpuofl"));
@@ -2840,9 +2750,11 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
        rcu_adopt_orphan_cbs(rsp, flags);
        raw_spin_unlock_irqrestore(&rsp->orphan_lock, flags);
 
-       WARN_ONCE(rdp->qlen != 0 || rdp->nxtlist != NULL,
-                 "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, nxtlist=%p\n",
-                 cpu, rdp->qlen, rdp->nxtlist);
+       WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
+                 !rcu_segcblist_empty(&rdp->cblist),
+                 "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
+                 cpu, rcu_segcblist_n_cbs(&rdp->cblist),
+                 rcu_segcblist_first_cb(&rdp->cblist));
 }
 
 /*
@@ -2852,14 +2764,17 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 {
        unsigned long flags;
-       struct rcu_head *next, *list, **tail;
-       long bl, count, count_lazy;
-       int i;
+       struct rcu_head *rhp;
+       struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
+       long bl, count;
 
        /* If no callbacks are ready, just return. */
-       if (!cpu_has_callbacks_ready_to_invoke(rdp)) {
-               trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, 0);
-               trace_rcu_batch_end(rsp->name, 0, !!READ_ONCE(rdp->nxtlist),
+       if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
+               trace_rcu_batch_start(rsp->name,
+                                     rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                                     rcu_segcblist_n_cbs(&rdp->cblist), 0);
+               trace_rcu_batch_end(rsp->name, 0,
+                                   !rcu_segcblist_empty(&rdp->cblist),
                                    need_resched(), is_idle_task(current),
                                    rcu_is_callbacks_kthread());
                return;
@@ -2867,73 +2782,61 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 
        /*
         * Extract the list of ready callbacks, disabling to prevent
-        * races with call_rcu() from interrupt handlers.
+        * races with call_rcu() from interrupt handlers.  Leave the
+        * callback counts, as rcu_barrier() needs to be conservative.
         */
        local_irq_save(flags);
        WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
        bl = rdp->blimit;
-       trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, bl);
-       list = rdp->nxtlist;
-       rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
-       *rdp->nxttail[RCU_DONE_TAIL] = NULL;
-       tail = rdp->nxttail[RCU_DONE_TAIL];
-       for (i = RCU_NEXT_SIZE - 1; i >= 0; i--)
-               if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-                       rdp->nxttail[i] = &rdp->nxtlist;
+       trace_rcu_batch_start(rsp->name, rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                             rcu_segcblist_n_cbs(&rdp->cblist), bl);
+       rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
        local_irq_restore(flags);
 
        /* Invoke callbacks. */
-       count = count_lazy = 0;
-       while (list) {
-               next = list->next;
-               prefetch(next);
-               debug_rcu_head_unqueue(list);
-               if (__rcu_reclaim(rsp->name, list))
-                       count_lazy++;
-               list = next;
-               /* Stop only if limit reached and CPU has something to do. */
-               if (++count >= bl &&
+       rhp = rcu_cblist_dequeue(&rcl);
+       for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
+               debug_rcu_head_unqueue(rhp);
+               if (__rcu_reclaim(rsp->name, rhp))
+                       rcu_cblist_dequeued_lazy(&rcl);
+               /*
+                * Stop only if limit reached and CPU has something to do.
+                * Note: The rcl structure counts down from zero.
+                */
+               if (-rcl.len >= bl &&
                    (need_resched() ||
                     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
                        break;
        }
 
        local_irq_save(flags);
-       trace_rcu_batch_end(rsp->name, count, !!list, need_resched(),
-                           is_idle_task(current),
-                           rcu_is_callbacks_kthread());
-
-       /* Update count, and requeue any remaining callbacks. */
-       if (list != NULL) {
-               *tail = rdp->nxtlist;
-               rdp->nxtlist = list;
-               for (i = 0; i < RCU_NEXT_SIZE; i++)
-                       if (&rdp->nxtlist == rdp->nxttail[i])
-                               rdp->nxttail[i] = tail;
-                       else
-                               break;
-       }
+       count = -rcl.len;
+       trace_rcu_batch_end(rsp->name, count, !!rcl.head, need_resched(),
+                           is_idle_task(current), rcu_is_callbacks_kthread());
+
+       /* Update counts and requeue any remaining callbacks. */
+       rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
        smp_mb(); /* List handling before counting for rcu_barrier(). */
-       rdp->qlen_lazy -= count_lazy;
-       WRITE_ONCE(rdp->qlen, rdp->qlen - count);
        rdp->n_cbs_invoked += count;
+       rcu_segcblist_insert_count(&rdp->cblist, &rcl);
 
        /* Reinstate batch limit if we have worked down the excess. */
-       if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
+       count = rcu_segcblist_n_cbs(&rdp->cblist);
+       if (rdp->blimit == LONG_MAX && count <= qlowmark)
                rdp->blimit = blimit;
 
        /* Reset ->qlen_last_fqs_check trigger if enough CBs have drained. */
-       if (rdp->qlen == 0 && rdp->qlen_last_fqs_check != 0) {
+       if (count == 0 && rdp->qlen_last_fqs_check != 0) {
                rdp->qlen_last_fqs_check = 0;
                rdp->n_force_qs_snap = rsp->n_force_qs;
-       } else if (rdp->qlen < rdp->qlen_last_fqs_check - qhimark)
-               rdp->qlen_last_fqs_check = rdp->qlen;
-       WARN_ON_ONCE((rdp->nxtlist == NULL) != (rdp->qlen == 0));
+       } else if (count < rdp->qlen_last_fqs_check - qhimark)
+               rdp->qlen_last_fqs_check = count;
+       WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0));
 
        local_irq_restore(flags);
 
        /* Re-invoke RCU core processing if there are callbacks remaining. */
-       if (cpu_has_callbacks_ready_to_invoke(rdp))
+       if (rcu_segcblist_ready_cbs(&rdp->cblist))
                invoke_rcu_core();
 }
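One subtlety in the rewritten rcu_do_batch(): the local rcu_cblist starts with len == 0 and, because the extraction deliberately leaves the per-CPU counts alone for rcu_barrier(), dequeuing drives len negative, so -rcl.len is the number of callbacks invoked so far and the limit check reads -rcl.len >= bl.  A few lines showing just that counting convention (the toy_* names are stand-ins, not kernel helpers):

#include <stdio.h>

struct toy_cblist {
	long len;		/* counts DOWN from zero while draining */
};

/* Stand-in for rcu_cblist_dequeue(): returns 0 once "queued" CBs are gone. */
static int toy_dequeue(struct toy_cblist *rcl, int queued)
{
	if (-rcl->len >= queued)
		return 0;
	rcl->len--;		/* one more callback handed to the caller */
	return 1;
}

int main(void)
{
	struct toy_cblist rcl = { .len = 0 };
	const long bl = 10;	/* batch limit, cf. rdp->blimit */
	int queued = 25;

	while (toy_dequeue(&rcl, queued)) {
		/* ... invoke the callback here ... */
		if (-rcl.len >= bl)	/* stop once the batch limit is hit */
			break;
	}
	printf("invoked %ld of %d\n", -rcl.len, queued);	/* invoked 10 of 25 */
	return 0;
}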
 
@@ -3099,7 +3002,7 @@ __rcu_process_callbacks(struct rcu_state *rsp)
        bool needwake;
        struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
 
-       WARN_ON_ONCE(rdp->beenonline == 0);
+       WARN_ON_ONCE(!rdp->beenonline);
 
        /* Update RCU state based on any recent quiescent states. */
        rcu_check_quiescent_state(rsp, rdp);
@@ -3117,7 +3020,7 @@ __rcu_process_callbacks(struct rcu_state *rsp)
        }
 
        /* If there are callbacks ready, invoke them. */
-       if (cpu_has_callbacks_ready_to_invoke(rdp))
+       if (rcu_segcblist_ready_cbs(&rdp->cblist))
                invoke_rcu_callbacks(rsp, rdp);
 
        /* Do any needed deferred wakeups of rcuo kthreads. */
@@ -3189,7 +3092,8 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
         * invoking force_quiescent_state() if the newly enqueued callback
         * is the only one waiting for a grace period to complete.
         */
-       if (unlikely(rdp->qlen > rdp->qlen_last_fqs_check + qhimark)) {
+       if (unlikely(rcu_segcblist_n_cbs(&rdp->cblist) >
+                    rdp->qlen_last_fqs_check + qhimark)) {
 
                /* Are we ignoring a completed grace period? */
                note_gp_changes(rsp, rdp);
@@ -3207,10 +3111,10 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
                        /* Give the grace period a kick. */
                        rdp->blimit = LONG_MAX;
                        if (rsp->n_force_qs == rdp->n_force_qs_snap &&
-                           *rdp->nxttail[RCU_DONE_TAIL] != head)
+                           rcu_segcblist_first_pend_cb(&rdp->cblist) != head)
                                force_quiescent_state(rsp);
                        rdp->n_force_qs_snap = rsp->n_force_qs;
-                       rdp->qlen_last_fqs_check = rdp->qlen;
+                       rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
                }
        }
 }
@@ -3250,7 +3154,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
        rdp = this_cpu_ptr(rsp->rda);
 
        /* Add the callback to our list. */
-       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+       if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || cpu != -1) {
                int offline;
 
                if (cpu != -1)
@@ -3269,23 +3173,21 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
                 */
                BUG_ON(cpu != -1);
                WARN_ON_ONCE(!rcu_is_watching());
-               if (!likely(rdp->nxtlist))
-                       init_default_callback_list(rdp);
+               if (rcu_segcblist_empty(&rdp->cblist))
+                       rcu_segcblist_init(&rdp->cblist);
        }
-       WRITE_ONCE(rdp->qlen, rdp->qlen + 1);
-       if (lazy)
-               rdp->qlen_lazy++;
-       else
+       rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
+       if (!lazy)
                rcu_idle_count_callbacks_posted();
-       smp_mb();  /* Count before adding callback for rcu_barrier(). */
-       *rdp->nxttail[RCU_NEXT_TAIL] = head;
-       rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
 
        if (__is_kfree_rcu_offset((unsigned long)func))
                trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
-                                        rdp->qlen_lazy, rdp->qlen);
+                                        rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                                        rcu_segcblist_n_cbs(&rdp->cblist));
        else
-               trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);
+               trace_rcu_callback(rsp->name, head,
+                                  rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                                  rcu_segcblist_n_cbs(&rdp->cblist));
 
        /* Go handle any RCU core processing required. */
        __call_rcu_core(rsp, rdp, head, flags);
@@ -3531,41 +3433,6 @@ void cond_synchronize_sched(unsigned long oldstate)
 }
 EXPORT_SYMBOL_GPL(cond_synchronize_sched);
 
-/* Adjust sequence number for start of update-side operation. */
-static void rcu_seq_start(unsigned long *sp)
-{
-       WRITE_ONCE(*sp, *sp + 1);
-       smp_mb(); /* Ensure update-side operation after counter increment. */
-       WARN_ON_ONCE(!(*sp & 0x1));
-}
-
-/* Adjust sequence number for end of update-side operation. */
-static void rcu_seq_end(unsigned long *sp)
-{
-       smp_mb(); /* Ensure update-side operation before counter increment. */
-       WRITE_ONCE(*sp, *sp + 1);
-       WARN_ON_ONCE(*sp & 0x1);
-}
-
-/* Take a snapshot of the update side's sequence number. */
-static unsigned long rcu_seq_snap(unsigned long *sp)
-{
-       unsigned long s;
-
-       s = (READ_ONCE(*sp) + 3) & ~0x1;
-       smp_mb(); /* Above access must not bleed into critical section. */
-       return s;
-}
-
-/*
- * Given a snapshot from rcu_seq_snap(), determine whether or not a
- * full update-side operation has occurred.
- */
-static bool rcu_seq_done(unsigned long *sp, unsigned long s)
-{
-       return ULONG_CMP_GE(READ_ONCE(*sp), s);
-}
-
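The rcu_seq_*() helpers removed above implement a small odd/even sequence protocol: the counter is odd while an update is in flight and even otherwise, and a snapshot from rcu_seq_snap() is satisfied once a full update that began after the snapshot has completed.  A single-threaded user-space sketch of the arithmetic (memory barriers and wraparound handling omitted):

#include <stdbool.h>
#include <stdio.h>

static unsigned long seq;	/* even: idle, odd: update in flight */

static void seq_start(void) { seq++; }	/* now odd */
static void seq_end(void)   { seq++; }	/* even again */

/* Smallest even value whose attainment implies a full post-snapshot update. */
static unsigned long seq_snap(void) { return (seq + 3) & ~0x1UL; }

/* The kernel version uses ULONG_CMP_GE() to tolerate counter wrap. */
static bool seq_done(unsigned long s) { return seq >= s; }

int main(void)
{
	unsigned long s = seq_snap();		/* seq == 0, so s == 2 */

	printf("done? %d\n", seq_done(s));	/* 0: no update has run yet */
	seq_start();
	seq_end();				/* seq == 2 */
	printf("done? %d\n", seq_done(s));	/* 1: a full update completed */
	return 0;
}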
 /*
  * Check to see if there is any immediate RCU-related work to be done
  * by the current CPU, for the specified type of RCU, returning 1 if so.
@@ -3589,7 +3456,7 @@ static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
        /* Is the RCU core waiting for a quiescent state from this CPU? */
        if (rcu_scheduler_fully_active &&
            rdp->core_needs_qs && rdp->cpu_no_qs.b.norm &&
-           rdp->rcu_qs_ctr_snap == __this_cpu_read(rcu_qs_ctr)) {
+           rdp->rcu_qs_ctr_snap == __this_cpu_read(rcu_dynticks.rcu_qs_ctr)) {
                rdp->n_rp_core_needs_qs++;
        } else if (rdp->core_needs_qs && !rdp->cpu_no_qs.b.norm) {
                rdp->n_rp_report_qs++;
@@ -3597,7 +3464,7 @@ static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
        }
 
        /* Does this CPU have callbacks ready to invoke? */
-       if (cpu_has_callbacks_ready_to_invoke(rdp)) {
+       if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
                rdp->n_rp_cb_ready++;
                return 1;
        }
@@ -3661,10 +3528,10 @@ static bool __maybe_unused rcu_cpu_has_callbacks(bool *all_lazy)
 
        for_each_rcu_flavor(rsp) {
                rdp = this_cpu_ptr(rsp->rda);
-               if (!rdp->nxtlist)
+               if (rcu_segcblist_empty(&rdp->cblist))
                        continue;
                hc = true;
-               if (rdp->qlen != rdp->qlen_lazy || !all_lazy) {
+               if (rcu_segcblist_n_nonlazy_cbs(&rdp->cblist) || !all_lazy) {
                        al = false;
                        break;
                }
@@ -3773,7 +3640,7 @@ static void _rcu_barrier(struct rcu_state *rsp)
                                __call_rcu(&rdp->barrier_head,
                                           rcu_barrier_callback, rsp, cpu, 0);
                        }
-               } else if (READ_ONCE(rdp->qlen)) {
+               } else if (rcu_segcblist_n_cbs(&rdp->cblist)) {
                        _rcu_barrier_trace(rsp, "OnlineQ", cpu,
                                           rsp->barrier_sequence);
                        smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -3882,8 +3749,9 @@ rcu_init_percpu_data(int cpu, struct rcu_state *rsp)
        rdp->qlen_last_fqs_check = 0;
        rdp->n_force_qs_snap = rsp->n_force_qs;
        rdp->blimit = blimit;
-       if (!rdp->nxtlist)
-               init_callback_list(rdp);  /* Re-enable callbacks on this CPU. */
+       if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
+           !init_nocb_callback_list(rdp))
+               rcu_segcblist_init(&rdp->cblist);  /* Re-enable callbacks. */
        rdp->dynticks->dynticks_nesting = DYNTICK_TASK_EXIT_IDLE;
        rcu_sysidle_init_percpu_data(rdp->dynticks);
        rcu_dynticks_eqs_online();
@@ -3902,12 +3770,16 @@ rcu_init_percpu_data(int cpu, struct rcu_state *rsp)
        rdp->gpnum = rnp->completed; /* Make CPU later note any new GP. */
        rdp->completed = rnp->completed;
        rdp->cpu_no_qs.b.norm = true;
-       rdp->rcu_qs_ctr_snap = per_cpu(rcu_qs_ctr, cpu);
+       rdp->rcu_qs_ctr_snap = per_cpu(rcu_dynticks.rcu_qs_ctr, cpu);
        rdp->core_needs_qs = false;
        trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("cpuonl"));
        raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 }
 
+/*
+ * Invoked early in the CPU-online process, when pretty much all
+ * services are available.  The incoming CPU is not present.
+ */
 int rcutree_prepare_cpu(unsigned int cpu)
 {
        struct rcu_state *rsp;
@@ -3921,6 +3793,9 @@ int rcutree_prepare_cpu(unsigned int cpu)
        return 0;
 }
 
+/*
+ * Update RCU priority boot kthread affinity for CPU-hotplug changes.
+ */
 static void rcutree_affinity_setting(unsigned int cpu, int outgoing)
 {
        struct rcu_data *rdp = per_cpu_ptr(rcu_state_p->rda, cpu);
@@ -3928,20 +3803,34 @@ static void rcutree_affinity_setting(unsigned int cpu, int outgoing)
        rcu_boost_kthread_setaffinity(rdp->mynode, outgoing);
 }
 
+/*
+ * Near the end of the CPU-online process.  Pretty much all services are
+ * enabled, and the CPU is now very much alive.
+ */
 int rcutree_online_cpu(unsigned int cpu)
 {
        sync_sched_exp_online_cleanup(cpu);
        rcutree_affinity_setting(cpu, -1);
+       if (IS_ENABLED(CONFIG_TREE_SRCU))
+               srcu_online_cpu(cpu);
        return 0;
 }
 
+/*
+ * Near the beginning of the CPU-offline process.  The CPU is still very
+ * much alive with pretty much all services enabled.
+ */
 int rcutree_offline_cpu(unsigned int cpu)
 {
        rcutree_affinity_setting(cpu, cpu);
+       if (IS_ENABLED(CONFIG_TREE_SRCU))
+               srcu_offline_cpu(cpu);
        return 0;
 }
 
-
+/*
+ * Near the end of the offline process.  We do only tracing here.
+ */
 int rcutree_dying_cpu(unsigned int cpu)
 {
        struct rcu_state *rsp;
@@ -3951,6 +3840,9 @@ int rcutree_dying_cpu(unsigned int cpu)
        return 0;
 }
 
+/*
+ * The outgoing CPU is gone and we are running elsewhere.
+ */
 int rcutree_dead_cpu(unsigned int cpu)
 {
        struct rcu_state *rsp;
@@ -3968,6 +3860,10 @@ int rcutree_dead_cpu(unsigned int cpu)
  * incoming CPUs are not allowed to use RCU read-side critical sections
  * until this function is called.  Failing to observe this restriction
  * will result in lockdep splats.
+ *
+ * Note that this function is special in that it is invoked directly
+ * from the incoming CPU rather than from the cpuhp_step mechanism.
+ * This is because this function must be invoked at a precise location.
  */
 void rcu_cpu_starting(unsigned int cpu)
 {
@@ -3990,9 +3886,6 @@ void rcu_cpu_starting(unsigned int cpu)
 
 #ifdef CONFIG_HOTPLUG_CPU
 /*
- * The CPU is exiting the idle loop into the arch_cpu_idle_dead()
- * function.  We now remove it from the rcu_node tree's ->qsmaskinit
- * bit masks.
  * The CPU is exiting the idle loop into the arch_cpu_idle_dead()
  * function.  We now remove it from the rcu_node tree's ->qsmaskinit
  * bit masks.
@@ -4011,6 +3904,14 @@ static void rcu_cleanup_dying_idle_cpu(int cpu, struct rcu_state *rsp)
        raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 }
 
+/*
+ * The outgoing CPU has no further need of RCU, so remove it from
+ * the list of CPUs that RCU must track.
+ *
+ * Note that this function is special in that it is invoked directly
+ * from the outgoing CPU rather than from the cpuhp_step mechanism.
+ * This is because this function must be invoked at a precise location.
+ */
 void rcu_report_dead(unsigned int cpu)
 {
        struct rcu_state *rsp;
@@ -4025,6 +3926,10 @@ void rcu_report_dead(unsigned int cpu)
 }
 #endif
 
+/*
+ * On non-huge systems, use expedited RCU grace periods to make suspend
+ * and hibernation run faster.
+ */
 static int rcu_pm_notify(struct notifier_block *self,
                         unsigned long action, void *hcpu)
 {
@@ -4095,7 +4000,7 @@ early_initcall(rcu_spawn_gp_kthread);
  * task is booting the system, and such primitives are no-ops).  After this
  * function is called, any synchronous grace-period primitives are run as
  * expedited, with the requesting task driving the grace period forward.
- * A later core_initcall() rcu_exp_runtime_mode() will switch to full
+ * A later core_initcall() rcu_set_runtime_mode() will switch to full
  * runtime RCU functionality.
  */
 void rcu_scheduler_starting(void)
@@ -4107,31 +4012,6 @@ void rcu_scheduler_starting(void)
        rcu_test_sync_prims();
 }
 
-/*
- * Compute the per-level fanout, either using the exact fanout specified
- * or balancing the tree, depending on the rcu_fanout_exact boot parameter.
- */
-static void __init rcu_init_levelspread(int *levelspread, const int *levelcnt)
-{
-       int i;
-
-       if (rcu_fanout_exact) {
-               levelspread[rcu_num_lvls - 1] = rcu_fanout_leaf;
-               for (i = rcu_num_lvls - 2; i >= 0; i--)
-                       levelspread[i] = RCU_FANOUT;
-       } else {
-               int ccur;
-               int cprv;
-
-               cprv = nr_cpu_ids;
-               for (i = rcu_num_lvls - 1; i >= 0; i--) {
-                       ccur = levelcnt[i];
-                       levelspread[i] = (cprv + ccur - 1) / ccur;
-                       cprv = ccur;
-               }
-       }
-}
-
 /*
  * Helper function for rcu_init() that initializes one rcu_state structure.
  */
@@ -4141,9 +4021,7 @@ static void __init rcu_init_one(struct rcu_state *rsp)
        static const char * const fqs[] = RCU_FQS_NAME_INIT;
        static struct lock_class_key rcu_node_class[RCU_NUM_LVLS];
        static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];
-       static u8 fl_mask = 0x1;
 
-       int levelcnt[RCU_NUM_LVLS];             /* # nodes in each level. */
        int levelspread[RCU_NUM_LVLS];          /* kids/node in each level. */
        int cpustride = 1;
        int i;
@@ -4158,20 +4036,16 @@ static void __init rcu_init_one(struct rcu_state *rsp)
 
        /* Initialize the level-tracking arrays. */
 
-       for (i = 0; i < rcu_num_lvls; i++)
-               levelcnt[i] = num_rcu_lvl[i];
        for (i = 1; i < rcu_num_lvls; i++)
-               rsp->level[i] = rsp->level[i - 1] + levelcnt[i - 1];
-       rcu_init_levelspread(levelspread, levelcnt);
-       rsp->flavor_mask = fl_mask;
-       fl_mask <<= 1;
+               rsp->level[i] = rsp->level[i - 1] + num_rcu_lvl[i - 1];
+       rcu_init_levelspread(levelspread, num_rcu_lvl);
 
        /* Initialize the elements themselves, starting from the leaves. */
 
        for (i = rcu_num_lvls - 1; i >= 0; i--) {
                cpustride *= levelspread[i];
                rnp = rsp->level[i];
-               for (j = 0; j < levelcnt[i]; j++, rnp++) {
+               for (j = 0; j < num_rcu_lvl[i]; j++, rnp++) {
                        raw_spin_lock_init(&ACCESS_PRIVATE(rnp, lock));
                        lockdep_set_class_and_name(&ACCESS_PRIVATE(rnp, lock),
                                                   &rcu_node_class[i], buf[i]);
@@ -4344,6 +4218,8 @@ void __init rcu_init(void)
        for_each_online_cpu(cpu) {
                rcutree_prepare_cpu(cpu);
                rcu_cpu_starting(cpu);
+               if (IS_ENABLED(CONFIG_TREE_SRCU))
+                       srcu_online_cpu(cpu);
        }
 }