On 11/22/24 16:12, Jens Axboe wrote:
...
static inline void io_req_local_work_add(struct io_kiocb *req,
struct io_ring_ctx *ctx,
- unsigned flags)
+ unsigned tw_flags)
{
- unsigned nr_wait, nr_tw, nr_tw_prev;
- struct llist_node *head;
+ unsigned nr_tw, nr_tw_prev, nr_wait;
+ unsigned long flags;
/* See comment above IO_CQ_WAKE_INIT */
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
/*
- * We don't know how many reuqests is there in the link and whether
- * they can even be queued lazily, fall back to non-lazy.
+ * We don't know how many requests are in the link and whether they can
+ * even be queued lazily, fall back to non-lazy.
*/
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
- flags &= ~IOU_F_TWQ_LAZY_WAKE;
+ tw_flags &= ~IOU_F_TWQ_LAZY_WAKE;
- guard(rcu)();
This protects against ctx->task deallocation; see the comment in
io_ring_exit_work() -> synchronize_rcu().
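
Roughly, the pairing the guard relies on, condensed from the current
code (the field is ctx->submitter_task in the source, and the exact
placement of the final put on the ctx free path is approximated here):

	/* add side: the rcu section keeps the task struct alive while
	 * it's dereferenced and woken */
	rcu_read_lock();
	if (ctx->submitter_task)
		wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
	rcu_read_unlock();

	/* exit side: io_ring_exit_work() waits out all such readers
	 * before the last task reference is dropped on the free path */
	synchronize_rcu();
	...
	put_task_struct(ctx->submitter_task);

The SLAB_TYPESAFE_BY_RCU note in the removed lines was only about
dereferencing the old list head; the task lifetime above is a separate
dependency of the same rcu section.
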
+ spin_lock_irqsave(&ctx->work_lock, flags);
+ wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
+ nr_tw_prev = ctx->work_items++;
Is there a good reason why it changes the semantics of what's stored
across adds? The old code assigned a corrected nr_tw; this one will
start heavily spamming wake_up() in some cases.
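
For reference, condensed from the hunk above, this is what feeds the
nr_tw < nr_wait check in each version:

	/* old: the corrected value is stored, so a FORCE promotion (or
	 * the accumulated count) is seen by every later add while the
	 * list stays non-empty */
	nr_tw_prev = head ? READ_ONCE(first_req->nr_tw) : 0;
	nr_tw = (flags & IOU_F_TWQ_LAZY_WAKE) ? nr_tw_prev + 1 : IO_CQ_WAKE_FORCE;
	req->nr_tw = nr_tw;

	/* new: only the raw item count is carried across adds, the
	 * FORCE promotion is local to the add that computed it */
	nr_tw_prev = ctx->work_items++;
	nr_tw = (tw_flags & IOU_F_TWQ_LAZY_WAKE) ? nr_tw_prev + 1 : IO_CQ_WAKE_FORCE;
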
+ spin_unlock_irqrestore(&ctx->work_lock, flags);
- head = READ_ONCE(ctx->work_llist.first);
- do {
- nr_tw_prev = 0;
- if (head) {
- struct io_kiocb *first_req = container_of(head,
- struct io_kiocb,
- io_task_work.node);
- /*
- * Might be executed at any moment, rely on
- * SLAB_TYPESAFE_BY_RCU to keep it alive.
- */
- nr_tw_prev = READ_ONCE(first_req->nr_tw);
- }
-
- /*
- * Theoretically, it can overflow, but that's fine as one of
- * previous adds should've tried to wake the task.
- */
- nr_tw = nr_tw_prev + 1;
- if (!(flags & IOU_F_TWQ_LAZY_WAKE))
- nr_tw = IO_CQ_WAKE_FORCE;
-
- req->nr_tw = nr_tw;
- req->io_task_work.node.next = head;
- } while (!try_cmpxchg(&ctx->work_llist.first, &head,
- &req->io_task_work.node));
-
- /*
- * cmpxchg implies a full barrier, which pairs with the barrier
- * in set_current_state() on the io_cqring_wait() side. It's used
- * to ensure that either we see updated ->cq_wait_nr, or waiters
- * going to sleep will observe the work added to the list, which
- * is similar to the wait/wawke task state sync.
- */
+ nr_tw = nr_tw_prev + 1;
+ if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
+ nr_tw = IO_CQ_WAKE_FORCE;
- if (!head) {
+ if (!nr_tw_prev) {
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (ctx->has_evfd)
io_eventfd_signal(ctx);
}
+ /*
+ * We need a barrier after unlock, which pairs with the barrier
+ * in set_current_state() on the io_cqring_wait() side. It's used
+ * to ensure that either we see updated ->cq_wait_nr, or waiters
+ * going to sleep will observe the work added to the list, which
+ * is similar to the wait/wake task state sync.
+ */
+ smp_mb();
nr_wait = atomic_read(&ctx->cq_wait_nr);
/* not enough or no one is waiting */
if (nr_tw < nr_wait)
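
To spell out the pairing the comment refers to, with the waiter side
paraphrased from io_cqring_wait() (the sketch_* names and the
work_items pending check are illustrative, not the exact code):

	/* add side, matching the hunk above: called after the req was
	 * queued to ctx->work_list; smp_mb() orders that store against
	 * the cq_wait_nr read */
	static void sketch_wake_side(struct io_ring_ctx *ctx,
				     struct task_struct *task, unsigned nr_tw)
	{
		unsigned nr_wait;

		smp_mb();
		nr_wait = atomic_read(&ctx->cq_wait_nr);
		if (nr_tw >= nr_wait)
			wake_up_state(task, TASK_INTERRUPTIBLE);
	}

	/* wait side: publish cq_wait_nr, then re-check for queued work
	 * after the full barrier implied by set_current_state() */
	static void sketch_wait_side(struct io_ring_ctx *ctx, int nr_events)
	{
		atomic_set(&ctx->cq_wait_nr, nr_events);
		set_current_state(TASK_INTERRUPTIBLE);
		if (!READ_ONCE(ctx->work_items))
			schedule();
		__set_current_state(TASK_RUNNING);
		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
	}

Either the add side sees the freshly written cq_wait_nr and wakes the
task, or the wait side sees the newly queued work and skips the sleep.
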
@@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
--
Pavel Begunkov