Instead of eagerly running all available local task work (tw), limit the amount of local tw done to the larger of IO_LOCAL_TW_DEFAULT_MAX (20) and wait_nr. The value of 20 is chosen as a reasonable heuristic to allow enough work batching but also keep latency down. Add a retry_llist that holds the local tw that could not be run before the limit was reached. No synchronisation is needed since it is only modified within the task context. Signed-off-by: David Wei <dw@xxxxxxxxxxx> --- include/linux/io_uring_types.h | 1 + io_uring/io_uring.c | 43 +++++++++++++++++++++++++--------- io_uring/io_uring.h | 2 +- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 593c10a02144..011860ade268 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -336,6 +336,7 @@ struct io_ring_ctx { */ struct { struct llist_head work_llist; + struct llist_head retry_llist; unsigned long check_cq; atomic_t cq_wait_nr; atomic_t cq_timeouts; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 83bf041d2648..c3a7d0197636 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -121,6 +121,7 @@ #define IO_COMPL_BATCH 32 #define IO_REQ_ALLOC_BATCH 8 +#define IO_LOCAL_TW_DEFAULT_MAX 20 struct io_defer_entry { struct list_head list; @@ -1255,6 +1256,8 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) struct llist_node *node = llist_del_all(&ctx->work_llist); __io_fallback_tw(node, false); + node = llist_del_all(&ctx->retry_llist); + __io_fallback_tw(node, false); } static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, @@ -1269,37 +1272,55 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, return false; } +static int __io_run_local_work_loop(struct llist_node **node, + struct io_tw_state *ts, + int events) +{ + while (*node) { + struct llist_node *next = (*node)->next; + struct io_kiocb *req = container_of(*node, struct io_kiocb, +
io_task_work.node); + INDIRECT_CALL_2(req->io_task_work.func, + io_poll_task_func, io_req_rw_complete, + req, ts); + *node = next; + if (--events <= 0) + break; + } + + return events; +} + static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts, int min_events) { struct llist_node *node; unsigned int loops = 0; - int ret = 0; + int ret, limit; if (WARN_ON_ONCE(ctx->submitter_task != current)) return -EEXIST; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events); again: + ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit); + if (ctx->retry_llist.first) + goto retry_done; + /* * llists are in reverse order, flip it back the right way before * running the pending items. */ node = llist_reverse_order(llist_del_all(&ctx->work_llist)); - while (node) { - struct llist_node *next = node->next; - struct io_kiocb *req = container_of(node, struct io_kiocb, - io_task_work.node); - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - req, ts); - ret++; - node = next; - } + ret = __io_run_local_work_loop(&node, ts, ret); + ctx->retry_llist.first = node; loops++; + ret = limit - ret; if (io_run_local_work_continue(ctx, ret, min_events)) goto again; +retry_done: io_submit_flush_completions(ctx); if (io_run_local_work_continue(ctx, ret, min_events)) goto again; diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 69eb3b23a5a0..12abee607e4a 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -349,7 +349,7 @@ static inline int io_run_task_work(void) static inline bool io_local_work_pending(struct io_ring_ctx *ctx) { - return !llist_empty(&ctx->work_llist); + return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist); } static inline bool io_task_work_pending(struct io_ring_ctx *ctx) -- 2.43.5