With defer taskrun we store aux cqes into a cache array and then flush into the CQ, and we also maintain the ordering so aux cqes are flushed before request completions. Why do we need the cache instead of pushing them directly? We actually don't, so let's kill it. One nuance is synchronisation -- the path we touch here is only for DEFER_TASKRUN and guaranteed to be executed in the task context, and all cqe posting is serialised by that. We also don't need locks because of that, see __io_cq_lock(). Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- include/linux/io_uring_types.h | 4 +-- io_uring/io_uring.c | 55 ++++------------------------------ 2 files changed, 8 insertions(+), 51 deletions(-) diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 854ad67a5f70..35c2945cb1bf 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -204,7 +204,9 @@ struct io_submit_state { bool plug_started; bool need_plug; + bool flush_cqes; unsigned short submit_nr; + unsigned int cqes_count; struct blk_plug plug; }; @@ -338,8 +340,6 @@ struct io_ring_ctx { unsigned cq_last_tm_flush; } ____cacheline_aligned_in_smp; - struct io_uring_cqe completion_cqes[16]; - spinlock_t completion_lock; /* IRQ completion list, under ->completion_lock */ diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 86761ec623f9..07f683368855 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -173,7 +173,7 @@ static struct ctl_table kernel_io_uring_disabled_table[] = { static inline void io_submit_flush_completions(struct io_ring_ctx *ctx) { if (!wq_list_empty(&ctx->submit_state.compl_reqs) || - ctx->submit_state.cqes_count) + ctx->submit_state.flush_cqes) __io_submit_flush_completions(ctx); } @@ -886,31 +886,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, return false; } -static void __io_flush_post_cqes(struct io_ring_ctx *ctx) - __must_hold(&ctx->uring_lock) -{ - struct io_submit_state 
*state = &ctx->submit_state; - unsigned int i; - - lockdep_assert_held(&ctx->uring_lock); - for (i = 0; i < state->cqes_count; i++) { - struct io_uring_cqe *cqe = &ctx->completion_cqes[i]; - - if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) { - if (ctx->lockless_cq) { - spin_lock(&ctx->completion_lock); - io_cqring_event_overflow(ctx, cqe->user_data, - cqe->res, cqe->flags, 0, 0); - spin_unlock(&ctx->completion_lock); - } else { - io_cqring_event_overflow(ctx, cqe->user_data, - cqe->res, cqe->flags, 0, 0); - } - } - } - state->cqes_count = 0; -} - static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, bool allow_overflow) { @@ -938,32 +913,15 @@ bool io_fill_cqe_req_aux(struct io_kiocb *req, bool defer, s32 res, u32 cflags) { struct io_ring_ctx *ctx = req->ctx; u64 user_data = req->cqe.user_data; - struct io_uring_cqe *cqe; if (!defer) return __io_post_aux_cqe(ctx, user_data, res, cflags, false); lockdep_assert_held(&ctx->uring_lock); + io_lockdep_assert_cq_locked(ctx); - if (ctx->submit_state.cqes_count == ARRAY_SIZE(ctx->completion_cqes)) { - __io_cq_lock(ctx); - __io_flush_post_cqes(ctx); - /* no need to flush - flush is deferred */ - __io_cq_unlock_post(ctx); - } - - /* For defered completions this is not as strict as it is otherwise, - * however it's main job is to prevent unbounded posted completions, - * and in that it works just as well. 
- */ - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) - return false; - - cqe = &ctx->completion_cqes[ctx->submit_state.cqes_count++]; - cqe->user_data = user_data; - cqe->res = res; - cqe->flags = cflags; - return true; + ctx->submit_state.flush_cqes = true; + return io_fill_cqe_aux(ctx, user_data, res, cflags); } static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) @@ -1546,9 +1504,7 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_wq_work_node *node; __io_cq_lock(ctx); - /* must come first to preserve CQE ordering in failure cases */ - if (state->cqes_count) - __io_flush_post_cqes(ctx); + __wq_list_for_each(node, &state->compl_reqs) { struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); @@ -1570,6 +1526,7 @@ void __io_submit_flush_completions(struct io_ring_ctx *ctx) io_free_batch_list(ctx, state->compl_reqs.first); INIT_WQ_LIST(&state->compl_reqs); } + ctx->submit_state.flush_cqes = false; } static unsigned io_cqring_events(struct io_ring_ctx *ctx) -- 2.43.0