On Wed, Nov 22, 2023 at 06:42:53PM -0500, Kent Overstreet wrote:
> Unclear who's maintaining fs/aio.c these days - who wants to take this?
> -- >8 --
>
> I've been observing workloads where IPIs due to wakeups in
> aio_complete() are ~15% of total CPU time in the profile. Most of those
> wakeups are unnecessary when completion batching is in use in
> io_getevents().
>
> This plumbs min_nr through via the wait entry, so that aio_complete()
> can avoid doing unnecessary wakeups.
>
> Signed-off-by: Kent Overstreet <kent.overstreet@xxxxxxxxx>
> Cc: Benjamin LaHaise <bcrl@xxxxxxxxx>
> Cc: Christian Brauner <brauner@xxxxxxxxxx>
> Cc: linux-aio@xxxxxxxxx
> Cc: linux-fsdevel@xxxxxxxxxxxxxxx
> ---
>  fs/aio.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 56 insertions(+), 10 deletions(-)
>
> diff --git a/fs/aio.c b/fs/aio.c
> index f8589caef9c1..c69e7caacd1b 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -1106,6 +1106,11 @@ static inline void iocb_destroy(struct aio_kiocb *iocb)
>  	kmem_cache_free(kiocb_cachep, iocb);
>  }
>
> +struct aio_waiter {
> +	struct wait_queue_entry	w;
> +	size_t			min_nr;
> +};
> +
>  /* aio_complete
>   *	Called when the io request on the given iocb is complete.
>   */
> @@ -1114,7 +1119,7 @@ static void aio_complete(struct aio_kiocb *iocb)
>  	struct kioctx	*ctx = iocb->ki_ctx;
>  	struct aio_ring	*ring;
>  	struct io_event	*ev_page, *event;
> -	unsigned tail, pos, head;
> +	unsigned tail, pos, head, avail;
>  	unsigned long	flags;
>
>  	/*
> @@ -1156,6 +1161,10 @@ static void aio_complete(struct aio_kiocb *iocb)
>  	ctx->completed_events++;
>  	if (ctx->completed_events > 1)
>  		refill_reqs_available(ctx, head, tail);
> +
> +	avail = tail > head
> +		? tail - head
> +		: tail + ctx->nr_events - head;
>  	spin_unlock_irqrestore(&ctx->completion_lock, flags);
>
>  	pr_debug("added to ring %p at [%u]\n", iocb, tail);
> @@ -1176,8 +1185,18 @@
>  	 */
>  	smp_mb();
>
> -	if (waitqueue_active(&ctx->wait))
> -		wake_up(&ctx->wait);
> +	if (waitqueue_active(&ctx->wait)) {
> +		struct aio_waiter *curr, *next;
> +		unsigned long flags;
> +
> +		spin_lock_irqsave(&ctx->wait.lock, flags);
> +		list_for_each_entry_safe(curr, next, &ctx->wait.head, w.entry)
> +			if (avail >= curr->min_nr) {
> +				list_del_init_careful(&curr->w.entry);
> +				wake_up_process(curr->w.private);
> +			}
> +		spin_unlock_irqrestore(&ctx->wait.lock, flags);
> +	}
>  }
>
>  static inline void iocb_put(struct aio_kiocb *iocb)
> @@ -1290,7 +1309,9 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
>  			struct io_event __user *event,
>  			ktime_t until)
>  {
> -	long ret = 0;
> +	struct hrtimer_sleeper	t;
> +	struct aio_waiter	w;
> +	long ret = 0, ret2 = 0;
>
>  	/*
>  	 * Note that aio_read_events() is being called as the conditional - i.e.
> @@ -1306,12 +1327,37 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
>  	 * the ringbuffer empty. So in practice we should be ok, but it's
>  	 * something to be aware of when touching this code.
>  	 */
> -	if (until == 0)
> -		aio_read_events(ctx, min_nr, nr, event, &ret);
> -	else
> -		wait_event_interruptible_hrtimeout(ctx->wait,
> -				aio_read_events(ctx, min_nr, nr, event, &ret),
> -				until);
> +	aio_read_events(ctx, min_nr, nr, event, &ret);
> +	if (until == 0 || ret < 0 || ret >= min_nr)
> +		return ret;
> +
> +	hrtimer_init_sleeper_on_stack(&t, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
> +	if (until != KTIME_MAX) {
> +		hrtimer_set_expires_range_ns(&t.timer, until, current->timer_slack_ns);
> +		hrtimer_sleeper_start_expires(&t, HRTIMER_MODE_REL);
> +	}
> +
> +	init_wait(&w.w);
> +
> +	while (1) {
> +		unsigned long nr_got = ret;
> +
> +		w.min_nr = min_nr - ret;

Hm, can this underflow?

> +
> +		ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE) ?:
> +			!t.task ? -ETIME : 0;

I'd like to avoid the nested ?: as that's rather hard to read. I
_think_ this is equivalent to:

	ret2 = prepare_to_wait_event(&ctx->wait, &w.w, TASK_INTERRUPTIBLE);
	if (!ret2 && !t.task)
		ret2 = -ETIME;

I can just fix this in-tree though. Did I parse that correctly?

> +
> +		if (aio_read_events(ctx, min_nr, nr, event, &ret) || ret2)
> +			break;
> +
> +		if (nr_got == ret)
> +			schedule();
> +	}
> +
> +	finish_wait(&ctx->wait, &w.w);
> +	hrtimer_cancel(&t.timer);
> +	destroy_hrtimer_on_stack(&t.timer);
> +
>  	return ret;
>  }
>
> --
> 2.42.0
>
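
As an aside, for anyone who wants to poke at the wakeup behaviour this
changes: below is a rough, untested userspace sketch that drives the
min_nr > 1 batching path through the raw aio syscalls (no libaio). The
file path, request count and buffer sizes are all made-up placeholders,
and error cleanup is omitted.

/*
 * Untested sketch, not part of the patch: reap completions in batches
 * of MIN_NR. Expects a file of at least NR_REQS * 4k at /tmp/testfile
 * (placeholder path). O_DIRECT so reads actually complete async.
 */
#define _GNU_SOURCE
#include <fcntl.h>
#include <linux/aio_abi.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

#define NR_REQS	64
#define MIN_NR	16	/* completion batch size */

int main(void)
{
	aio_context_t ctx = 0;
	struct iocb cbs[NR_REQS], *cbps[NR_REQS];
	struct io_event events[NR_REQS];
	static char bufs[NR_REQS][4096] __attribute__((aligned(4096)));
	long reaped = 0;
	int fd = open("/tmp/testfile", O_RDONLY | O_DIRECT);

	if (fd < 0 || syscall(SYS_io_setup, NR_REQS, &ctx))
		return 1;

	for (int i = 0; i < NR_REQS; i++) {
		memset(&cbs[i], 0, sizeof(cbs[i]));
		cbs[i].aio_fildes     = fd;
		cbs[i].aio_lio_opcode = IOCB_CMD_PREAD;
		cbs[i].aio_buf        = (unsigned long)bufs[i];
		cbs[i].aio_nbytes     = sizeof(bufs[i]);
		cbs[i].aio_offset     = (long long)i * sizeof(bufs[i]);
		cbps[i] = &cbs[i];
	}

	if (syscall(SYS_io_submit, ctx, NR_REQS, cbps) != NR_REQS)
		return 1;

	/*
	 * min_nr > 1 is the case the patch optimizes: with it applied,
	 * aio_complete() leaves the reaper asleep until a whole batch
	 * (or the remainder) is available, instead of issuing a wakeup
	 * per completion. Clamp min_nr so the last call can't wait for
	 * more events than are still outstanding.
	 */
	while (reaped < NR_REQS) {
		long want = NR_REQS - reaped;
		long min  = want < MIN_NR ? want : MIN_NR;
		long n = syscall(SYS_io_getevents, ctx, min, NR_REQS,
				 events, NULL);
		if (n < 0)
			return 1;
		reaped += n;
	}

	syscall(SYS_io_destroy, ctx);
	close(fd);
	return 0;
}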