Even though we place the req_issued and req_complete in separate cachelines, there's considerable overhead in doing the atomics particularly on the completion side. Get rid of having the two counters, and just use a percpu_counter for this. That's what it was made for, after all. This considerably reduces the overhead in __io_free_req(). Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> --- fs/io_uring.c | 50 ++++++++++++++++++++++------------------ include/linux/io_uring.h | 7 ++---- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 5af6dafcc669..6ad04eaf24dd 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1810,7 +1810,7 @@ static void __io_free_req(struct io_kiocb *req) io_dismantle_req(req); - atomic_long_inc(&tctx->req_complete); + percpu_counter_dec(&tctx->inflight); if (tctx->in_idle) wake_up(&tctx->wait); put_task_struct(req->task); @@ -2089,7 +2089,9 @@ static void io_req_free_batch_finish(struct io_ring_ctx *ctx, if (rb->to_free) __io_req_free_batch_flush(ctx, rb); if (rb->task) { - atomic_long_add(rb->task_refs, &rb->task->io_uring->req_complete); + struct io_uring_task *tctx = rb->task->io_uring; + + percpu_counter_sub(&tctx->inflight, rb->task_refs); put_task_struct_many(rb->task, rb->task_refs); rb->task = NULL; } @@ -2106,7 +2108,9 @@ static void io_req_free_batch(struct req_batch *rb, struct io_kiocb *req) if (req->task != rb->task) { if (rb->task) { - atomic_long_add(rb->task_refs, &rb->task->io_uring->req_complete); + struct io_uring_task *tctx = rb->task->io_uring; + + percpu_counter_sub(&tctx->inflight, rb->task_refs); put_task_struct_many(rb->task, rb->task_refs); } rb->task = req->task; @@ -6517,7 +6521,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) if (!percpu_ref_tryget_many(&ctx->refs, nr)) return -EAGAIN; - atomic_long_add(nr, ¤t->io_uring->req_issue); + percpu_counter_add(¤t->io_uring->inflight, nr); refcount_add(nr, ¤t->usage); io_submit_state_start(&state, ctx, nr); @@ -6559,10 +6563,12 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr) if (unlikely(submitted != nr)) { int ref_used = (submitted == -EAGAIN) ? 0 : submitted; + struct io_uring_task *tctx = current->io_uring; + int unused = nr - ref_used; - percpu_ref_put_many(&ctx->refs, nr - ref_used); - atomic_long_sub(nr - ref_used, ¤t->io_uring->req_issue); - put_task_struct_many(current, nr - ref_used); + percpu_ref_put_many(&ctx->refs, unused); + percpu_counter_sub(&tctx->inflight, unused); + put_task_struct_many(current, unused); } if (link) io_queue_link_head(link, &state.comp); @@ -7680,17 +7686,22 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx, static int io_uring_alloc_task_context(struct task_struct *task) { struct io_uring_task *tctx; + int ret; tctx = kmalloc(sizeof(*tctx), GFP_KERNEL); if (unlikely(!tctx)) return -ENOMEM; + ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL); + if (unlikely(ret)) { + kfree(tctx); + return ret; + } + xa_init(&tctx->xa); init_waitqueue_head(&tctx->wait); tctx->last = NULL; tctx->in_idle = 0; - atomic_long_set(&tctx->req_issue, 0); - atomic_long_set(&tctx->req_complete, 0); io_init_identity(&tctx->__identity); tctx->identity = &tctx->__identity; task->io_uring = tctx; @@ -7705,6 +7716,7 @@ void __io_uring_free(struct task_struct *tsk) WARN_ON_ONCE(refcount_read(&tctx->identity->count) != 1); if (tctx->identity != &tctx->__identity) kfree(tctx->identity); + percpu_counter_destroy(&tctx->inflight); kfree(tctx); tsk->io_uring = NULL; } @@ -8689,12 +8701,6 @@ void __io_uring_files_cancel(struct files_struct *files) } } -static inline bool io_uring_task_idle(struct io_uring_task *tctx) -{ - return atomic_long_read(&tctx->req_issue) == - atomic_long_read(&tctx->req_complete); -} - /* * Find any io_uring fd that this task has registered or done IO on, and cancel * requests. @@ -8703,14 +8709,16 @@ void __io_uring_task_cancel(void) { struct io_uring_task *tctx = current->io_uring; DEFINE_WAIT(wait); - long completions; + s64 inflight; /* make sure overflow events are dropped */ tctx->in_idle = true; - while (!io_uring_task_idle(tctx)) { + do { /* read completions before cancelations */ - completions = atomic_long_read(&tctx->req_complete); + inflight = percpu_counter_sum(&tctx->inflight); + if (!inflight) + break; __io_uring_files_cancel(NULL); prepare_to_wait(&tctx->wait, &wait, TASK_UNINTERRUPTIBLE); @@ -8719,12 +8727,10 @@ void __io_uring_task_cancel(void) * If we've seen completions, retry. This avoids a race where * a completion comes in before we did prepare_to_wait(). */ - if (completions != atomic_long_read(&tctx->req_complete)) + if (inflight != percpu_counter_sum(&tctx->inflight)) continue; - if (io_uring_task_idle(tctx)) - break; schedule(); - } + } while (1); finish_wait(&tctx->wait, &wait); tctx->in_idle = false; diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h index 607d14f61132..28939820b6b0 100644 --- a/include/linux/io_uring.h +++ b/include/linux/io_uring.h @@ -23,13 +23,10 @@ struct io_uring_task { struct xarray xa; struct wait_queue_head wait; struct file *last; - atomic_long_t req_issue; + struct percpu_counter inflight; struct io_identity __identity; struct io_identity *identity; - - /* completion side */ - bool in_idle ____cacheline_aligned_in_smp; - atomic_long_t req_complete; + bool in_idle; }; #if defined(CONFIG_IO_URING) -- 2.28.0