Add a spinlock for the list, and replace the lockless llist with the
work list instead. This avoids needing to reverse items in the list
before running them, as the io_wq_work_list is FIFO by nature whereas
the llist is LIFO.

Signed-off-by: Jens Axboe <axboe@xxxxxxxxx>
---
 include/linux/io_uring_types.h |  13 ++-
 io_uring/io_uring.c            | 194 ++++++++++++++++-----------------
 io_uring/io_uring.h            |   2 +-
 3 files changed, 104 insertions(+), 105 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index 011860ade268..e9ba99cb0ed0 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -335,8 +335,9 @@ struct io_ring_ctx {
 	 * regularly bounce b/w CPUs.
 	 */
 	struct {
-		struct llist_head	work_llist;
-		struct llist_head	retry_llist;
+		struct io_wq_work_list	work_list;
+		spinlock_t		work_lock;
+		int			work_items;
 		unsigned long		check_cq;
 		atomic_t		cq_wait_nr;
 		atomic_t		cq_timeouts;
@@ -566,7 +567,11 @@ enum {
 typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
 
 struct io_task_work {
-	struct llist_node		node;
+	/* DEFER_TASKRUN uses work_node, regular task_work node */
+	union {
+		struct io_wq_work_node	work_node;
+		struct llist_node	node;
+	};
 	io_req_tw_func_t		func;
 };
 
@@ -622,8 +627,6 @@ struct io_kiocb {
 	 */
 	u16				buf_index;
 
-	unsigned			nr_tw;
-
 	/* REQ_F_* flags */
 	io_req_flags_t			flags;
 
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index c3a7d0197636..b7eb962e9872 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -339,7 +339,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->defer_list);
 	INIT_LIST_HEAD(&ctx->timeout_list);
 	INIT_LIST_HEAD(&ctx->ltimeout_list);
-	init_llist_head(&ctx->work_llist);
+	INIT_WQ_LIST(&ctx->work_list);
+	spin_lock_init(&ctx->work_lock);
 	INIT_LIST_HEAD(&ctx->tctx_list);
 	ctx->submit_state.free_list.next = NULL;
 	INIT_HLIST_HEAD(&ctx->waitid_list);
@@ -1066,25 +1067,31 @@ struct llist_node *io_handle_tw_list(struct llist_node *node,
 	return node;
 }
 
-static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
+static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync,
+				    struct io_ring_ctx **last_ctx)
 {
+	if (sync && *last_ctx != req->ctx) {
+		if (*last_ctx) {
+			flush_delayed_work(&(*last_ctx)->fallback_work);
+			percpu_ref_put(&(*last_ctx)->refs);
+		}
+		*last_ctx = req->ctx;
+		percpu_ref_get(&(*last_ctx)->refs);
+	}
+	if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist))
+		schedule_delayed_work(&req->ctx->fallback_work, 1);
+}
+
+static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+{
+	struct llist_node *node = llist_del_all(&tctx->task_list);
 	struct io_ring_ctx *last_ctx = NULL;
 	struct io_kiocb *req;
 
 	while (node) {
 		req = container_of(node, struct io_kiocb, io_task_work.node);
 		node = node->next;
-		if (sync && last_ctx != req->ctx) {
-			if (last_ctx) {
-				flush_delayed_work(&last_ctx->fallback_work);
-				percpu_ref_put(&last_ctx->refs);
-			}
-			last_ctx = req->ctx;
-			percpu_ref_get(&last_ctx->refs);
-		}
-		if (llist_add(&req->io_task_work.node,
-			      &req->ctx->fallback_llist))
-			schedule_delayed_work(&req->ctx->fallback_work, 1);
+		__io_fallback_tw(req, sync, &last_ctx);
 	}
 
 	if (last_ctx) {
@@ -1093,13 +1100,6 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 	}
 }
 
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
-{
-	struct llist_node *node = llist_del_all(&tctx->task_list);
-
-	__io_fallback_tw(node, sync);
-}
-
 struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
 				      unsigned int max_entries,
 				      unsigned int *count)
@@ -1139,65 +1139,45 @@ void tctx_task_work(struct callback_head *cb)
 
 static inline void io_req_local_work_add(struct io_kiocb *req,
 					 struct io_ring_ctx *ctx,
-					 unsigned flags)
+					 unsigned tw_flags)
 {
-	unsigned nr_wait, nr_tw, nr_tw_prev;
-	struct llist_node *head;
+	unsigned nr_tw, nr_tw_prev, nr_wait;
+	unsigned long flags;
 
 	/* See comment above IO_CQ_WAKE_INIT */
 	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
 
 	/*
-	 * We don't know how many reuqests is there in the link and whether
-	 * they can even be queued lazily, fall back to non-lazy.
+	 * We don't know how many requests are in the link and whether they can
+	 * even be queued lazily, fall back to non-lazy.
 	 */
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
-		flags &= ~IOU_F_TWQ_LAZY_WAKE;
+		tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	guard(rcu)();
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+	nr_tw_prev = ctx->work_items++;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
 
-	head = READ_ONCE(ctx->work_llist.first);
-	do {
-		nr_tw_prev = 0;
-		if (head) {
-			struct io_kiocb *first_req = container_of(head,
-							struct io_kiocb,
-							io_task_work.node);
-			/*
-			 * Might be executed at any moment, rely on
-			 * SLAB_TYPESAFE_BY_RCU to keep it alive.
-			 */
-			nr_tw_prev = READ_ONCE(first_req->nr_tw);
-		}
-
-		/*
-		 * Theoretically, it can overflow, but that's fine as one of
-		 * previous adds should've tried to wake the task.
-		 */
-		nr_tw = nr_tw_prev + 1;
-		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-			nr_tw = IO_CQ_WAKE_FORCE;
-
-		req->nr_tw = nr_tw;
-		req->io_task_work.node.next = head;
-	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
-			      &req->io_task_work.node));
-
-	/*
-	 * cmpxchg implies a full barrier, which pairs with the barrier
-	 * in set_current_state() on the io_cqring_wait() side. It's used
-	 * to ensure that either we see updated ->cq_wait_nr, or waiters
-	 * going to sleep will observe the work added to the list, which
-	 * is similar to the wait/wawke task state sync.
-	 */
+	nr_tw = nr_tw_prev + 1;
+	if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+		nr_tw = IO_CQ_WAKE_FORCE;
 
-	if (!head) {
+	if (!nr_tw_prev) {
 		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 		if (ctx->has_evfd)
 			io_eventfd_signal(ctx);
 	}
 
+	/*
+	 * We need a barrier after unlock, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+	smp_mb();
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
 	/* not enough or no one is waiting */
 	if (nr_tw < nr_wait)
@@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
 
 static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
 {
-	struct llist_node *node = llist_del_all(&ctx->work_llist);
+	struct io_ring_ctx *last_ctx = NULL;
+	struct io_wq_work_node *node;
+	unsigned long flags;
 
-	__io_fallback_tw(node, false);
-	node = llist_del_all(&ctx->retry_llist);
-	__io_fallback_tw(node, false);
+	spin_lock_irqsave(&ctx->work_lock, flags);
+	node = ctx->work_list.first;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irqrestore(&ctx->work_lock, flags);
+
+	while (node) {
+		struct io_kiocb *req;
+
+		req = container_of(node, struct io_kiocb, io_task_work.work_node);
+		node = node->next;
+		__io_fallback_tw(req, false, &last_ctx);
+	}
+	if (last_ctx) {
+		flush_delayed_work(&last_ctx->fallback_work);
+		percpu_ref_put(&last_ctx->refs);
+	}
 }
 
 static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
@@ -1272,52 +1268,52 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
 	return false;
 }
 
-static int __io_run_local_work_loop(struct llist_node **node,
-				    struct io_tw_state *ts,
-				    int events)
-{
-	while (*node) {
-		struct llist_node *next = (*node)->next;
-		struct io_kiocb *req = container_of(*node, struct io_kiocb,
-						    io_task_work.node);
-		INDIRECT_CALL_2(req->io_task_work.func,
-				io_poll_task_func, io_req_rw_complete,
-				req, ts);
-		*node = next;
-		if (--events <= 0)
-			break;
-	}
-
-	return events;
-}
-
 static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts,
 			       int min_events)
 {
-	struct llist_node *node;
+	struct io_wq_work_node *node, *tail;
+	int ret, limit, nitems;
 	unsigned int loops = 0;
-	int ret, limit;
 
 	if (WARN_ON_ONCE(ctx->submitter_task != current))
 		return -EEXIST;
 	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
 		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
+	ret = 0;
 	limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events);
again:
-	ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit);
-	if (ctx->retry_llist.first)
+	spin_lock_irq(&ctx->work_lock);
+	node = ctx->work_list.first;
+	tail = ctx->work_list.last;
+	nitems = ctx->work_items;
+	INIT_WQ_LIST(&ctx->work_list);
+	ctx->work_items = 0;
+	spin_unlock_irq(&ctx->work_lock);
+
+	while (node) {
+		struct io_kiocb *req = container_of(node, struct io_kiocb,
+						    io_task_work.work_node);
+		node = node->next;
+		INDIRECT_CALL_2(req->io_task_work.func,
+				io_poll_task_func, io_req_rw_complete,
+				req, ts);
+		nitems--;
+		if (++ret >= limit)
+			break;
+	}
+
+	if (unlikely(node)) {
+		spin_lock_irq(&ctx->work_lock);
+		tail->next = ctx->work_list.first;
+		ctx->work_list.first = node;
+		if (!ctx->work_list.last)
+			ctx->work_list.last = tail;
+		ctx->work_items += nitems;
+		spin_unlock_irq(&ctx->work_lock);
 		goto retry_done;
+	}
 
-	/*
-	 * llists are in reverse order, flip it back the right way before
-	 * running the pending items.
-	 */
-	node = llist_reverse_order(llist_del_all(&ctx->work_llist));
-	ret = __io_run_local_work_loop(&node, ts, ret);
-	ctx->retry_llist.first = node;
 	loops++;
-
-	ret = limit - ret;
 	if (io_run_local_work_continue(ctx, ret, min_events))
 		goto again;
 retry_done:
@@ -2413,7 +2409,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
 	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
 		atomic_set(&ctx->cq_wait_nr, 1);
 		smp_mb();
-		if (!llist_empty(&ctx->work_llist))
+		if (io_local_work_pending(ctx))
 			goto out_wake;
 	}
 
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 214f9f175102..2fae27803116 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -349,7 +349,7 @@ static inline int io_run_task_work(void)
 
 static inline bool io_local_work_pending(struct io_ring_ctx *ctx)
 {
-	return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist);
+	return READ_ONCE(ctx->work_list.first);
 }
 
 static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
-- 
2.45.2
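As an aside for anyone reading along who has not stared at these list types
much: the ordering point in the commit message is easy to see in the small
standalone userspace sketch below. This is not kernel code and is not part of
the patch; the helpers (lifo_push(), lifo_reverse(), fifo_add()) are made up
for illustration, and a pthread mutex stands in for ctx->work_lock. A
single-head lockless push (the llist style) stores items newest-first, so the
consumer has to reverse the detached chain before running it in submission
order, while a head/tail list (the io_wq_work_list style) hands the chain back
already in FIFO order.

#include <stdio.h>
#include <pthread.h>

struct node {
	int seq;
	struct node *next;
};

/* LIFO stack: a single head pointer, new entries are linked in front */
static struct node *lifo_head;

static void lifo_push(struct node *n)
{
	n->next = lifo_head;
	lifo_head = n;
}

/* Flip a detached LIFO chain so it runs oldest-first */
static struct node *lifo_reverse(struct node *n)
{
	struct node *prev = NULL;

	while (n) {
		struct node *next = n->next;

		n->next = prev;
		prev = n;
		n = next;
	}
	return prev;
}

/* FIFO list: head + tail pointers, additions serialized by a lock */
static struct node *fifo_first, *fifo_last;
static pthread_mutex_t fifo_lock = PTHREAD_MUTEX_INITIALIZER;

static void fifo_add(struct node *n)
{
	pthread_mutex_lock(&fifo_lock);
	n->next = NULL;
	if (fifo_last)
		fifo_last->next = n;
	else
		fifo_first = n;
	fifo_last = n;
	pthread_mutex_unlock(&fifo_lock);
}

static void print_chain(const char *what, struct node *n)
{
	printf("%s:", what);
	for (; n; n = n->next)
		printf(" %d", n->seq);
	printf("\n");
}

int main(void)
{
	struct node a = { .seq = 1 }, b = { .seq = 2 }, c = { .seq = 3 };

	/* LIFO: comes back newest-first, needs a reversal pass to run in order */
	lifo_push(&a);
	lifo_push(&b);
	lifo_push(&c);
	print_chain("lifo as stored", lifo_head);		/* 3 2 1 */
	print_chain("lifo reversed", lifo_reverse(lifo_head));	/* 1 2 3 */

	/* FIFO: same nodes re-added; comes back in submission order, no reversal */
	fifo_add(&a);
	fifo_add(&b);
	fifo_add(&c);
	print_chain("fifo as stored", fifo_first);		/* 1 2 3 */
	return 0;
}

The trade-off the patch makes is visible in io_req_local_work_add(): the
cmpxchg loop and the per-request nr_tw bookkeeping go away, and a short
spinlock-protected tail append takes their place.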