[RFC] io_uring: avoid ring quiesce while registering/unregistering eventfd

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Acquire completion_lock at the start of __io_uring_register before
registering/unregistering eventfd and release it at the end. Hence
all calls to io_cqring_ev_posted, which adds to the eventfd counter,
will finish before the spin_lock is acquired in io_uring_register, and
all new calls will wait till the eventfd is registered. This avoids
ring quiesce which is much more expensive than acquiring the spin_lock.

On the system tested with this patch, io_uring_register with
IORING_REGISTER_EVENTFD takes less than 1ms, compared to 15ms before.

Signed-off-by: Usama Arif <usama.arif@xxxxxxxxxxxxx>
Reviewed-by: Fam Zheng <fam.zheng@xxxxxxxxxxxxx>
---
 fs/io_uring.c | 50 ++++++++++++++++++++++++++++++++++----------------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 2e04f718319d..e75d8abd225a 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -1803,11 +1803,11 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 			   ctx->rings->sq_flags & ~IORING_SQ_CQ_OVERFLOW);
 	}
 
-	if (posted)
+	if (posted) {
 		io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-	if (posted)
 		io_cqring_ev_posted(ctx);
+	}
+	spin_unlock(&ctx->completion_lock);
 	return all_flushed;
 }
 
@@ -1971,8 +1971,8 @@ static void io_req_complete_post(struct io_kiocb *req, s32 res,
 	spin_lock(&ctx->completion_lock);
 	__io_req_complete_post(req, res, cflags);
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
+	spin_unlock(&ctx->completion_lock);
 }
 
 static inline void io_req_complete_state(struct io_kiocb *req, s32 res,
@@ -2231,11 +2231,11 @@ static void __io_req_find_next_prep(struct io_kiocb *req)
 
 	spin_lock(&ctx->completion_lock);
 	posted = io_disarm_next(req);
-	if (posted)
+	if (posted) {
 		io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-	if (posted)
 		io_cqring_ev_posted(ctx);
+	}
+	spin_unlock(&ctx->completion_lock);
 }
 
 static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
@@ -2272,8 +2272,8 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
 static inline void ctx_commit_and_unlock(struct io_ring_ctx *ctx)
 {
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
+	spin_unlock(&ctx->completion_lock);
 }
 
 static void handle_prev_tw_list(struct io_wq_work_node *node,
@@ -2535,8 +2535,8 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 		}
 
 		io_commit_cqring(ctx);
-		spin_unlock(&ctx->completion_lock);
 		io_cqring_ev_posted(ctx);
+		spin_unlock(&ctx->completion_lock);
 		state->flush_cqes = false;
 	}
 
@@ -5541,10 +5541,12 @@ static int io_poll_check_events(struct io_kiocb *req)
 			filled = io_fill_cqe_aux(ctx, req->user_data, mask,
 						 IORING_CQE_F_MORE);
 			io_commit_cqring(ctx);
-			spin_unlock(&ctx->completion_lock);
-			if (unlikely(!filled))
+			if (unlikely(!filled)) {
+				spin_unlock(&ctx->completion_lock);
 				return -ECANCELED;
+			}
 			io_cqring_ev_posted(ctx);
+			spin_unlock(&ctx->completion_lock);
 		} else if (req->result) {
 			return 0;
 		}
@@ -5579,8 +5581,8 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
 	hash_del(&req->hash_node);
 	__io_req_complete_post(req, req->result, 0);
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
+	spin_unlock(&ctx->completion_lock);
 }
 
 static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
@@ -8351,8 +8353,8 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 			spin_lock(&ctx->completion_lock);
 			io_fill_cqe_aux(ctx, prsrc->tag, 0, 0);
 			io_commit_cqring(ctx);
-			spin_unlock(&ctx->completion_lock);
 			io_cqring_ev_posted(ctx);
+			spin_unlock(&ctx->completion_lock);
 			io_ring_submit_unlock(ctx, lock_ring);
 		}
 
@@ -9639,11 +9641,11 @@ static __cold bool io_kill_timeouts(struct io_ring_ctx *ctx,
 		}
 	}
 	spin_unlock_irq(&ctx->timeout_lock);
-	if (canceled != 0)
+	if (canceled != 0) {
 		io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-	if (canceled != 0)
 		io_cqring_ev_posted(ctx);
+	}
+	spin_unlock(&ctx->completion_lock);
 	return canceled != 0;
 }
 
@@ -10970,6 +10972,8 @@ static bool io_register_op_must_quiesce(int op)
 	case IORING_REGISTER_IOWQ_AFF:
 	case IORING_UNREGISTER_IOWQ_AFF:
 	case IORING_REGISTER_IOWQ_MAX_WORKERS:
+	case IORING_REGISTER_EVENTFD:
+	case IORING_UNREGISTER_EVENTFD:
 		return false;
 	default:
 		return true;
@@ -11030,6 +11034,17 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			return -EACCES;
 	}
 
+	/*
+	 * Acquire completion_lock at the start of __io_uring_register before
+	 * registering/unregistering eventfd and release it at the end. Any
+	 * completion events pending before this call will finish before acquiring
+	 * the spin_lock here, and all new completion events will wait till the
+	 * eventfd is registered. This avoids ring quiesce which is much more
+	 * expensive than acquiring spin_lock.
+	 */
+	if (opcode == IORING_REGISTER_EVENTFD || opcode == IORING_UNREGISTER_EVENTFD)
+		spin_lock(&ctx->completion_lock);
+
 	if (io_register_op_must_quiesce(opcode)) {
 		ret = io_ctx_quiesce(ctx);
 		if (ret)
@@ -11141,6 +11156,9 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 		break;
 	}
 
+	if (opcode == IORING_REGISTER_EVENTFD || opcode == IORING_UNREGISTER_EVENTFD)
+		spin_unlock(&ctx->completion_lock);
+
 	if (io_register_op_must_quiesce(opcode)) {
 		/* bring the ctx back to life */
 		percpu_ref_reinit(&ctx->refs);
-- 
2.25.1




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux