[PATCH next v1 2/2] io_uring: limit local tw done

Instead of eagerly running all available local tw, limit the number of
local tw items run to the max of IO_LOCAL_TW_DEFAULT_MAX (20) and
wait_nr. The value of 20 is chosen as a reasonable heuristic that
allows enough batching of work while keeping latency down.

Add a retry_llist that holds the local tw that could not be run within
the limit, to be picked up again on the next pass. No synchronisation is
needed since it is only modified within the task context.
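
For reference, the scheme in miniature (a standalone userspace sketch of
the technique, not the kernel code; all names below are made up for the
example):

  #include <stddef.h>

  #define DEFAULT_MAX 20

  struct work {
          struct work *next;
  };

  /*
   * Run at most 'budget' items starting at *head, advancing *head past
   * each completed item. Returns the unused budget; *head is non-NULL
   * on return iff the budget ran out with work left over.
   */
  static int run_bounded(struct work **head, int budget)
  {
          while (*head && budget > 0) {
                  struct work *next = (*head)->next;

                  /* ... process *head ... */
                  *head = next;
                  budget--;
          }
          return budget;
  }

  static void run_local_work(struct work **retry, struct work **pending,
                             int wait_nr)
  {
          int budget = wait_nr > DEFAULT_MAX ? wait_nr : DEFAULT_MAX;

          /* leftovers from the previous pass go first */
          budget = run_bounded(retry, budget);
          if (*retry)
                  return;         /* budget exhausted, retry later */

          /* then freshly queued work, stashing any remainder */
          run_bounded(pending, budget);
          *retry = *pending;
          *pending = NULL;
  }

The real patch additionally has to reverse the freshly stolen llist
(llists push at the head, so batches come off in LIFO order) and keeps
the existing min_events retry loop on top of the per-pass budget.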

Signed-off-by: David Wei <dw@xxxxxxxxxxx>
---
 include/linux/io_uring_types.h |  1 +
 io_uring/io_uring.c            | 43 +++++++++++++++++++++++++---------
 io_uring/io_uring.h            |  2 +-
 3 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 593c10a02144..011860ade268 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -336,6 +336,7 @@ struct io_ring_ctx {
 	 */
 	struct {
 		struct llist_head	work_llist;
+		struct llist_head	retry_llist;
 		unsigned long		check_cq;
 		atomic_t		cq_wait_nr;
 		atomic_t		cq_timeouts;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 83bf041d2648..c3a7d0197636 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -121,6 +121,7 @@
 
 #define IO_COMPL_BATCH			32
 #define IO_REQ_ALLOC_BATCH		8
+#define IO_LOCAL_TW_DEFAULT_MAX		20
 
 struct io_defer_entry {
 	struct list_head	list;
@@ -1255,6 +1256,8 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 	struct llist_node *node = llist_del_all(&ctx->work_llist);
 
 	__io_fallback_tw(node, false);
+	node = llist_del_all(&ctx->retry_llist);
+	__io_fallback_tw(node, false);
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1269,37 +1272,55 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 	return false;
 }
 
+static int __io_run_local_work_loop(struct llist_node **node,
+				    struct io_tw_state *ts,
+				    int events)
+{
+	while (*node) {
+		struct llist_node *next = (*node)->next;
+		struct io_kiocb *req = container_of(*node, struct io_kiocb,
+						    io_task_work.node);
+		INDIRECT_CALL_2(req->io_task_work.func,
+				io_poll_task_func, io_req_rw_complete,
+				req, ts);
+		*node = next;
+		if (--events <= 0)
+			break;
+	}
+
+	return events;
+}
+
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 			       int min_events)
 {
 	struct llist_node *node;
 	unsigned int loops = 0;
-	int ret = 0;
+	int ret, limit;
 
 	if (WARN_ON_ONCE(ctx->submitter_task != current))
 		return -EEXIST;
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
 again:
+	ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
+	if (ctx->retry_llist.first)
+		goto retry_done;
+
 	/*
 	 * llists are in reverse order, flip it back the right way before
 	 * running the pending items.
 	 */
 	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
-	while (node) {
-		struct llist_node *next = node->next;
-		struct io_kiocb *req = container_of(node, struct io_kiocb,
-						    io_task_work.node);
-		INDIRECT_CALL_2(req->io_task_work.func,
-				io_poll_task_func, io_req_rw_complete,
-				req, ts);
-		ret++;
-		node = next;
-	}
+	ret = __io_run_local_work_loop(&node, ts, ret);
+	ctx->retry_llist.first = node;
 	loops++;
 
+	ret = limit - ret;
 	if (io_run_local_work_continue(ctx, ret, min_events))
 		goto again;
+retry_done:
 	io_submit_flush_completions(ctx);
 	if (io_run_local_work_continue(ctx, ret, min_events))
 		goto again;
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 69eb3b23a5a0..12abee607e4a 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -349,7 +349,7 @@ static inline int io_run_task_work(void)
 
 static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 {
-	return !llist_empty(&ctx->work_llist);
+	return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
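
In terms of the standalone sketch in the commit message (illustrative
names only, not the tree's helpers), this last hunk corresponds to:

  /*
   * Work parked on the retry list must still count as pending,
   * otherwise a waiter could go to sleep while completions it is
   * waiting for sit on the retry list.
   */
  static int local_work_pending(const struct work *retry,
                                const struct work *pending)
  {
          return retry != NULL || pending != NULL;
  }

Per the commit message, retry_llist is only modified from the task
context, so the extra llist_empty() check needs no new locking.
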
-- 
2.43.5