On Mon, May 24, 2021 at 7:51 PM Pavel Begunkov <asml.silence@xxxxxxxxx> wrote:
>
> tctx in the submission path is always synchronised because it is
> executed from the task's context, so we can batch allocate tctx/task
> references and store them across syscall boundaries. This avoids a fair
> amount of work, including an atomic for getting a task ref and a
> percpu_counter_add() function call, which still falls back to a spinlock
> for large batching cases (around >=32). Should be good for SQPOLL
> submitting in small portions and for bpf submissions coming at some
> point.
>
> Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx>
> ---
>  fs/io_uring.c | 37 ++++++++++++++++++++++++++++---------
>  1 file changed, 28 insertions(+), 9 deletions(-)
>
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index 2b2d70a58a87..a95d55a0f9be 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -110,6 +110,8 @@
>                                 IOSQE_IO_HARDLINK | IOSQE_ASYNC | \
>                                 IOSQE_BUFFER_SELECT)
>
> +#define IO_TCTX_REFS_CACHE_NR   (1U << 10)
> +
>  struct io_uring {
>         u32 head ____cacheline_aligned_in_smp;
>         u32 tail ____cacheline_aligned_in_smp;
> @@ -472,6 +474,7 @@ struct io_ring_ctx {
>
>  struct io_uring_task {
>         /* submission side */
> +       int                     cached_refs;
>         struct xarray           xa;
>         struct wait_queue_head  wait;
>         const struct io_ring_ctx *last;
> @@ -6702,16 +6705,23 @@ static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
>
>  static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
>  {
> +       struct io_uring_task *tctx;
>         int submitted = 0;
>
>         /* make sure SQ entry isn't read before tail */
>         nr = min3(nr, ctx->sq_entries, io_sqring_entries(ctx));
> -
>         if (!percpu_ref_tryget_many(&ctx->refs, nr))
>                 return -EAGAIN;
>
> -       percpu_counter_add(&current->io_uring->inflight, nr);
> -       refcount_add(nr, &current->usage);
> +       tctx = current->io_uring;
> +       tctx->cached_refs -= nr;
> +       if (unlikely(tctx->cached_refs < 0)) {
> +               unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;

Might be clearer to use:

                unsigned int refill = IO_TCTX_REFS_CACHE_NR - tctx->cached_refs;

> +
> +               percpu_counter_add(&tctx->inflight, refill);
> +               refcount_add(refill, &current->usage);
> +               tctx->cached_refs += refill;
> +       }
>         io_submit_state_start(&ctx->submit_state, nr);
>
>         while (submitted < nr) {
> @@ -6737,12 +6747,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
>
>         if (unlikely(submitted != nr)) {
>                 int ref_used = (submitted == -EAGAIN) ? 0 : submitted;
> -               struct io_uring_task *tctx = current->io_uring;
>                 int unused = nr - ref_used;
>
> +               current->io_uring->cached_refs += unused;
>                 percpu_ref_put_many(&ctx->refs, unused);
> -               percpu_counter_sub(&tctx->inflight, unused);
> -               put_task_struct_many(current, unused);
>         }
>
>         io_submit_state_end(&ctx->submit_state, ctx);
> @@ -7924,7 +7932,7 @@ static int io_uring_alloc_task_context(struct task_struct *task,
>         struct io_uring_task *tctx;
>         int ret;
>
> -       tctx = kmalloc(sizeof(*tctx), GFP_KERNEL);
> +       tctx = kzalloc(sizeof(*tctx), GFP_KERNEL);
>         if (unlikely(!tctx))
>                 return -ENOMEM;
>
> @@ -7944,13 +7952,11 @@ static int io_uring_alloc_task_context(struct task_struct *task,
>
>         xa_init(&tctx->xa);
>         init_waitqueue_head(&tctx->wait);
> -       tctx->last = NULL;
>         atomic_set(&tctx->in_idle, 0);
>         atomic_set(&tctx->inflight_tracked, 0);
>         task->io_uring = tctx;
>         spin_lock_init(&tctx->task_lock);
>         INIT_WQ_LIST(&tctx->task_list);
> -       tctx->task_state = 0;
>         init_task_work(&tctx->task_work, tctx_task_work);
>         return 0;
>  }
> @@ -7961,6 +7967,7 @@ void __io_uring_free(struct task_struct *tsk)
>
>         WARN_ON_ONCE(!xa_empty(&tctx->xa));
>         WARN_ON_ONCE(tctx->io_wq);
> +       WARN_ON_ONCE(tctx->cached_refs);
>
>         percpu_counter_destroy(&tctx->inflight);
>         kfree(tctx);
> @@ -9097,6 +9104,16 @@ static void io_uring_try_cancel(bool cancel_all)
>         }
>  }
>
> +static void io_uring_drop_tctx_refs(struct task_struct *task)
> +{
> +       struct io_uring_task *tctx = task->io_uring;
> +       unsigned int refs = tctx->cached_refs;
> +
> +       tctx->cached_refs = 0;
> +       percpu_counter_sub(&tctx->inflight, refs);
> +       put_task_struct_many(task, refs);
> +}
> +
>  /* should only be called by SQPOLL task */
>  static void io_uring_cancel_sqpoll(struct io_sq_data *sqd)
>  {
> @@ -9112,6 +9129,7 @@ static void io_uring_cancel_sqpoll(struct io_sq_data *sqd)
>
>         WARN_ON_ONCE(!sqd || sqd->thread != current);
>
> +       io_uring_drop_tctx_refs(current);
>         atomic_inc(&tctx->in_idle);
>         do {
>                 /* read completions before cancelations */
> @@ -9149,6 +9167,7 @@ void __io_uring_cancel(struct files_struct *files)
>                 io_wq_exit_start(tctx->io_wq);
>
>         /* make sure overflow events are dropped */
> +       io_uring_drop_tctx_refs(current);
>         atomic_inc(&tctx->in_idle);
>         do {
>                 /* read completions before cancelations */
> --
> 2.31.1
>
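
FWIW, for anyone following along, here is a minimal userspace sketch of
the refill-on-underflow scheme the patch introduces. It is not the kernel
code: the names and the plain counters are made up, standing in for the
percpu inflight counter and the task refcount, but it shows why the
common case becomes a single int decrement, and that the suggested
IO_TCTX_REFS_CACHE_NR - cached_refs form computes the same refill.

/*
 * Toy model of the cached-refs batching, for illustration only.
 */
#include <assert.h>
#include <stdio.h>

#define REFS_CACHE_NR   (1U << 10)

static long inflight;           /* models tctx->inflight */
static long task_refs;          /* models current->usage */
static int  cached_refs;        /* models tctx->cached_refs */

/* charge @nr submissions, refilling the cache in one big batch */
static void get_refs(unsigned int nr)
{
        cached_refs -= (int)nr;
        if (cached_refs < 0) {
                /* same value as -cached_refs + REFS_CACHE_NR */
                unsigned int refill = REFS_CACHE_NR - cached_refs;

                inflight += refill;     /* the percpu_counter_add() */
                task_refs += refill;    /* the refcount_add() */
                cached_refs += refill;
        }
}

/* give back whatever is still cached, e.g. on cancel/exit */
static void drop_cached_refs(void)
{
        inflight -= cached_refs;        /* the percpu_counter_sub() */
        task_refs -= cached_refs;       /* the put_task_struct_many() */
        cached_refs = 0;
}

int main(void)
{
        get_refs(8);    /* first submit pays for a whole batch */
        get_refs(8);    /* later small submits only touch cached_refs */
        assert(inflight == REFS_CACHE_NR + 8 && task_refs == inflight);

        drop_cached_refs();
        assert(inflight == 16 && task_refs == 16);
        printf("inflight=%ld task_refs=%ld cached=%d\n",
               inflight, task_refs, cached_refs);
        return 0;
}

So only about one submission in IO_TCTX_REFS_CACHE_NR/nr pays for the
percpu_counter_add()/refcount_add(); everything else just decrements a
plain int owned by the task, and the leftover cache is returned in one
go from the cancel/exit paths via io_uring_drop_tctx_refs().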