Acquire completion_lock at the start of __io_uring_register before
registering/unregistering the eventfd and release it at the end. Hence
all calls to io_cqring_ev_posted, which add to the eventfd counter,
will finish before the spin_lock is acquired in io_uring_register, and
all new calls will wait until the eventfd is registered. This avoids
ring quiesce, which is much more expensive than acquiring the
spin_lock.

On the system tested with this patch, io_uring_register with
IORING_REGISTER_EVENTFD takes less than 1ms, compared to 15ms before.

Signed-off-by: Usama Arif <usama.arif@xxxxxxxxxxxxx>
Reviewed-by: Fam Zheng <fam.zheng@xxxxxxxxxxxxx>
---
 fs/io_uring.c | 50 ++++++++++++++++++++++++++++++++++----------------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 2e04f718319d..e75d8abd225a 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -1803,11 +1803,11 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 			   ctx->rings->sq_flags & ~IORING_SQ_CQ_OVERFLOW);
 	}
 
-	if (posted)
+	if (posted) {
 		io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-	if (posted)
 		io_cqring_ev_posted(ctx);
+	}
+	spin_unlock(&ctx->completion_lock);
 
 	return all_flushed;
 }
@@ -1971,8 +1971,8 @@ static void io_req_complete_post(struct io_kiocb *req, s32 res,
 	spin_lock(&ctx->completion_lock);
 	__io_req_complete_post(req, res, cflags);
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
+	spin_unlock(&ctx->completion_lock);
 }
 
 static inline void io_req_complete_state(struct io_kiocb *req, s32 res,
@@ -2231,11 +2231,11 @@ static void __io_req_find_next_prep(struct io_kiocb *req)
 
 	spin_lock(&ctx->completion_lock);
 	posted = io_disarm_next(req);
-	if (posted)
+	if (posted) {
 		io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-	if (posted)
 		io_cqring_ev_posted(ctx);
+	}
+	spin_unlock(&ctx->completion_lock);
 }
 
 static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
@@ -2272,8 +2272,8 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
 static inline void ctx_commit_and_unlock(struct io_ring_ctx *ctx)
 {
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
+	spin_unlock(&ctx->completion_lock);
 }
 
 static void handle_prev_tw_list(struct io_wq_work_node *node,
@@ -2535,8 +2535,8 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 	}
 
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
+	spin_unlock(&ctx->completion_lock);
 	state->flush_cqes = false;
 }
 
@@ -5541,10 +5541,12 @@ static int io_poll_check_events(struct io_kiocb *req)
 			filled = io_fill_cqe_aux(ctx, req->user_data, mask,
 						 IORING_CQE_F_MORE);
 			io_commit_cqring(ctx);
-			spin_unlock(&ctx->completion_lock);
-			if (unlikely(!filled))
+			if (unlikely(!filled)) {
+				spin_unlock(&ctx->completion_lock);
 				return -ECANCELED;
+			}
 			io_cqring_ev_posted(ctx);
+			spin_unlock(&ctx->completion_lock);
 		} else if (req->result) {
 			return 0;
 		}
@@ -5579,8 +5581,8 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
 		hash_del(&req->hash_node);
 		__io_req_complete_post(req, req->result, 0);
 		io_commit_cqring(ctx);
-		spin_unlock(&ctx->completion_lock);
 		io_cqring_ev_posted(ctx);
+		spin_unlock(&ctx->completion_lock);
 }
 
 static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
@@ -8351,8 +8353,8 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 			spin_lock(&ctx->completion_lock);
 			io_fill_cqe_aux(ctx, prsrc->tag, 0, 0);
 			io_commit_cqring(ctx);
-			spin_unlock(&ctx->completion_lock);
 			io_cqring_ev_posted(ctx);
+			spin_unlock(&ctx->completion_lock);
 			io_ring_submit_unlock(ctx, lock_ring);
 		}
 
@@ -9639,11 +9641,11 @@ static __cold bool io_kill_timeouts(struct io_ring_ctx *ctx,
 		}
 	}
 	spin_unlock_irq(&ctx->timeout_lock);
-	if (canceled != 0)
+	if (canceled != 0) {
 		io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-	if (canceled != 0)
 		io_cqring_ev_posted(ctx);
+	}
+	spin_unlock(&ctx->completion_lock);
 
 	return canceled != 0;
 }
@@ -10970,6 +10972,8 @@ static bool io_register_op_must_quiesce(int op)
 	case IORING_REGISTER_IOWQ_AFF:
 	case IORING_UNREGISTER_IOWQ_AFF:
 	case IORING_REGISTER_IOWQ_MAX_WORKERS:
+	case IORING_REGISTER_EVENTFD:
+	case IORING_UNREGISTER_EVENTFD:
 		return false;
 	default:
 		return true;
@@ -11030,6 +11034,17 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			return -EACCES;
 	}
 
+	/*
+	 * Acquire completion_lock at the start of __io_uring_register before
+	 * registering/unregistering eventfd and release it at the end. Any
+	 * completion events pending before this call will finish before acquiring
+	 * the spin_lock here, and all new completion events will wait till the
+	 * eventfd is registered. This avoids ring quiesce which is much more
+	 * expensive than acquiring the spin_lock.
+	 */
+	if (opcode == IORING_REGISTER_EVENTFD || opcode == IORING_UNREGISTER_EVENTFD)
+		spin_lock(&ctx->completion_lock);
+
 	if (io_register_op_must_quiesce(opcode)) {
 		ret = io_ctx_quiesce(ctx);
 		if (ret)
@@ -11141,6 +11156,9 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 		break;
 	}
 
+	if (opcode == IORING_REGISTER_EVENTFD || opcode == IORING_UNREGISTER_EVENTFD)
+		spin_unlock(&ctx->completion_lock);
+
 	if (io_register_op_must_quiesce(opcode)) {
 		/* bring the ctx back to life */
 		percpu_ref_reinit(&ctx->refs);
-- 
2.25.1
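
Not part of the patch: for anyone who wants to reproduce the timing comparison
above, here is a minimal userspace sketch, assuming liburing is available. The
file name, the helper elapsed_us and the queue depth of 8 are illustrative
only; the liburing calls used (io_uring_queue_init, io_uring_register_eventfd,
io_uring_unregister_eventfd, io_uring_queue_exit) issue io_uring_register(2)
with IORING_REGISTER_EVENTFD/IORING_UNREGISTER_EVENTFD under the hood.

/*
 * Illustrative only, not part of the patch: time eventfd registration
 * through liburing. Build with: gcc -O2 reg_eventfd.c -luring
 */
#include <liburing.h>
#include <sys/eventfd.h>
#include <stdio.h>
#include <time.h>

/* hypothetical helper: microseconds between two timestamps */
static long long elapsed_us(const struct timespec *a, const struct timespec *b)
{
	return (b->tv_sec - a->tv_sec) * 1000000LL +
	       (b->tv_nsec - a->tv_nsec) / 1000;
}

int main(void)
{
	struct io_uring ring;
	struct timespec start, end;
	int efd, ret;

	if (io_uring_queue_init(8, &ring, 0) < 0)
		return 1;

	efd = eventfd(0, 0);
	if (efd < 0)
		return 1;

	clock_gettime(CLOCK_MONOTONIC, &start);
	/* issues io_uring_register(2) with IORING_REGISTER_EVENTFD */
	ret = io_uring_register_eventfd(&ring, efd);
	clock_gettime(CLOCK_MONOTONIC, &end);

	printf("register_eventfd: ret=%d, %lld us\n",
	       ret, elapsed_us(&start, &end));

	io_uring_unregister_eventfd(&ring);	/* IORING_UNREGISTER_EVENTFD */
	io_uring_queue_exit(&ring);
	return 0;
}

On a kernel with this patch applied, the printed time should be well under a
millisecond, since the eventfd register/unregister path no longer waits for
ring quiesce.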