On Thu, Nov 23, 2023 at 06:40:01PM +0100, Christian Brauner wrote: > On Wed, Nov 22, 2023 at 06:42:53PM -0500, Kent Overstreet wrote: > > Unclear who's maintaining fs/aio.c these days - who wants to take this? > > -- >8 -- > > > > I've been observing workloads where IPIs due to wakeups in > > aio_complete() are ~15% of total CPU time in the profile. Most of those > > wakeups are unnecessary when completion batching is in use in > > io_getevents(). > > > > This plumbs min_nr through via the wait eventry, so that aio_complete() > > can avoid doing unnecessary wakeups. > > > > Signed-off-by: Kent Overstreet <kent.overstreet@xxxxxxxxx> > > Cc: Benjamin LaHaise <bcrl@xxxxxxxxx > > Cc: Christian Brauner <brauner@xxxxxxxxxx> > > Cc: linux-aio@xxxxxxxxx > > Cc: linux-fsdevel@xxxxxxxxxxxxxxx > > --- > > fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++--------- > > 1 file changed, 56 insertions(+), 10 deletions(-) > > > > diff --git a/fs/aio.c b/fs/aio.c > > index f8589caef9c1..c69e7caacd1b 100644 > > --- a/fs/aio.c > > +++ b/fs/aio.c > > @@ -1106,6 +1106,11 @@ static inline void iocb_destroy(struct aio_kiocb *iocb) > > kmem_cache_free(kiocb_cachep, iocb); > > } > > > > +struct aio_waiter { > > + struct wait_queue_entry w; > > + size_t min_nr; > > +}; > > + > > /* aio_complete > > * Called when the io request on the given iocb is complete. > > */ > > @@ -1114,7 +1119,7 @@ static void aio_complete(struct aio_kiocb *iocb) > > struct kioctx *ctx = iocb->ki_ctx; > > struct aio_ring *ring; > > struct io_event *ev_page, *event; > > - unsigned tail, pos, head; > > + unsigned tail, pos, head, avail; > > unsigned long flags; > > > > /* > > @@ -1156,6 +1161,10 @@ static void aio_complete(struct aio_kiocb *iocb) > > ctx->completed_events++; > > if (ctx->completed_events > 1) > > refill_reqs_available(ctx, head, tail); > > + > > + avail = tail > head > > + ? tail - head > > + : tail + ctx->nr_events - head; > > spin_unlock_irqrestore(&ctx->completion_lock, flags); > > > > pr_debug("added to ring %p at [%u]\n", iocb, tail); > > @@ -1176,8 +1185,18 @@ static void aio_complete(struct aio_kiocb *iocb) > > */ > > smp_mb(); > > > > - if (waitqueue_active(&ctx->wait)) > > - wake_up(&ctx->wait); > > + if (waitqueue_active(&ctx->wait)) { > > + struct aio_waiter *curr, *next; > > + unsigned long flags; > > + > > + spin_lock_irqsave(&ctx->wait.lock, flags); > > + list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry) > > + if (avail >= curr->min_nr) { > > + list_del_init_careful(&curr->w.entry); > > + wake_up_process(curr->w.private); > > + } > > + spin_unlock_irqrestore(&ctx->wait.lock, flags); > > + } > > } > > > > static inline void iocb_put(struct aio_kiocb *iocb) > > @@ -1290,7 +1309,9 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr, > > struct io_event __user *event, > > ktime_t until) > > { > > - long ret = 0; > > + struct hrtimer_sleeper t; > > + struct aio_waiter w; > > + long ret = 0, ret2 = 0; > > > > /* > > * Note that aio_read_events() is being called as the conditional - i.e. > > @@ -1306,12 +1327,37 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr, > > * the ringbuffer empty. So in practice we should be ok, but it's > > * something to be aware of when touching this code. > > */ > > - if (until == 0) > > - aio_read_events(ctx, min_nr, nr, event, &ret); > > - else > > - wait_event_interruptible_hrtimeout(ctx->wait, > > - aio_read_events(ctx, min_nr, nr, event, &ret), > > - until); > > + aio_read_events(ctx, min_nr, nr, event, &ret); > > + if (until == 0 || ret < 0 || ret >= min_nr) > > + return ret; > > + > > + hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL); > > + if (until != KTIME_MAX) { > > + hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns); > > + hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL); > > + } > > + > > + init_wait(&w.w); > > + > > + while (1) { > > + unsigned long nr_got = ret; > > + > > + w.min_nr = min_nr - ret; > > Hm, can this underflow? No, because if ret >= min_nr aio_read_events() returned true, and we're done. > > + > > + ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?: > > + !t.task ? -ETIME : 0; > > I'd like to avoid the nested ?: as that's rather hard to read. > I _think_ this is equivalent to: > > if (!ret2 && !t.task) > ret = -ETIME; > > I can just fix this in-tree though. Did I parse that correctly? You did, except it needs to be ret2 = -ETIME - we don't return that to userspace.