On 8/6/21 9:19 PM, Jens Axboe wrote: > Currently we only wake the first waiter, even if we have enough entries > posted to satisfy multiple waiters. Improve that situation so that > every waiter knows how much the CQ tail has to advance before they can > be safely woken up. > > With this change, if we have N waiters each asking for 1 event and we get > 4 completions, then we wake up 4 waiters. If we have N waiters asking > for 2 completions and we get 4 completions, then we wake up the first > two. Previously, only the first waiter would've been woken up. > > Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> > > --- > > diff --git a/fs/io_uring.c b/fs/io_uring.c > index bf548af0426c..04df4fa3c75e 100644 > --- a/fs/io_uring.c > +++ b/fs/io_uring.c > @@ -1435,11 +1435,13 @@ static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx) > > static void io_cqring_ev_posted(struct io_ring_ctx *ctx) > { > - /* see waitqueue_active() comment */ > - smp_mb(); > - > - if (waitqueue_active(&ctx->cq_wait)) > - wake_up(&ctx->cq_wait); > + /* > + * wake_up_all() may seem excessive, but io_wake_function() and > + * io_should_wake() handle the termination of the loop and only > + * wake as many waiters as we need to. > + */ > + if (wq_has_sleeper(&ctx->cq_wait)) > + wake_up_all(&ctx->cq_wait); > if (ctx->sq_data && waitqueue_active(&ctx->sq_data->wait)) > wake_up(&ctx->sq_data->wait); > if (io_should_trigger_evfd(ctx)) > @@ -6968,20 +6970,21 @@ static int io_sq_thread(void *data) > struct io_wait_queue { > struct wait_queue_entry wq; > struct io_ring_ctx *ctx; > - unsigned to_wait; > + unsigned cq_tail; > unsigned nr_timeouts; > }; > > static inline bool io_should_wake(struct io_wait_queue *iowq) > { > struct io_ring_ctx *ctx = iowq->ctx; > + unsigned tail = ctx->cached_cq_tail + atomic_read(&ctx->cq_timeouts); It seems that adding cq_timeouts can be dropped both from here and from iowq.cq_tail. > > /* > * Wake up if we have enough events, or if a timeout occurred since we > * started waiting. 
For timeouts, we always want to return to userspace, > * regardless of event count. > */ > - return io_cqring_events(ctx) >= iowq->to_wait || Don't we miss the smp_rmb() previously provided by io_cqring_events()? > + return tail >= iowq->cq_tail || The tails might overflow (wrap around), which would break this plain >= comparison. > atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts; > } > > @@ -7045,7 +7048,6 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, > .entry = LIST_HEAD_INIT(iowq.wq.entry), > }, > .ctx = ctx, > - .to_wait = min_events, > }; > struct io_rings *rings = ctx->rings; > signed long timeout = MAX_SCHEDULE_TIMEOUT; > @@ -7081,6 +7083,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, > } > > iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); > + iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events + > + iowq.nr_timeouts; > trace_io_uring_cqring_wait(ctx, min_events); > do { > /* if we can't even flush overflow, don't wait for more */ > -- Pavel Begunkov