* Eric Wong (normalperson@xxxxxxxx) wrote: > I'm posting this lightly tested version since I may not be able to do > more testing/benchmarking until the weekend. > > Davide's totalmess is still running, so that's probably a good sign :) > http://www.xmailserver.org/totalmess.c > I will look for more ways to break this (and benchmark when I stop > finding ways to break it). No real applications tested, yet, and > I think I can improve upon this, too. > > This depends on a couple of patches sitting in -mm and a few > more I've posted on LKML, for convenience everything is here: > > http://yhbt.net/epoll-wfcqueue-v3.8.2-20130314.mbox > (should apply cleanly to 3.9-rc* since there's no epoll changes in that) > > --------------------------8<------------------------------- > From 139f0d4528c3fabc6a54e47be73ba9990b42cdd8 Mon Sep 17 00:00:00 2001 > From: Eric Wong <normalperson@xxxxxxxx> > Date: Thu, 14 Mar 2013 02:37:12 +0000 > Subject: [PATCH] epoll: avoid spinlock contention with wfcqueue > > This is not a proper commit, I've barely tested this. > > Replace the spinlock-protected linked list ready list with wfcqueue. > > There is still a per-epitem atomic variable which may still spin. The > likelyhood of contention is very low since it's not shared by the entire > structure, the state is private to each epitem. > > Things changed/removed: > > * ep->ovflist - the atomic, per-epitem state field prevents > event loss during ep_send_events. > > * ep_scan_ready_list - not enough generic code between users > anymore to warrant this. ep_poll_readyevents_proc (used for > poll) is read-only, ep_send_events (used for epoll_wait) > dequeues. > > * ep->lock renamed to ep->wqlock; this only protects waitqueues now. > (will experiment with making it trylock in some places, next) > > * EPOLL_CTL_DEL/close() on a ready file will not immediately release > epitem memory, epoll_wait() must be called since there's no way to > delete a ready item from wfcqueue in O(1) time. In practice this > should not be a problem, any valid app using epoll must call > epoll_wait occasionally. Unfreed epitems still count against > max_user_watches to protect against local DoS. This should be the > only possibly-noticeable change (in case there's an app that blindly > adds/deletes things from the rbtree but never calls epoll_wait) > > Barely-tested-by: Eric Wong <normalperson@xxxxxxxx> > Cc: Davide Libenzi <davidel@xxxxxxxxxxxxxxx> > Cc: Al Viro <viro@xxxxxxxxxxxxxxxxxx> > Cc: Andrew Morton <akpm@xxxxxxxxxxxxxxxxxxxx> > Cc: Mathieu Desnoyers <mathieu.desnoyers@xxxxxxxxxxxx> > --- > fs/eventpoll.c | 631 ++++++++++++++++++++++++++------------------------------- > 1 file changed, 292 insertions(+), 339 deletions(-) > > diff --git a/fs/eventpoll.c b/fs/eventpoll.c > index a4e4ad7..6159b85 100644 > --- a/fs/eventpoll.c > +++ b/fs/eventpoll.c > @@ -40,6 +40,7 @@ > #include <linux/atomic.h> > #include <linux/proc_fs.h> > #include <linux/seq_file.h> > +#include <linux/wfcqueue.h> > > /* > * LOCKING: > @@ -47,15 +48,13 @@ > * > * 1) epmutex (mutex) > * 2) ep->mtx (mutex) > - * 3) ep->lock (spinlock) > + * 3) ep->wqlock (spinlock) > * > - * The acquire order is the one listed above, from 1 to 3. > - * We need a spinlock (ep->lock) because we manipulate objects > - * from inside the poll callback, that might be triggered from > - * a wake_up() that in turn might be called from IRQ context. > - * So we can't sleep inside the poll callback and hence we need > - * a spinlock. 
During the event transfer loop (from kernel to > - * user space) we could end up sleeping due a copy_to_user(), so > + * The acquire order is the one listed above, from 1 to 2. > + * > + * We only have a spinlock (ep->wqlock) to manipulate the waitqueues. > + * During the event transfer loop (from kernel to user space) > + * we could end up sleeping due a copy_to_user(), so > * we need a lock that will allow us to sleep. This lock is a > * mutex (ep->mtx). It is acquired during the event transfer loop, > * during epoll_ctl(EPOLL_CTL_DEL) and during eventpoll_release_file(). > @@ -82,8 +81,8 @@ > * of epoll file descriptors, we use the current recursion depth as > * the lockdep subkey. > * It is possible to drop the "ep->mtx" and to use the global > - * mutex "epmutex" (together with "ep->lock") to have it working, > - * but having "ep->mtx" will make the interface more scalable. > + * mutex "epmutex" to have it working, but having "ep->mtx" will > + * make the interface more scalable. > * Events that require holding "epmutex" are very rare, while for > * normal operations the epoll private "ep->mtx" will guarantee > * a better scalability. > @@ -126,6 +125,23 @@ struct nested_calls { > }; > > /* > + * epitem state transitions callers > + * --------------------------------------------------------------------------- > + * EP_STATE_IDLE -> EP_STATE_READY ep_poll_callback, ep_enqueue_process > + * EP_STATE_READY -> EP_STATE_DEQUEUE ep_send_events > + * EP_STATE_DEQUEUE -> EP_STATE_IDLE ep_send_events > + * EP_STATE_IDLE -> kmem_cache_free ep_remove > + * EP_STATE_READY -> EP_STATE_ZOMBIE ep_remove > + * EP_STATE_ZOMBIE -> kmem_cache_free ep_send_events > + */ > +enum epoll_item_state { > + EP_STATE_ZOMBIE = -1, > + EP_STATE_IDLE = 0, > + EP_STATE_READY = 1, > + EP_STATE_DEQUEUE = 2 > +}; > + > +/* > * Each file descriptor added to the eventpoll interface will > * have an entry of this type linked to the "rbr" RB tree. > * Avoid increasing the size of this struct, there can be many thousands > @@ -136,19 +152,13 @@ struct epitem { > struct rb_node rbn; > > /* List header used to link this structure to the eventpoll ready list */ > - struct list_head rdllink; > - > - /* > - * Works together "struct eventpoll"->ovflist in keeping the > - * single linked chain of items. > - */ > - struct epitem *next; > + struct wfcq_node rdllink; > > /* The file descriptor information this item refers to */ > struct epoll_filefd ffd; > > - /* Number of active wait queue attached to poll operations */ > - int nwait; > + /* state of this item, see epoll_item_state */ > + atomic_t state; > > /* List containing poll wait queues */ > struct list_head pwqlist; > @@ -172,8 +182,8 @@ struct epitem { > * interface. 
> */ > struct eventpoll { > - /* Protect the access to this structure */ > - spinlock_t lock; > + /* head of the ready list */ > + struct wfcq_head rdlhead; > > /* > * This mutex is used to ensure that files are not removed > @@ -183,25 +193,18 @@ struct eventpoll { > */ > struct mutex mtx; > > + /* Protect the access to wait queues */ > + spinlock_t wqlock; > + > /* Wait queue used by sys_epoll_wait() */ > wait_queue_head_t wq; > > /* Wait queue used by file->poll() */ > wait_queue_head_t poll_wait; > > - /* List of ready file descriptors */ > - struct list_head rdllist; > - > /* RB tree root used to store monitored fd structs */ > struct rb_root rbr; > > - /* > - * This is a single linked list that chains all the "struct epitem" that > - * happened while transferring ready events to userspace w/out > - * holding ->lock. > - */ > - struct epitem *ovflist; > - > /* wakeup_source used when ep_scan_ready_list is running */ > struct wakeup_source *ws; > > @@ -213,6 +216,9 @@ struct eventpoll { > /* used to optimize loop detection check */ > int visited; > struct list_head visited_list_link; > + > + /* hopefully a different cache line than rdlhead */ > + struct wfcq_tail rdltail; You might want to try ____cacheline_aligned_in_smp for the rdltail field (see linux/cache.h). > }; > > /* Wait structure used by the poll hooks */ > @@ -347,6 +353,12 @@ static inline struct epitem *ep_item_from_epqueue(poll_table *p) > return container_of(p, struct ep_pqueue, pt)->epi; > } > > +/* Get the "struct epitem" from a wfcq node */ > +static inline struct epitem *ep_item_from_node(struct wfcq_node *p) > +{ > + return container_of(p, struct epitem, rdllink); > +} > + > /* Tells if the epoll_ctl(2) operation needs an event copy from userspace */ > static inline int ep_op_has_event(int op) > { > @@ -368,9 +380,9 @@ static void ep_nested_calls_init(struct nested_calls *ncalls) > * Returns: Returns a value different than zero if ready events are available, > * or zero otherwise. > */ > -static inline int ep_events_available(struct eventpoll *ep) > +static inline bool ep_events_available(struct eventpoll *ep) > { > - return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR; > + return !wfcq_empty(&ep->rdlhead, &ep->rdltail); > } > > /** > @@ -507,6 +519,27 @@ static void ep_poll_safewake(wait_queue_head_t *wq) > put_cpu(); > } > > +static void ep_notify_waiters(struct eventpoll *ep) > +{ > + int pwake; > + unsigned long flags; > + > + spin_lock_irqsave(&ep->wqlock, flags); > + > + /* Notify epoll_wait tasks */ > + if (waitqueue_active(&ep->wq)) > + wake_up_locked(&ep->wq); > + > + /* Notify the ->poll() wait list */ > + pwake = waitqueue_active(&ep->poll_wait); > + > + spin_unlock_irqrestore(&ep->wqlock, flags); > + > + /* We have to call this outside the lock */ > + if (pwake) > + ep_poll_safewake(&ep->poll_wait); > +} > + > static void ep_remove_wait_queue(struct eppoll_entry *pwq) > { > wait_queue_head_t *whead; > @@ -570,104 +603,17 @@ static inline void ep_pm_stay_awake_rcu(struct epitem *epi) > rcu_read_unlock(); > } > > -/** > - * ep_scan_ready_list - Scans the ready list in a way that makes possible for > - * the scan code, to call f_op->poll(). Also allows for > - * O(NumReady) performance. > - * > - * @ep: Pointer to the epoll private data structure. > - * @sproc: Pointer to the scan callback. > - * @priv: Private opaque data passed to the @sproc callback. > - * @depth: The current depth of recursive f_op->poll calls. 
> - * > - * Returns: The same integer error code returned by the @sproc callback. > +/* > + * We mark epitems as zombies if they are deleted while in the ready list, > + * so the next invocation of ep_send_events can get rid of them. We cannot > + * delete abitrary elements from the wfcq ready list in O(1) time. > */ > -static int ep_scan_ready_list(struct eventpoll *ep, > - int (*sproc)(struct eventpoll *, > - struct list_head *, void *), > - void *priv, > - int depth) > +static noinline void ep_reap_zombie(struct eventpoll *ep, struct epitem *epi) > { > - int error, pwake = 0; > - unsigned long flags; > - struct epitem *epi, *nepi; > - LIST_HEAD(txlist); > - > - /* > - * We need to lock this because we could be hit by > - * eventpoll_release_file() and epoll_ctl(). > - */ > - mutex_lock_nested(&ep->mtx, depth); > - > - /* > - * Steal the ready list, and re-init the original one to the > - * empty list. Also, set ep->ovflist to NULL so that events > - * happening while looping w/out locks, are not lost. We cannot > - * have the poll callback to queue directly on ep->rdllist, > - * because we want the "sproc" callback to be able to do it > - * in a lockless way. > - */ > - spin_lock_irqsave(&ep->lock, flags); > - list_splice_init(&ep->rdllist, &txlist); > - ep->ovflist = NULL; > - spin_unlock_irqrestore(&ep->lock, flags); > - > - /* > - * Now call the callback function. > - */ > - error = (*sproc)(ep, &txlist, priv); > - > - spin_lock_irqsave(&ep->lock, flags); > - /* > - * During the time we spent inside the "sproc" callback, some > - * other events might have been queued by the poll callback. > - * We re-insert them inside the main ready-list here. > - */ > - for (nepi = ep->ovflist; (epi = nepi) != NULL; > - nepi = epi->next, epi->next = EP_UNACTIVE_PTR) { > - /* > - * We need to check if the item is already in the list. > - * During the "sproc" callback execution time, items are > - * queued into ->ovflist but the "txlist" might already > - * contain them, and the list_splice() below takes care of them. > - */ > - if (!ep_is_linked(&epi->rdllink)) { > - list_add_tail(&epi->rdllink, &ep->rdllist); > - ep_pm_stay_awake(epi); > - } > - } > - /* > - * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after > - * releasing the lock, events will be queued in the normal way inside > - * ep->rdllist. > - */ > - ep->ovflist = EP_UNACTIVE_PTR; > - > - /* > - * Quickly re-inject items left on "txlist". > - */ > - list_splice(&txlist, &ep->rdllist); > - __pm_relax(ep->ws); > - > - if (!list_empty(&ep->rdllist)) { > - /* > - * Wake up (if active) both the eventpoll wait list and > - * the ->poll() wait list (delayed after we release the lock). > - */ > - if (waitqueue_active(&ep->wq)) > - wake_up_locked(&ep->wq); > - if (waitqueue_active(&ep->poll_wait)) > - pwake++; > - } > - spin_unlock_irqrestore(&ep->lock, flags); > - > - mutex_unlock(&ep->mtx); > - > - /* We have to call this outside the lock */ > - if (pwake) > - ep_poll_safewake(&ep->poll_wait); > - > - return error; > + __wfcq_dequeue(&ep->rdlhead, &ep->rdltail, 0); > + wakeup_source_unregister(ep_wakeup_source(epi)); > + kmem_cache_free(epi_cache, epi); > + atomic_long_dec(&ep->user->epoll_watches); > } > > /* > @@ -676,17 +622,9 @@ static int ep_scan_ready_list(struct eventpoll *ep, > */ > static int ep_remove(struct eventpoll *ep, struct epitem *epi) > { > - unsigned long flags; > struct file *file = epi->ffd.file; > > - /* > - * Removes poll wait queue hooks. 
We _have_ to do this without holding > - * the "ep->lock" otherwise a deadlock might occur. This because of the > - * sequence of the lock acquisition. Here we do "ep->lock" then the wait > - * queue head lock when unregistering the wait queue. The wakeup callback > - * will run by holding the wait queue head lock and will call our callback > - * that will try to get "ep->lock". > - */ > + /* Removes poll wait queue hooks. */ > ep_unregister_pollwait(ep, epi); > > /* Remove the current item from the list of epoll hooks */ > @@ -697,19 +635,56 @@ static int ep_remove(struct eventpoll *ep, struct epitem *epi) > > rb_erase(&epi->rbn, &ep->rbr); > > - spin_lock_irqsave(&ep->lock, flags); > - if (ep_is_linked(&epi->rdllink)) > - list_del_init(&epi->rdllink); > - spin_unlock_irqrestore(&ep->lock, flags); > + /* rely on ep_send_events to cleanup if we got enqueued */ > + if (atomic_read(&epi->state) == EP_STATE_READY) { > + atomic_set(&epi->state, EP_STATE_ZOMBIE); > + } else { > + /* At this point it is safe to free the eventpoll item */ > + wakeup_source_unregister(ep_wakeup_source(epi)); > + kmem_cache_free(epi_cache, epi); > + atomic_long_dec(&ep->user->epoll_watches); > + } > > - wakeup_source_unregister(ep_wakeup_source(epi)); > + return 0; > +} > > - /* At this point it is safe to free the eventpoll item */ > - kmem_cache_free(epi_cache, epi); > +/* > + * Returns the first ready epitem, but does not dequeue it. > + * Returns NULL if there is nothing ready. > + */ > +static inline struct epitem *ep_item_ready_first(struct eventpoll *ep) > +{ > + struct wfcq_node *p; > > - atomic_long_dec(&ep->user->epoll_watches); > + p = __wfcq_first_nonblocking(&ep->rdlhead, &ep->rdltail); > > - return 0; > + return p ? ep_item_from_node(p) : NULL; > +} > + > +/* > + * We use the current tail as a sentinel value for iterating, > + * we do not want to end up looping over the same epitem as we > + * dequeue while ep_poll_callback may be enqueueing. > + */ > +static struct epitem *ep_item_ready_last(struct eventpoll *ep) > +{ > + struct wfcq_node *p = CMM_LOAD_SHARED(ep->rdltail.p); > + > + return ep_item_from_node(p); > +} > + > +/* > + * Only used for read-only iteration of the ready list (for our own poll()) > + * returns NULL upon reaching the end of the list. > + */ > +static struct epitem *ep_item_ready_next(struct eventpoll *ep, > + struct epitem *epi) > +{ > + struct wfcq_node *p; > + > + p = __wfcq_next_nonblocking(&ep->rdlhead, &ep->rdltail, &epi->rdllink); > + > + return p ? ep_item_from_node(p) : NULL; > } > > static void ep_free(struct eventpoll *ep) > @@ -744,19 +719,21 @@ static void ep_free(struct eventpoll *ep) > * Walks through the whole tree by freeing each "struct epitem". At this > * point we are sure no poll callbacks will be lingering around, and also by > * holding "epmutex" we can be sure that no file cleanup code will hit > - * us during this operation. So we can avoid the lock on "ep->lock". > + * us during this operation. > * We do not need to lock ep->mtx, either, we only do it to prevent > * a lockdep warning. 
> */ > mutex_lock(&ep->mtx); > while ((rbp = rb_first(&ep->rbr)) != NULL) { > epi = rb_entry(rbp, struct epitem, rbn); > + atomic_set(&epi->state, EP_STATE_IDLE); /* prevent zombies */ > ep_remove(ep, epi); > } > mutex_unlock(&ep->mtx); > > mutex_unlock(&epmutex); > mutex_destroy(&ep->mtx); > + mutex_destroy(&ep->rdlhead.lock); > free_uid(ep->user); > wakeup_source_unregister(ep->ws); > kfree(ep); > @@ -779,34 +756,49 @@ static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt) > return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events; > } > > -static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head, > - void *priv) > +/* used our own poll() implementation */ > +static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests) > { > - struct epitem *epi, *tmp; > + struct eventpoll *ep = priv; > + struct epitem *epi; > + int ret = 0; > poll_table pt; > > init_poll_funcptr(&pt, NULL); > > - list_for_each_entry_safe(epi, tmp, head, rdllink) { > - if (ep_item_poll(epi, &pt)) > - return POLLIN | POLLRDNORM; > - else { > + /* > + * this iteration should be safe because: > + * 1) ep->mtx is locked, preventing dequeue and epi invalidation > + * 2) stops at the tail when we started iteration > + */ > + mutex_lock_nested(&ep->mtx, call_nests + 1); > + for (epi = ep_item_ready_first(ep); > + epi; > + epi = ep_item_ready_next(ep, epi)) { > + > + /* zombie item, let ep_send_events cleanup */ > + if (atomic_read(&epi->state) == EP_STATE_ZOMBIE) > + continue; > + > + pt._key = epi->event.events; > + > + if (ep_item_poll(epi, &pt)) { > + ret = POLLIN | POLLRDNORM; > + break; > + } else { > /* > * Item has been dropped into the ready list by the poll > * callback, but it's not actually ready, as far as > - * caller requested events goes. We can remove it here. > + * caller requested events goes. 
> + * We cannot remove it here, ep_send_events will > + * recheck and remove if possible > */ > __pm_relax(ep_wakeup_source(epi)); > - list_del_init(&epi->rdllink); > } > } > + mutex_unlock(&ep->mtx); > > - return 0; > -} > - > -static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests) > -{ > - return ep_scan_ready_list(priv, ep_read_events_proc, NULL, call_nests + 1); > + return ret; > } > > static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait) > @@ -913,13 +905,12 @@ static int ep_alloc(struct eventpoll **pep) > if (unlikely(!ep)) > goto free_uid; > > - spin_lock_init(&ep->lock); > + spin_lock_init(&ep->wqlock); > mutex_init(&ep->mtx); > init_waitqueue_head(&ep->wq); > init_waitqueue_head(&ep->poll_wait); > - INIT_LIST_HEAD(&ep->rdllist); > + wfcq_init(&ep->rdlhead, &ep->rdltail); > ep->rbr = RB_ROOT; > - ep->ovflist = EP_UNACTIVE_PTR; > ep->user = user; > > *pep = ep; > @@ -960,6 +951,30 @@ static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd) > return epir; > } > > +/* returns true if successfully marked ready, false if it was already ready */ > +static inline bool ep_mark_ready(struct epitem *epi) > +{ > + int s; > + > + /* > + * We only want to go from idle -> ready > + * If another task is in the middle of dequeueing (in ep_send_events), > + * we busy wait until the dequeue finishes and sets EP_STATE_IDLE > + */ > + for (; ; cpu_relax()) { > + s = atomic_cmpxchg(&epi->state, EP_STATE_IDLE, EP_STATE_READY); > + if (s != EP_STATE_DEQUEUE) > + break; > + } > + return (s == EP_STATE_IDLE); > +} > + > +static inline void ep_enqueue(struct eventpoll *ep, struct epitem *epi) > +{ > + wfcq_node_init(&epi->rdllink); > + wfcq_enqueue(&ep->rdlhead, &ep->rdltail, &epi->rdllink); > +} > + > /* > * This is the callback that is passed to the wait queue wakeup > * mechanism. It is called by the stored file descriptors when they > @@ -967,8 +982,6 @@ static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd) > */ > static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) > { > - int pwake = 0; > - unsigned long flags; > struct epitem *epi = ep_item_from_wait(wait); > struct eventpoll *ep = epi->ep; > > @@ -983,7 +996,8 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k > list_del_init(&wait->task_list); > } > > - spin_lock_irqsave(&ep->lock, flags); > + /* pairs with smp_mb in ep_modify */ > + smp_rmb(); > > /* > * If the event mask does not contain any poll(2) event, we consider the > @@ -992,7 +1006,7 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k > * until the next EPOLL_CTL_MOD will be issued. > */ > if (!(epi->event.events & ~EP_PRIVATE_BITS)) > - goto out_unlock; > + return 1; > > /* > * Check the events coming with the callback. At this stage, not > @@ -1001,51 +1015,14 @@ static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *k > * test for "key" != NULL before the event match test. > */ > if (key && !((unsigned long) key & epi->event.events)) > - goto out_unlock; > + return 1; > > - /* > - * If we are transferring events to userspace, we can hold no locks > - * (because we're accessing user memory, and because of linux f_op->poll() > - * semantics). All the events that happen during that period of time are > - * chained in ep->ovflist and requeued later on. 
> - */ > - if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { > - if (epi->next == EP_UNACTIVE_PTR) { > - epi->next = ep->ovflist; > - ep->ovflist = epi; > - if (epi->ws) { > - /* > - * Activate ep->ws since epi->ws may get > - * deactivated at any time. > - */ > - __pm_stay_awake(ep->ws); > - } > - > - } > - goto out_unlock; > + if (ep_mark_ready(epi)) { > + ep_enqueue(ep, epi); > + ep_pm_stay_awake_rcu(epi); > } > > - /* If this file is already in the ready list we exit soon */ > - if (!ep_is_linked(&epi->rdllink)) { > - list_add_tail(&epi->rdllink, &ep->rdllist); > - ep_pm_stay_awake(epi); > - } > - > - /* > - * Wake up ( if active ) both the eventpoll wait list and the ->poll() > - * wait list. > - */ > - if (waitqueue_active(&ep->wq)) > - wake_up_locked(&ep->wq); > - if (waitqueue_active(&ep->poll_wait)) > - pwake++; > - > -out_unlock: > - spin_unlock_irqrestore(&ep->lock, flags); > - > - /* We have to call this outside the lock */ > - if (pwake) > - ep_poll_safewake(&ep->poll_wait); > + ep_notify_waiters(ep); > > return 1; > } > @@ -1060,16 +1037,19 @@ static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, > struct epitem *epi = ep_item_from_epqueue(pt); > struct eppoll_entry *pwq; > > - if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { > + if (atomic_read(&epi->state) == EP_STATE_ZOMBIE) > + return; > + > + pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL); > + if (pwq) { > init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); > RCU_INIT_POINTER(pwq->whead, whead); > pwq->base = epi; > add_wait_queue(whead, &pwq->wait); > list_add_tail(&pwq->llink, &epi->pwqlist); > - epi->nwait++; > } else { > /* We have to signal that an error occurred */ > - epi->nwait = -1; > + atomic_set(&epi->state, EP_STATE_ZOMBIE); > } > } > > @@ -1224,14 +1204,25 @@ static noinline void ep_destroy_wakeup_source(struct epitem *epi) > wakeup_source_unregister(ws); > } > > +/* enqueue an epitem in process context via epoll_ctl */ > +static bool ep_enqueue_process(struct eventpoll *ep, struct epitem *epi) > +{ > + if (!ep_mark_ready(epi)) > + return false; > + > + ep_enqueue(ep, epi); > + ep_pm_stay_awake(epi); > + ep_notify_waiters(ep); > + return true; > +} > + > /* > * Must be called with "mtx" held. > */ > static int ep_insert(struct eventpoll *ep, struct epoll_event *event, > struct file *tfile, int fd) > { > - int error, revents, pwake = 0; > - unsigned long flags; > + int error, revents; > long user_watches; > struct epitem *epi; > struct ep_pqueue epq; > @@ -1243,14 +1234,12 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event, > return -ENOMEM; > > /* Item initialization follow here ... */ > - INIT_LIST_HEAD(&epi->rdllink); > INIT_LIST_HEAD(&epi->fllink); > INIT_LIST_HEAD(&epi->pwqlist); > + atomic_set(&epi->state, EP_STATE_IDLE); > epi->ep = ep; > ep_set_ffd(&epi->ffd, tfile, fd); > epi->event = *event; > - epi->nwait = 0; > - epi->next = EP_UNACTIVE_PTR; > if (epi->event.events & EPOLLWAKEUP) { > error = ep_create_wakeup_source(epi); > if (error) > @@ -1278,7 +1267,7 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event, > * high memory pressure. 
> */ > error = -ENOMEM; > - if (epi->nwait < 0) > + if (atomic_read(&epi->state) == EP_STATE_ZOMBIE) > goto error_unregister; > > /* Add the current item to the list of active epoll hook for this file */ > @@ -1297,28 +1286,11 @@ static int ep_insert(struct eventpoll *ep, struct epoll_event *event, > if (reverse_path_check()) > goto error_remove_epi; > > - /* We have to drop the new item inside our item list to keep track of it */ > - spin_lock_irqsave(&ep->lock, flags); > - > - /* If the file is already "ready" we drop it inside the ready list */ > - if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { > - list_add_tail(&epi->rdllink, &ep->rdllist); > - ep_pm_stay_awake(epi); > - > - /* Notify waiting tasks that events are available */ > - if (waitqueue_active(&ep->wq)) > - wake_up_locked(&ep->wq); > - if (waitqueue_active(&ep->poll_wait)) > - pwake++; > - } > - > - spin_unlock_irqrestore(&ep->lock, flags); > - > atomic_long_inc(&ep->user->epoll_watches); > > - /* We have to call this outside the lock */ > - if (pwake) > - ep_poll_safewake(&ep->poll_wait); > + /* If the file is already "ready" we drop it inside the ready list */ > + if (revents & event->events) > + ep_enqueue_process(ep, epi); > > return 0; > > @@ -1331,18 +1303,20 @@ error_remove_epi: > rb_erase(&epi->rbn, &ep->rbr); > > error_unregister: > + /* ep->state cannot be set to EP_STATE_READY after this: */ > ep_unregister_pollwait(ep, epi); > > /* > + * Rely on ep_send_events to free epi if we got enqueued. > * We need to do this because an event could have been arrived on some > - * allocated wait queue. Note that we don't care about the ep->ovflist > - * list, since that is used/cleaned only inside a section bound by "mtx". > - * And ep_insert() is called with "mtx" held. > + * allocated wait queue. > */ > - spin_lock_irqsave(&ep->lock, flags); > - if (ep_is_linked(&epi->rdllink)) > - list_del_init(&epi->rdllink); > - spin_unlock_irqrestore(&ep->lock, flags); > + if (atomic_read(&epi->state) == EP_STATE_READY) { > + /* count the zombie against user watches to prevent OOM DoS */ > + atomic_long_inc(&ep->user->epoll_watches); > + atomic_set(&epi->state, EP_STATE_ZOMBIE); > + return error; > + } > > wakeup_source_unregister(ep_wakeup_source(epi)); > > @@ -1358,7 +1332,6 @@ error_create_wakeup_source: > */ > static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event) > { > - int pwake = 0; > unsigned int revents; > poll_table pt; > > @@ -1384,9 +1357,7 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_even > * 1) Flush epi changes above to other CPUs. This ensures > * we do not miss events from ep_poll_callback if an > * event occurs immediately after we call f_op->poll(). > - * We need this because we did not take ep->lock while > - * changing epi above (but ep_poll_callback does take > - * ep->lock). > + * This pairs with the first smb_rmb in ep_poll_callback > * > * 2) We also need to ensure we do not miss _past_ events > * when calling f_op->poll(). This barrier also > @@ -1408,49 +1379,50 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_even > * If the item is "hot" and it is not registered inside the ready > * list, push it inside. 
> */ > - if (revents & event->events) { > - spin_lock_irq(&ep->lock); > - if (!ep_is_linked(&epi->rdllink)) { > - list_add_tail(&epi->rdllink, &ep->rdllist); > - ep_pm_stay_awake(epi); > - > - /* Notify waiting tasks that events are available */ > - if (waitqueue_active(&ep->wq)) > - wake_up_locked(&ep->wq); > - if (waitqueue_active(&ep->poll_wait)) > - pwake++; > - } > - spin_unlock_irq(&ep->lock); > - } > - > - /* We have to call this outside the lock */ > - if (pwake) > - ep_poll_safewake(&ep->poll_wait); > + if (revents & event->events) > + ep_enqueue_process(ep, epi); > > return 0; > } > > -static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, > - void *priv) > +/* > + * If this file has been added with Level Trigger mode, > + * we need to insert back inside the ready list, so that the next call to > + * epoll_wait() will check again the events availability. > + */ > +static void ep_level_trigger(struct eventpoll *ep, struct epitem *epi) > +{ > + __wfcq_dequeue_nonblocking(&ep->rdlhead, &ep->rdltail); > + ep_enqueue(ep, epi); > + ep_pm_stay_awake(epi); > +} > + > +static int ep_send_events(struct eventpoll *ep, > + struct epoll_event __user *uevent, int maxevents) > { > - struct ep_send_events_data *esed = priv; > - int eventcnt; > + int eventcnt = 0; > unsigned int revents; > - struct epitem *epi; > - struct epoll_event __user *uevent; > + struct epitem *epi, *end; > struct wakeup_source *ws; > poll_table pt; > > init_poll_funcptr(&pt, NULL); > > /* > - * We can loop without lock because we are passed a task private list. > - * Items cannot vanish during the loop because ep_scan_ready_list() is > - * holding "mtx" during this call. > + * Items cannot vanish during the loop because we are holding > + * "mtx" during this call. > */ > - for (eventcnt = 0, uevent = esed->events; > - !list_empty(head) && eventcnt < esed->maxevents;) { > - epi = list_first_entry(head, struct epitem, rdllink); > + mutex_lock(&ep->mtx); > + for (epi = ep_item_ready_first(ep), end = ep_item_ready_last(ep); > + epi && eventcnt < maxevents; > + epi = epi == end ? NULL : ep_item_ready_first(ep)) { This is where you'll probably want to try iterating on a local list populated with splice, as I suggested in my previous email. Thanks, Mathieu > + int state = atomic_read(&epi->state); > + > + if (state == EP_STATE_ZOMBIE) { > + ep_reap_zombie(ep, epi); > + continue; > + } > + WARN_ON(state != EP_STATE_READY); > > /* > * Activate ep->ws before deactivating epi->ws to prevent > @@ -1459,7 +1431,7 @@ static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, > * > * This could be rearranged to delay the deactivation of epi->ws > * instead, but then epi->ws would temporarily be out of sync > - * with ep_is_linked(). > + * with epi->state. > */ > ws = ep_wakeup_source(epi); > if (ws) { > @@ -1468,8 +1440,6 @@ static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, > __pm_relax(ws); > } > > - list_del_init(&epi->rdllink); > - > revents = ep_item_poll(epi, &pt); > > /* > @@ -1481,46 +1451,37 @@ static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head, > if (revents) { > if (__put_user(revents, &uevent->events) || > __put_user(epi->event.data, &uevent->data)) { > - list_add(&epi->rdllink, head); > - ep_pm_stay_awake(epi); > - return eventcnt ? 
eventcnt : -EFAULT; > + if (!eventcnt) > + eventcnt = -EFAULT; > + break; > } > + > eventcnt++; > uevent++; > - if (epi->event.events & EPOLLONESHOT) > + > + if (epi->event.events & EPOLLONESHOT) { > epi->event.events &= EP_PRIVATE_BITS; > - else if (!(epi->event.events & EPOLLET)) { > - /* > - * If this file has been added with Level > - * Trigger mode, we need to insert back inside > - * the ready list, so that the next call to > - * epoll_wait() will check again the events > - * availability. At this point, no one can insert > - * into ep->rdllist besides us. The epoll_ctl() > - * callers are locked out by > - * ep_scan_ready_list() holding "mtx" and the > - * poll callback will queue them in ep->ovflist. > - */ > - list_add_tail(&epi->rdllink, &ep->rdllist); > - ep_pm_stay_awake(epi); > + } else if (!(epi->event.events & EPOLLET)) { > + ep_level_trigger(ep, epi); > + continue; > } > } > + > + /* > + * we set EP_STATE_DEQUEUE before dequeueing to prevent losing > + * events during ep_poll_callback that fire before we can set > + * EP_STATE_IDLE ep_poll_callback must spin while we're > + * EP_STATE_DEQUEUE (for the dequeue) > + */ > + atomic_set(&epi->state, EP_STATE_DEQUEUE); > + __wfcq_dequeue_nonblocking(&ep->rdlhead, &ep->rdltail); > + atomic_set(&epi->state, EP_STATE_IDLE); > } > + mutex_unlock(&ep->mtx); > > return eventcnt; > } > > -static int ep_send_events(struct eventpoll *ep, > - struct epoll_event __user *events, int maxevents) > -{ > - struct ep_send_events_data esed; > - > - esed.maxevents = maxevents; > - esed.events = events; > - > - return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0); > -} > - > static inline struct timespec ep_set_mstimeout(long ms) > { > struct timespec now, ts = { > @@ -1552,7 +1513,7 @@ static inline struct timespec ep_set_mstimeout(long ms) > static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, > int maxevents, long timeout) > { > - int res = 0, eavail, timed_out = 0; > + int res = 0; > unsigned long flags; > long slack = 0; > wait_queue_t wait; > @@ -1564,20 +1525,11 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, > slack = select_estimate_accuracy(&end_time); > to = &expires; > *to = timespec_to_ktime(end_time); > - } else if (timeout == 0) { > - /* > - * Avoid the unnecessary trip to the wait queue loop, if the > - * caller specified a non blocking operation. > - */ > - timed_out = 1; > - spin_lock_irqsave(&ep->lock, flags); > - goto check_events; > } > > -fetch_events: > - spin_lock_irqsave(&ep->lock, flags); > + if (!ep_events_available(ep) && timeout) { > +wait_queue_loop: > > - if (!ep_events_available(ep)) { > /* > * We don't have any available event to return to the caller. > * We need to sleep here, and we will be wake up by > @@ -1593,37 +1545,38 @@ fetch_events: > * to TASK_INTERRUPTIBLE before doing the checks. > */ > set_current_state(TASK_INTERRUPTIBLE); > - if (ep_events_available(ep) || timed_out) > + if (ep_events_available(ep) || !timeout) > break; > if (signal_pending(current)) { > res = -EINTR; > break; > } > > - spin_unlock_irqrestore(&ep->lock, flags); > if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) > - timed_out = 1; > - > - spin_lock_irqsave(&ep->lock, flags); > + timeout = 0; > } > + > + spin_lock_irqsave(&ep->wqlock, flags); > __remove_wait_queue(&ep->wq, &wait); > + spin_unlock_irqrestore(&ep->wqlock, flags); > > set_current_state(TASK_RUNNING); > } > -check_events: > - /* Is it worth to try to dig for events ? 
*/ > - eavail = ep_events_available(ep); > - > - spin_unlock_irqrestore(&ep->lock, flags); > > /* > * Try to transfer events to user space. In case we get 0 events and > * there's still timeout left over, we go trying again in search of > * more luck. > */ > - if (!res && eavail && > - !(res = ep_send_events(ep, events, maxevents)) && !timed_out) > - goto fetch_events; > + if (!res) { > + res = ep_send_events(ep, events, maxevents); > + if (!res && timeout) > + goto wait_queue_loop; > + } > + > + /* we may not have transferred everything, wake up others */ > + if (ep_events_available(ep)) > + ep_notify_waiters(ep); > > return res; > } > -- > Eric Wong

-- 
Mathieu Desnoyers
EfficiOS Inc.
http://www.efficios.com
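
P.S.: to make the two suggestions above a little more concrete, here are two untested sketches.

1) ____cacheline_aligned_in_smp on rdltail: the point is simply that the tail is dirtied by the enqueuers (ep_poll_callback) while the head is dirtied by the dequeuing side (ep_send_events), so keeping them on separate cache lines avoids false sharing between the two sides. Field names below are from your patch, everything unrelated is elided:

        #include <linux/cache.h>        /* ____cacheline_aligned_in_smp */
        #include <linux/wfcqueue.h>     /* wfcq_head/wfcq_tail from your series */

        struct eventpoll {
                /* head of the ready list, touched by the dequeuing side */
                struct wfcq_head rdlhead;

                /* ... mtx, wqlock, wq, poll_wait, rbr, ws, user, ... */

                /*
                 * The tail is touched by enqueuers (ep_poll_callback); give
                 * it its own cache line so it does not false-share with
                 * rdlhead or the fields above.
                 */
                struct wfcq_tail rdltail ____cacheline_aligned_in_smp;
        };

This only aligns rdltail itself, at the cost of some padding in struct eventpoll; whether the extra size is worth it is something your benchmarks will have to show.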
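
2) Iterating on a local list populated with splice: the shape I have in mind for ep_send_events is roughly the one below. It is only a sketch of the iteration structure; I am assuming kernel counterparts of the urcu helpers, i.e. a __wfcq_splice(dst_head, dst_tail, src_head, src_tail) that moves all queued nodes in one go and a __wfcq_dequeue_nonblocking() that returns the dequeued node, and I am glossing over the EP_STATE_* transitions, zombie reaping and the EPOLLONESHOT/level-triggered handling from your patch:

        static int ep_send_events(struct eventpoll *ep,
                                  struct epoll_event __user *uevent, int maxevents)
        {
                struct wfcq_head lhead;         /* local queue, private to this call */
                struct wfcq_tail ltail;
                struct wfcq_node *node;
                struct epitem *epi;
                int eventcnt = 0;

                wfcq_init(&lhead, &ltail);

                mutex_lock(&ep->mtx);

                /*
                 * Grab everything that is currently ready in one operation.
                 * ep_poll_callback can keep enqueueing onto ep->rdlhead/rdltail
                 * meanwhile without disturbing our iteration.
                 */
                __wfcq_splice(&lhead, &ltail, &ep->rdlhead, &ep->rdltail);

                while (eventcnt < maxevents &&
                       (node = __wfcq_dequeue_nonblocking(&lhead, &ltail)) != NULL) {
                        epi = ep_item_from_node(node);

                        /*
                         * ... epi->state handling, ep_item_poll(),
                         * __put_user() to uevent, EPOLLONESHOT and
                         * level-trigger re-enqueue as in your patch ...
                         */
                        eventcnt++;
                }

                /* return whatever we did not consume to the shared queue */
                __wfcq_splice(&ep->rdlhead, &ep->rdltail, &lhead, &ltail);

                mutex_unlock(&ep->mtx);

                return eventcnt;
        }

The splice back at the end does change ordering with respect to events queued while we were copying, and it has to be reconciled with the EP_STATE_DEQUEUE/EP_STATE_IDLE transitions, so treat this purely as an illustration of the local-iteration idea.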