On 5/3/20 6:24 AM, Roman Penyaev wrote:
> On 2020-05-02 00:09, Jason Baron wrote:
>> On 5/1/20 5:02 PM, Roman Penyaev wrote:
>>> Hi Jason,
>>>
>>> That is indeed a nice catch.
>>> Seems we need smp_rmb() pair between list_empty_careful(&rp->rdllist) and
>>> READ_ONCE(ep->ovflist) for ep_events_available(), do we?
>>>
>>
>> Hi Roman,
>>
>> Good point, even if we order those reads its still racy, since the
>> read of the ready list could come after its been cleared and the
>> read of the overflow could again come after its been cleared.
>
> You mean the second chunk? True. Sigh.
>
>> So I'm afraid we might need instead something like this to make
>> sure they are read together:
>
> No, impossible, I can't believe in that :) We can't give up.
>
> All we need is to keep a mark, that ep->rdllist is not empty,
> even we've just spliced it. ep_poll_callback() always takes
> the ->ovflist path, if ->ovflist is not EP_UNACTIVE_PTR, but
> ep_events_available() does not need to observe ->ovflist at
> all, just a ->rdllist.
>
> Take a look at that, do I miss something? :
>
> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
> index aba03ee749f8..a8770f9a917e 100644
> --- a/fs/eventpoll.c
> +++ b/fs/eventpoll.c
> @@ -376,8 +376,7 @@ static void ep_nested_calls_init(struct nested_calls *ncalls)
>   */
>  static inline int ep_events_available(struct eventpoll *ep)
>  {
> -        return !list_empty_careful(&ep->rdllist) ||
> -                READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
> +        return !list_empty_careful(&ep->rdllist);
>  }
>
>  #ifdef CONFIG_NET_RX_BUSY_POLL
> @@ -683,7 +682,8 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>  {
>          __poll_t res;
>          struct epitem *epi, *nepi;
> -        LIST_HEAD(txlist);
> +        LIST_HEAD(rdllist);
> +        LIST_HEAD(ovflist);
>
>          lockdep_assert_irqs_enabled();
>
> @@ -704,14 +704,22 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>           * in a lockless way.
>           */
>          write_lock_irq(&ep->lock);
> -        list_splice_init(&ep->rdllist, &txlist);
> +        /*
> +         * We do not call list_splice_init() because for lockless
> +         * ep_events_available() ->rdllist is still "not empty".
> +         * Otherwise the feature that there is something left in
> +         * the list can be lost which causes missed wakeup.
> +         */
> +        list_splice(&ep->rdllist, &rdllist);
> +        /*
> +         * If ->rdllist was empty we should pretend it was not,
> +         * because after the unlock ->ovflist comes into play,
> +         * which is invisible for lockless ep_events_available().
> +         */
> +        ep->rdllist.next = LIST_POISON1;
>          WRITE_ONCE(ep->ovflist, NULL);
>          write_unlock_irq(&ep->lock);
>
>          /*
>           * Now call the callback function.
>           */
> -        res = (*sproc)(ep, &txlist, priv);
> +        res = (*sproc)(ep, &rdllist, priv);
>
>          write_lock_irq(&ep->lock);
>          /*
> @@ -724,7 +732,7 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>                  /*
>                   * We need to check if the item is already in the list.
>                   * During the "sproc" callback execution time, items are
> -                 * queued into ->ovflist but the "txlist" might already
> +                 * queued into ->ovflist but the "rdllist" might already
>                   * contain them, and the list_splice() below takes care of them.
>                   */
>                  if (!ep_is_linked(epi)) {
>                          /*
>                           * ->ovflist is LIFO, so we have to reverse it in order
>                           * to keep in FIFO.
>                           */
> -                        list_add(&epi->rdllink, &ep->rdllist);
> +                        list_add(&epi->rdllink, &ovflist);
>                          ep_pm_stay_awake(epi);
>                  }
>          }
> @@ -743,10 +751,11 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
>           */
>          WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
>
> -        /*
> -         * Quickly re-inject items left on "txlist".
> -         */
> -        list_splice(&txlist, &ep->rdllist);
> +        /* Events from ->ovflist happened later, thus splice to the tail */
> +        list_splice_tail(&ovflist, &rdllist);
> +        /* Just replace list */
> +        list_replace(&rdllist, &ep->rdllist);
> +
>          __pm_relax(ep->ws);
>          write_unlock_irq(&ep->lock);
>
> @@ -1763,13 +1772,13 @@ static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head
>                           * Trigger mode, we need to insert back inside
>                           * the ready list, so that the next call to
>                           * epoll_wait() will check again the events
> -                         * availability. At this point, no one can insert
> -                         * into ep->rdllist besides us. The epoll_ctl()
> -                         * callers are locked out by
> -                         * ep_scan_ready_list() holding "mtx" and the
> -                         * poll callback will queue them in ep->ovflist.
> +                         * availability. What we do here is simply
> +                         * return the epi to the same position where
> +                         * it was, the ep_scan_ready_list() will
> +                         * re-inject the leftovers to the ->rdllist
> +                         * under the proper lock.
>                           */
> -                        list_add_tail(&epi->rdllink, &ep->rdllist);
> +                        list_add_tail(&epi->rdllink, &tmp->rdllink);
>                          ep_pm_stay_awake(epi);
>                  }
>          }
>
>
> --
> Roman
>

Hi Roman,

I think this misses an important case - the initial ep_poll_callback()
may queue to the overflow list. In this case ep_poll() has no visibility
into the event, since it's only checking ep->rdllist.

Here's a different approach using a seqcount that avoids expanding the
ep->lock region.

Thanks,

-Jason

--- a/fs/eventpoll.c
+++ b/fs/eventpoll.c
@@ -221,6 +221,8 @@ struct eventpoll {
         struct list_head visited_list_link;
         int visited;

+        seqcount_t seq;
+
 #ifdef CONFIG_NET_RX_BUSY_POLL
         /* used to track busy poll napi_id */
         unsigned int napi_id;
@@ -704,8 +706,10 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
          * in a lockless way.
          */
         write_lock_irq(&ep->lock);
+        write_seqcount_begin(&ep->seq);
         list_splice_init(&ep->rdllist, &txlist);
         WRITE_ONCE(ep->ovflist, NULL);
+        write_seqcount_end(&ep->seq);
         write_unlock_irq(&ep->lock);

         /*
@@ -741,12 +745,14 @@ static __poll_t ep_scan_ready_list(struct eventpoll *ep,
          * releasing the lock, events will be queued in the normal way inside
          * ep->rdllist.
          */
+        write_seqcount_begin(&ep->seq);
         WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);

         /*
          * Quickly re-inject items left on "txlist".
          */
         list_splice(&txlist, &ep->rdllist);
+        write_seqcount_end(&ep->seq);
         __pm_relax(ep->ws);
         write_unlock_irq(&ep->lock);

@@ -1025,6 +1031,7 @@ static int ep_alloc(struct eventpoll **pep)
         ep->rbr = RB_ROOT_CACHED;
         ep->ovflist = EP_UNACTIVE_PTR;
         ep->user = user;
+        seqcount_init(&ep->seq);

         *pep = ep;

@@ -1824,6 +1831,7 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
         u64 slack = 0;
         wait_queue_entry_t wait;
         ktime_t expires, *to = NULL;
+        unsigned int seq;

         lockdep_assert_irqs_enabled();

@@ -1900,7 +1908,10 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                         break;
                 }

-                eavail = ep_events_available(ep);
+                do {
+                        seq = read_seqcount_begin(&ep->seq);
+                        eavail = ep_events_available(ep);
+                } while (read_seqcount_retry(&ep->seq, seq));
                 if (eavail)
                         break;
                 if (signal_pending(current)) {
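
For reference, the retry loop added to ep_poll() above is the usual seqcount
reader pattern. Below is a rough, standalone userspace sketch (not kernel code;
scan_begin(), scan_end(), events_available() and the two flags are invented
stand-ins) of why the retry closes the race discussed above: without it, a
reader could see the "ready list" flag after scan_begin() has cleared it and
the "overflow" flag after scan_end() has cleared it, and wrongly report that
no events are available.

/*
 * Rough userspace sketch of the seqcount idea (NOT kernel code).
 * scan_begin()/scan_end() stand in for the two locked sections of
 * ep_scan_ready_list(); events_available() stands in for the retry
 * loop added to ep_poll().  Plain C11 seq_cst atomics are used for
 * simplicity; the real seqcount_t uses finer-grained barriers and
 * lockdep annotations.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_uint seq;                     /* even: stable, odd: scan in progress */
static atomic_bool rdllist_nonempty = true; /* ~ !list_empty_careful(&ep->rdllist) */
static atomic_bool ovflist_active;          /* ~ ep->ovflist != EP_UNACTIVE_PTR */

/* First locked section: splice rdllist away, start collecting on ovflist. */
static void scan_begin(void)
{
        atomic_fetch_add(&seq, 1);          /* ~ write_seqcount_begin() */
        atomic_store(&rdllist_nonempty, false);
        atomic_store(&ovflist_active, true);
        atomic_fetch_add(&seq, 1);          /* ~ write_seqcount_end() */
}

/* Second locked section: deactivate ovflist, splice leftovers back. */
static void scan_end(void)
{
        atomic_fetch_add(&seq, 1);
        atomic_store(&ovflist_active, false);
        atomic_store(&rdllist_nonempty, true);
        atomic_fetch_add(&seq, 1);
}

/*
 * Reader: without the retry it could read rdllist_nonempty after
 * scan_begin() cleared it and ovflist_active after scan_end() cleared
 * it, missing events that existed the whole time.  The retry forces a
 * snapshot that was not torn by a concurrent scan.
 */
static bool events_available(void)
{
        unsigned int s;
        bool eavail;

        do {
                s = atomic_load(&seq);      /* ~ read_seqcount_begin() */
                eavail = atomic_load(&rdllist_nonempty) ||
                         atomic_load(&ovflist_active);
        } while ((s & 1) || s != atomic_load(&seq)); /* ~ read_seqcount_retry() */

        return eavail;
}

int main(void)
{
        scan_begin();
        printf("during scan: %d\n", events_available()); /* 1: ovflist side active */
        scan_end();
        printf("after scan:  %d\n", events_available()); /* 1: leftovers back on rdllist */
        return 0;
}

This is only meant to illustrate the read-side retry; the actual patch keeps
ep_events_available() unchanged and wraps its call site in ep_poll() with the
seqcount, so the counter is what guarantees the rdllist and ovflist checks are
observed consistently.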