io_post_aux_cqe(), which is used for multishot requests, delays completions by putting CQEs into a temporary array for the purpose completion lock/flush batching. DEFER_TASKRUN doesn't need any locking, so for it we can put completions directly into the CQ and defer post completion handling with a flag. That leaves !DEFER_TASKRUN, which is not that interesting / hot for multishot requests, so have conditional locking with deferred flush for them. Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- include/linux/io_uring_types.h | 3 +- io_uring/io_uring.c | 62 +++++++--------------------------- io_uring/io_uring.h | 2 +- 3 files changed, 15 insertions(+), 52 deletions(-) diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 5a2afbc93887..ea7e5488b3be 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -205,6 +205,7 @@ struct io_submit_state { bool plug_started; bool need_plug; + bool cq_flush; unsigned short submit_nr; unsigned int cqes_count; struct blk_plug plug; @@ -342,8 +343,6 @@ struct io_ring_ctx { unsigned cq_last_tm_flush; } ____cacheline_aligned_in_smp; - struct io_uring_cqe completion_cqes[16]; - spinlock_t completion_lock; /* IRQ completion list, under ->completion_lock */ diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 9a4cc46582b2..f5e2b5bef10f 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -630,6 +630,12 @@ static inline void __io_cq_lock(struct io_ring_ctx *ctx) spin_lock(&ctx->completion_lock); } +static inline void __io_cq_unlock(struct io_ring_ctx *ctx) +{ + if (!ctx->lockless_cq) + spin_unlock(&ctx->completion_lock); +} + static inline void io_cq_lock(struct io_ring_ctx *ctx) __acquires(ctx->completion_lock) { @@ -882,31 +888,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, return false; } -static void __io_flush_post_cqes(struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) -{ - struct io_submit_state *state = &ctx->submit_state; - unsigned int i; - - lockdep_assert_held(&ctx->uring_lock); - for (i = 0; i < state->cqes_count; i++) { - struct io_uring_cqe *cqe = &ctx->completion_cqes[i]; - - if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) { - if (ctx->lockless_cq) { - spin_lock(&ctx->completion_lock); - io_cqring_event_overflow(ctx, cqe->user_data, - cqe->res, cqe->flags, 0, 0); - spin_unlock(&ctx->completion_lock); - } else { - io_cqring_event_overflow(ctx, cqe->user_data, - cqe->res, cqe->flags, 0, 0); - } - } - } - state->cqes_count = 0; -} - bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags) { bool filled; @@ -927,31 +908,16 @@ bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags) { struct io_ring_ctx *ctx = req->ctx; - u64 user_data = req->cqe.user_data; - struct io_uring_cqe *cqe; + bool posted; lockdep_assert(!io_wq_current_is_worker()); lockdep_assert_held(&ctx->uring_lock); - if (ctx->submit_state.cqes_count == ARRAY_SIZE(ctx->completion_cqes)) { - __io_cq_lock(ctx); - __io_flush_post_cqes(ctx); - /* no need to flush - flush is deferred */ - __io_cq_unlock_post(ctx); - } - - /* For defered completions this is not as strict as it is otherwise, - * however it's main job is to prevent unbounded posted completions, - * and in that it works just as well. - */ - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) - return false; - - cqe = &ctx->completion_cqes[ctx->submit_state.cqes_count++]; - cqe->user_data = user_data; - cqe->res = res; - cqe->flags = cflags; - return true; + __io_cq_lock(ctx); + posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags); + ctx->submit_state.cq_flush = true; + __io_cq_unlock_post(ctx); + return posted; } static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) @@ -1545,9 +1511,6 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_wq_work_node *node; __io_cq_lock(ctx); - /* must come first to preserve CQE ordering in failure cases */ - if (state->cqes_count) - __io_flush_post_cqes(ctx); __wq_list_for_each(node, &state->compl_reqs) { struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); @@ -1569,6 +1532,7 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx) io_free_batch_list(ctx, state->compl_reqs.first); INIT_WQ_LIST(&state->compl_reqs); } + ctx->submit_state.cq_flush = false; } static unsigned io_cqring_events(struct io_ring_ctx *ctx) diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 460290e1bdec..5119265a11c2 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -156,7 +156,7 @@ static inline void io_req_task_work_add(struct io_kiocb *req) static inline void io_submit_flush_completions(struct io_ring_ctx *ctx) { if (!wq_list_empty(&ctx->submit_state.compl_reqs) || - ctx->submit_state.cqes_count) + ctx->submit_state.cq_flush) __io_submit_flush_completions(ctx); } -- 2.44.0