On Mon 06-11-17 10:47:08, Davidlohr Bueso wrote: > Instead of the current O(N) implementation, at the cost > of adding an atomic counter, we can convert the call to > an atomic_read(). The counter only serves for accounting > empty to non-empty transitions, and vice versa; therefore > only modified twice for each of the lists during the > lifetime of the dlock (while used). > > In addition, to be able to unaccount a list_del(), we > add a dlist pointer to each head, thus minimizing the > overall memory footprint. > > Signed-off-by: Davidlohr Bueso <dbueso@xxxxxxx> Looks good to me. You can add: Reviewed-by: Jan Kara <jack@xxxxxxx> Honza Kara > --- > Changes from v3: > - s/waiters/used_lists, more doc around the counter. > - fixed racy scenario when the list empty/non-empty > condition changes after taking the lock. > - sprinkled unlikely() around all checks, these are > only corner cases in the lifetime of the lock. > > include/linux/dlock-list.h | 8 ++++++ > lib/dlock-list.c | 67 +++++++++++++++++++++++++++++++++++++++------- > 2 files changed, 65 insertions(+), 10 deletions(-) > > diff --git a/include/linux/dlock-list.h b/include/linux/dlock-list.h > index c00c7f92ada4..e18690a9bba6 100644 > --- a/include/linux/dlock-list.h > +++ b/include/linux/dlock-list.h > @@ -32,10 +32,18 @@ > struct dlock_list_head { > struct list_head list; > spinlock_t lock; > + struct dlock_list_heads *dlist; > } ____cacheline_aligned_in_smp; > > +/* > + * This is the main dlist data structure, with the array of heads > + * and a counter that atomically tracks if any of the lists are > + * being used. That is, empty to non-empty (and vice versa) > + * head->list transitions. 
> + */ > struct dlock_list_heads { > struct dlock_list_head *heads; > + atomic_t used_lists; > }; > > /* > diff --git a/lib/dlock-list.c b/lib/dlock-list.c > index a4ddecc01b12..a9c855d492b8 100644 > --- a/lib/dlock-list.c > +++ b/lib/dlock-list.c > @@ -122,8 +122,11 @@ int __alloc_dlock_list_heads(struct dlock_list_heads *dlist, > > INIT_LIST_HEAD(&head->list); > head->lock = __SPIN_LOCK_UNLOCKED(&head->lock); > + head->dlist = dlist; > lockdep_set_class(&head->lock, key); > } > + > + atomic_set(&dlist->used_lists, 0); > return 0; > } > EXPORT_SYMBOL(__alloc_dlock_list_heads); > @@ -139,29 +142,36 @@ void free_dlock_list_heads(struct dlock_list_heads *dlist) > { > kfree(dlist->heads); > dlist->heads = NULL; > + atomic_set(&dlist->used_lists, 0); > } > EXPORT_SYMBOL(free_dlock_list_heads); > > /** > * dlock_lists_empty - Check if all the dlock lists are empty > * @dlist: Pointer to the dlock_list_heads structure > - * Return: true if list is empty, false otherwise. > * > - * This can be a pretty expensive function call. If this function is required > - * in a performance critical path, we may have to maintain a global count > - * of the list entries in the global dlock_list_heads structure instead. > + * Return: true if all dlock lists are empty, false otherwise. > */ > bool dlock_lists_empty(struct dlock_list_heads *dlist) > { > - int idx; > - > /* Shouldn't be called before nr_dlock_lists is initialized */ > WARN_ON_ONCE(!nr_dlock_lists); > > - for (idx = 0; idx < nr_dlock_lists; idx++) > - if (!list_empty(&dlist->heads[idx].list)) > - return false; > - return true; > + /* > + * Serialize dlist->used_lists such that a 0->1 transition is not > + * missed by another thread checking if any of the dlock lists are > + * used. 
> + * > + * CPU0 CPU1 > + * dlock_lists_add() dlock_lists_empty() > + * [S] atomic_inc(used_lists); > + * smp_mb__after_atomic(); > + * smp_mb__before_atomic(); > + * [L] atomic_read(used_lists) > + * list_add() > + */ > + smp_mb__before_atomic(); > + return !atomic_read(&dlist->used_lists); > } > EXPORT_SYMBOL(dlock_lists_empty); > > @@ -177,11 +187,39 @@ void dlock_lists_add(struct dlock_list_node *node, > struct dlock_list_heads *dlist) > { > struct dlock_list_head *head = &dlist->heads[this_cpu_read(cpu2idx)]; > + bool list_empty_before_lock = false; > + > + /* > + * Optimistically bump the used_lists counter _before_ taking > + * the head->lock such that we don't miss a thread adding itself > + * to a list while spinning for the lock. > + * > + * Then, after taking the lock, recheck if the empty to non-empty > + * transition changed and (un)account for ourselves, accordingly. > + * Note that all these scenarios are corner cases, and not the > + * common scenario, where the lists are actually populated most > + * of the time. 
> + */ > + if (unlikely(list_empty_careful(&head->list))) { > + list_empty_before_lock = true; > + atomic_inc(&dlist->used_lists); > + smp_mb__after_atomic(); > + } > > /* > * There is no need to disable preemption > */ > spin_lock(&head->lock); > + > + if (unlikely(!list_empty_before_lock && list_empty(&head->list))) { > + atomic_inc(&dlist->used_lists); > + smp_mb__after_atomic(); > + } > + if (unlikely(list_empty_before_lock && !list_empty(&head->list))) { > + atomic_dec(&dlist->used_lists); > + smp_mb__after_atomic(); > + } > + > node->head = head; > list_add(&node->list, &head->list); > spin_unlock(&head->lock); > @@ -212,6 +250,15 @@ void dlock_lists_del(struct dlock_list_node *node) > spin_lock(&head->lock); > if (likely(head == node->head)) { > list_del_init(&node->list); > + > + if (unlikely(list_empty(&head->list))) { > + struct dlock_list_heads *dlist; > + dlist = node->head->dlist; > + > + atomic_dec(&dlist->used_lists); > + smp_mb__after_atomic(); > + } > + > node->head = NULL; > retry = false; > } else { > -- > 2.13.6 > -- Jan Kara <jack@xxxxxxxx> SUSE Labs, CR