Re: [RFC PATCH 03/21] list: Annotate lockless list primitives with data_race()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Mar 24, 2020 at 5:59 PM Greg KH <greg@xxxxxxxxx> wrote:
> On Tue, Mar 24, 2020 at 05:38:30PM +0100, Jann Horn wrote:
> > On Tue, Mar 24, 2020 at 5:26 PM Greg KH <greg@xxxxxxxxx> wrote:
> > > On Tue, Mar 24, 2020 at 05:20:45PM +0100, Jann Horn wrote:
> > > > On Tue, Mar 24, 2020 at 4:37 PM Will Deacon <will@xxxxxxxxxx> wrote:
> > > > > Some list predicates can be used locklessly even with the non-RCU list
> > > > > implementations, since they effectively boil down to a test against
> > > > > NULL. For example, checking whether or not a list is empty is safe even
> > > > > in the presence of a concurrent, tearing write to the list head pointer.
> > > > > Similarly, checking whether or not an hlist node has been hashed is safe
> > > > > as well.
> > > > >
> > > > > Annotate these lockless list predicates with data_race() and READ_ONCE()
> > > > > so that KCSAN and the compiler are aware of what's going on. The writer
> > > > > side can then avoid having to use WRITE_ONCE() in the non-RCU
> > > > > implementation.
> > > > [...]
> > > > >  static inline int list_empty(const struct list_head *head)
> > > > >  {
> > > > > -       return READ_ONCE(head->next) == head;
> > > > > +       return data_race(READ_ONCE(head->next) == head);
> > > > >  }
> > > > [...]
> > > > >  static inline int hlist_unhashed(const struct hlist_node *h)
> > > > >  {
> > > > > -       return !READ_ONCE(h->pprev);
> > > > > +       return data_race(!READ_ONCE(h->pprev));
> > > > >  }
> > > >
> > > > This is probably valid in practice for hlist_unhashed(), which
> > > > compares with NULL, as long as the most significant byte of all kernel
> > > > pointers is non-zero; but I think list_empty() could realistically
> > > > return false positives in the presence of a concurrent tearing store?
> > > > This could break the following code pattern:
> > > >
> > > > /* optimistic lockless check */
> > > > if (!list_empty(&some_list)) {
> > > >   /* slowpath */
> > > >   mutex_lock(&some_mutex);
> > > >   list_for_each(tmp, &some_list) {
> > > >     ...
> > > >   }
> > > >   mutex_unlock(&some_mutex);
> > > > }
> > > >
> > > > (I'm not sure whether patterns like this appear commonly though.)
> > >
> > >
> > > I would hope not as the list could go "empty" before the lock is
> > > grabbed.  That pattern would be wrong.
> >
> > If the list becomes empty in between, the loop just iterates over
> > nothing, and the effect is no different from what you'd get if you had
> > bailed out before. But sure, you have to be aware that that can
> > happen.
>
> Doh, yeah, so it is safe, crazy, but safe :)

Here's an example of that pattern, I think (which I think is
technically incorrect if what peterz said is accurate?):

/**
 * waitqueue_active -- locklessly test for waiters on the queue
 * @wq_head: the waitqueue to test for waiters
 *
 * returns true if the wait list is not empty
 *
 * NOTE: this function is lockless and requires care, incorrect usage _will_
 * lead to sporadic and non-obvious failure.
 *
 * Use either while holding wait_queue_head::lock or when used for wakeups
 * with an extra smp_mb() like::
 *
 *      CPU0 - waker                    CPU1 - waiter
 *
 *                                      for (;;) {
 *      @cond = true;                     prepare_to_wait(&wq_head,
&wait, state);
 *      smp_mb();                         // smp_mb() from set_current_state()
 *      if (waitqueue_active(wq_head))         if (@cond)
 *        wake_up(wq_head);                      break;
 *                                        schedule();
 *                                      }
 *                                      finish_wait(&wq_head, &wait);
 *
 * Because without the explicit smp_mb() it's possible for the
 * waitqueue_active() load to get hoisted over the @cond store such that we'll
 * observe an empty wait list while the waiter might not observe @cond.
 *
 * Also note that this 'optimization' trades a spin_lock() for an smp_mb(),
 * which (when the lock is uncontended) are of roughly equal cost.
 */
static inline int waitqueue_active(struct wait_queue_head *wq_head)
{
        return !list_empty(&wq_head->head);
}

void signalfd_cleanup(struct sighand_struct *sighand)
{
        wait_queue_head_t *wqh = &sighand->signalfd_wqh;
        /*
         * The lockless check can race with remove_wait_queue() in progress,
         * but in this case its caller should run under rcu_read_lock() and
         * sighand_cachep is SLAB_TYPESAFE_BY_RCU, we can safely return.
         */
        if (likely(!waitqueue_active(wqh)))
                return;

        /* wait_queue_entry_t->func(POLLFREE) should do remove_wait_queue() */
        wake_up_poll(wqh, EPOLLHUP | POLLFREE);
}

and __add_wait_queue() just uses plain list_add(&wq_entry->entry,
&wq_head->head) under a lock.



[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux