On Mon, Aug 24, 2020 at 10:48:40PM -0400, Joel Fernandes (Google) wrote:
> Currently, rcu_do_batch() depends on the unsegmented callback list's len
> field to know how many CBs are executed. This field counts down from 0 as
> CBs are dequeued. It is possible that not all CBs could be run because of
> reaching limits, in which case the remaining unexecuted callbacks are
> requeued in the CPU's segcblist.

Again, please mention why the ->cblist.len fields are preserved, namely
due to interactions with rcu_barrier().

> The number of callbacks that were not requeued is then the negative count
> (how many CBs were run) stored in rcl->len, which has been counting down
> on every dequeue. This negative count is then added to the per-CPU
> segmented callback list's len field to correct its count.
>
> Such a design works against future efforts to track the length of each
> segment of the segmented callback list. The reason is that
> rcu_segcblist_extract_done_cbs() will have to store the length of the
> callback list in rcl->len to make rcu_segcblist_merge() work.
>
> Also, the design of counting down from 0 is confusing and error-prone IMHO.
>
> This commit therefore explicitly counts how many callbacks were executed
> in rcu_do_batch() itself, and uses that to update the per-CPU segcb list's
> len field, without relying on the negativity of rcl->len.

This last, as noted in response to 1/4, should allow rcu_segcblist_merge()
to avoid carrying the callback count separately.

> Signed-off-by: Joel Fernandes (Google) <joel@xxxxxxxxxxxxxxxxx>
> ---
>  kernel/rcu/rcu_segcblist.c | 2 +-
>  kernel/rcu/rcu_segcblist.h | 1 +
>  kernel/rcu/tree.c          | 9 ++++-----
>  3 files changed, 6 insertions(+), 6 deletions(-)
>
> diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
> index b70d4154433c..076337ae2e50 100644
> --- a/kernel/rcu/rcu_segcblist.c
> +++ b/kernel/rcu/rcu_segcblist.c
> @@ -95,7 +95,7 @@ static void rcu_segcblist_set_len(struct rcu_segcblist *rsclp, long v)
>   * This increase is fully ordered with respect to the callers accesses
>   * both before and after.
>   */
> -static void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
> +void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
>  {
>  #ifdef CONFIG_RCU_NOCB_CPU
>  	smp_mb__before_atomic(); /* Up to the caller! */
> diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
> index 5c293afc07b8..b90725f81d77 100644
> --- a/kernel/rcu/rcu_segcblist.h
> +++ b/kernel/rcu/rcu_segcblist.h
> @@ -76,6 +76,7 @@ static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg)
>  }
>
>  void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp);
> +void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v);
>  void rcu_segcblist_init(struct rcu_segcblist *rsclp);
>  void rcu_segcblist_disable(struct rcu_segcblist *rsclp);
>  void rcu_segcblist_offload(struct rcu_segcblist *rsclp);
> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
> index 548404489c04..51348144a4ea 100644
> --- a/kernel/rcu/tree.c
> +++ b/kernel/rcu/tree.c
> @@ -2419,7 +2419,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
>  		rcu_segcblist_is_offloaded(&rdp->cblist);
>  	struct rcu_head *rhp;
>  	struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
> -	long bl, count;
> +	long bl, count = 0;
>  	long pending, tlimit = 0;
>
>  	/* If no callbacks are ready, just return. */
> @@ -2464,6 +2464,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
>  	for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
>  		rcu_callback_t f;
>
> +		count++;
>  		debug_rcu_head_unqueue(rhp);
>
>  		rcu_lock_acquire(&rcu_callback_map);
> @@ -2477,9 +2478,8 @@ static void rcu_do_batch(struct rcu_data *rdp)
>
>  		/*
>  		 * Stop only if limit reached and CPU has something to do.
> -		 * Note: The rcl structure counts down from zero.
>  		 */
> -		if (-rcl.len >= bl && !offloaded &&
> +		if (count >= bl && !offloaded &&
>  		    (need_resched() ||
>  		     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
>  			break;

Please also replace "-rcl.len" in this if statement with "count":

		/* only call local_clock() every 32 callbacks */
		if (likely((-rcl.len & 31) || local_clock() < tlimit))
			continue;

		/* Exceeded the time limit, so leave. */
		break;

Yeah, it does work either way, but having "count" consistently throughout
is easier on the people reading the code.  And I haven't heard many
complaints about the code being too easy to read...

							Thanx, Paul

> @@ -2502,7 +2502,6 @@ static void rcu_do_batch(struct rcu_data *rdp)
>
>  	local_irq_save(flags);
>  	rcu_nocb_lock(rdp);
> -	count = -rcl.len;
>  	rdp->n_cbs_invoked += count;
>  	trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
>  			    is_idle_task(current), rcu_is_callbacks_kthread());
> @@ -2510,7 +2509,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
>
>  	/* Update counts and requeue any remaining callbacks. */
>  	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
>  	smp_mb(); /* List handling before counting for rcu_barrier(). */
> -	rcu_segcblist_insert_count(&rdp->cblist, &rcl);
> +	rcu_segcblist_add_len(&rdp->cblist, -count);
>
>  	/* Reinstate batch limit if we have worked down the excess. */
>  	count = rcu_segcblist_n_cbs(&rdp->cblist);
> --
> 2.28.0.297.g1956fa8f8d-goog
>
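For reference, a rough sketch of how the invocation loop in rcu_do_batch()
might read with both checks switched over to "count", combining the patch
above with the suggested change. Surrounding code is abbreviated, so this
is illustrative rather than the exact committed source:

	for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
		count++;	/* Count invoked callbacks upward from zero. */
		/* ... invoke the callback ... */

		/* Stop only if limit reached and CPU has something to do. */
		if (count >= bl && !offloaded &&
		    (need_resched() ||
		     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
			break;

		if (unlikely(tlimit)) {
			/* only call local_clock() every 32 callbacks */
			if (likely((count & 31) || local_clock() < tlimit))
				continue;
			/* Exceeded the time limit, so leave. */
			break;
		}
	}

	/* ... later, subtract the invoked callbacks from the segcblist ... */
	rcu_segcblist_add_len(&rdp->cblist, -count);

Because rcu_cblist_dequeue() decrements rcl.len on every dequeue, -rcl.len
and count hold the same value at both checks, so the substitution changes
readability rather than behavior.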