tctx in submission part is always synchronised because is executed from the task's context, so we can batch allocate tctx/task references and store them across syscall boundaries. It avoids enough of operations, including an atomic for getting task ref and a percpu_counter_add() function call, which still fallback to spinlock for large batching cases (around >=32). Should be good for SQPOLL submitting in small portions and coming at some moment bpf submissions. Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- fs/io_uring.c | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 692c91d03054..23b15ed98815 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -110,6 +110,8 @@ IOSQE_IO_HARDLINK | IOSQE_ASYNC | \ IOSQE_BUFFER_SELECT) +#define IO_TCTX_REFS_CACHE_NR (1U << 10) + struct io_uring { u32 head ____cacheline_aligned_in_smp; u32 tail ____cacheline_aligned_in_smp; @@ -472,6 +474,7 @@ struct io_ring_ctx { struct io_uring_task { /* submission side */ + int cached_refs; struct xarray xa; struct wait_queue_head wait; const struct io_ring_ctx *last; @@ -6702,16 +6705,23 @@ static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx) static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) { + struct io_uring_task *tctx; int submitted = 0; /* make sure SQ entry isn't read before tail */ nr = min3(nr, ctx->sq_entries, io_sqring_entries(ctx)); - if (!percpu_ref_tryget_many(&ctx->refs, nr)) return -EAGAIN; - percpu_counter_add(¤t->io_uring->inflight, nr); - refcount_add(nr, ¤t->usage); + tctx = current->io_uring; + tctx->cached_refs -= nr; + if (unlikely(tctx->cached_refs < 0)) { + unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR; + + percpu_counter_add(&tctx->inflight, refill); + refcount_add(refill, ¤t->usage); + tctx->cached_refs += refill; + } io_submit_state_start(&ctx->submit_state, nr); while (submitted < nr) { @@ -6737,12 +6747,10 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) if (unlikely(submitted != nr)) { int ref_used = (submitted == -EAGAIN) ? 0 : submitted; - struct io_uring_task *tctx = current->io_uring; int unused = nr - ref_used; + current->io_uring->cached_refs += unused; percpu_ref_put_many(&ctx->refs, unused); - percpu_counter_sub(&tctx->inflight, unused); - put_task_struct_many(current, unused); } io_submit_state_end(&ctx->submit_state, ctx); @@ -7924,7 +7932,7 @@ static int io_uring_alloc_task_context(struct task_struct *task, struct io_uring_task *tctx; int ret; - tctx = kmalloc(sizeof(*tctx), GFP_KERNEL); + tctx = kzalloc(sizeof(*tctx), GFP_KERNEL); if (unlikely(!tctx)) return -ENOMEM; @@ -7944,13 +7952,11 @@ static int io_uring_alloc_task_context(struct task_struct *task, xa_init(&tctx->xa); init_waitqueue_head(&tctx->wait); - tctx->last = NULL; atomic_set(&tctx->in_idle, 0); atomic_set(&tctx->inflight_tracked, 0); task->io_uring = tctx; spin_lock_init(&tctx->task_lock); INIT_WQ_LIST(&tctx->task_list); - tctx->task_state = 0; init_task_work(&tctx->task_work, tctx_task_work); return 0; } @@ -7961,6 +7967,7 @@ void __io_uring_free(struct task_struct *tsk) WARN_ON_ONCE(!xa_empty(&tctx->xa)); WARN_ON_ONCE(tctx->io_wq); + WARN_ON_ONCE(tctx->cached_refs); percpu_counter_destroy(&tctx->inflight); kfree(tctx); @@ -9105,6 +9112,16 @@ static void io_uring_try_cancel(bool cancel_all) } } +static void io_uring_drop_tctx_refs(struct task_struct *task) +{ + struct io_uring_task *tctx = task->io_uring; + unsigned int refs = tctx->cached_refs; + + tctx->cached_refs = 0; + percpu_counter_sub(&tctx->inflight, refs); + put_task_struct_many(task, refs); +} + /* should only be called by SQPOLL task */ static void io_uring_cancel_sqpoll(struct io_sq_data *sqd) { @@ -9120,6 +9137,7 @@ static void io_uring_cancel_sqpoll(struct io_sq_data *sqd) WARN_ON_ONCE(!sqd || sqd->thread != current); + io_uring_drop_tctx_refs(current); atomic_inc(&tctx->in_idle); do { /* read completions before cancelations */ @@ -9157,6 +9175,7 @@ void __io_uring_cancel(struct files_struct *files) io_wq_exit_start(tctx->io_wq); /* make sure overflow events are dropped */ + io_uring_drop_tctx_refs(current); atomic_inc(&tctx->in_idle); do { /* read completions before cancelations */ -- 2.31.1