In preparation for being able to use these two elsewhere, factor out the
helpers that io_req_local_work_add() uses to do wakeup batching.

Signed-off-by: Jens Axboe <axboe@xxxxxxxxx>
---
 io_uring/io_uring.c | 24 +++---------------------
 io_uring/io_uring.h | 44 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 94af56dd5344..499255ef62c7 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -1103,7 +1103,7 @@ void tctx_task_work(struct callback_head *cb)
 static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
-	unsigned nr_wait, nr_tw, nr_tw_prev;
+	unsigned nr_tw, nr_tw_prev;
 	struct llist_node *head;
 
 	/* See comment above IO_CQ_WAKE_INIT */
@@ -1116,19 +1116,8 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
 		flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	head = READ_ONCE(ctx->work_llist.first);
 	do {
-		nr_tw_prev = 0;
-		if (head) {
-			struct io_kiocb *first_req = container_of(head,
-							struct io_kiocb,
-							io_task_work.node);
-			/*
-			 * Might be executed at any moment, rely on
-			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
-			 */
-			nr_tw_prev = READ_ONCE(first_req->nr_tw);
-		}
+		head = io_defer_tw_count(ctx, &nr_tw_prev);
 
 		/*
 		 * Theoretically, it can overflow, but that's fine as one of
@@ -1158,14 +1147,7 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 			io_eventfd_signal(ctx);
 	}
 
-	nr_wait = atomic_read(&ctx->cq_wait_nr);
-	/* not enough or no one is waiting */
-	if (nr_tw < nr_wait)
-		return;
-	/* the previous add has already woken it up */
-	if (nr_tw_prev >= nr_wait)
-		return;
-	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
+	io_defer_wake(ctx, nr_tw, nr_tw_prev);
 }
 
 static void io_req_normal_work_add(struct io_kiocb *req)
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index cd43924eed04..fdcf1a2a6b8a 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -444,4 +444,48 @@ static inline bool io_has_work(struct io_ring_ctx *ctx)
 	return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
 		!llist_empty(&ctx->work_llist);
 }
+
+/*
+ * Return first request nr_tw field. Only applicable for users of
+ * ctx->work_llist, which is DEFER_TASKRUN. Must be called with the RCU read
+ * lock held. Returns the current task_work count and head of list, if any.
+ */
+static inline struct llist_node *io_defer_tw_count(struct io_ring_ctx *ctx,
+						   unsigned *nr_tw_prev)
+{
+	struct llist_node *head = READ_ONCE(ctx->work_llist.first);
+
+	*nr_tw_prev = 0;
+	if (head) {
+		struct io_kiocb *first;
+
+		first = container_of(head, struct io_kiocb, io_task_work.node);
+		/*
+		 * Might be executed at any moment, rely on
+		 * SLAB_TYPESAFE_BY_RCU to keep it alive.
+		 */
+		*nr_tw_prev = READ_ONCE(first->nr_tw);
+	}
+
+	return head;
+}
+
+static inline void io_defer_wake(struct io_ring_ctx *ctx, unsigned nr_tw,
+				 unsigned nr_tw_prev)
+{
+	struct task_struct *task = READ_ONCE(ctx->submitter_task);
+	unsigned nr_wait;
+
+	/* add pending overflows, for MSG_RING */
+	nr_tw += READ_ONCE(ctx->nr_overflow);
+
+	nr_wait = atomic_read(&ctx->cq_wait_nr);
+	/* not enough or no one is waiting */
+	if (nr_tw < nr_wait)
+		return;
+	/* the previous add has already woken it up */
+	if (nr_tw_prev >= nr_wait)
+		return;
+	wake_up_state(task, TASK_INTERRUPTIBLE);
+}
 #endif
-- 
2.43.0
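
For illustration, a prospective caller elsewhere could pair the two
factored-out helpers roughly as below. This is a sketch only: the function
name and the idea of a poster that has just queued 'nr' deferred task_work
items are hypothetical and not part of this patch.

/*
 * Hypothetical caller (illustrative sketch, not in this patch): wake the
 * DEFER_TASKRUN waiter with the same batching rules that
 * io_req_local_work_add() now uses, after queueing 'nr' items for 'ctx'.
 */
static void io_example_defer_wake(struct io_ring_ctx *ctx, unsigned nr)
{
	unsigned nr_tw_prev;

	/* io_defer_tw_count() must be called under the RCU read lock */
	rcu_read_lock();
	io_defer_tw_count(ctx, &nr_tw_prev);
	rcu_read_unlock();

	/*
	 * Wake the waiter only if the queued total now meets its batch
	 * target and a previous add did not already wake it.
	 */
	io_defer_wake(ctx, nr_tw_prev + nr, nr_tw_prev);
}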