Workqueues are way too heavy for tx notification delivery. We still
need some non-irq context because ->completion_lock is not irq-safe,
so use task_work instead.

As expected, performance is drastically better for test cases with
real hardware that juggle lots of notifications, e.g. the percentage
of the relevant parts in profiles drops from 30% to less than 3%.

Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx>
---
 fs/io_uring.c | 57 +++++++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 43 insertions(+), 14 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8cfa8ea161e4..ee496b463462 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -330,11 +330,16 @@ struct io_submit_state {
 
 struct io_tx_notifier {
 	struct ubuf_info	uarg;
-	struct work_struct	commit_work;
 	struct percpu_ref	*fixed_rsrc_refs;
 	u64			tag;
 	u32			seq;
 	struct list_head	cache_node;
+	struct task_struct	*task;
+
+	union {
+		struct callback_head	task_work;
+		struct work_struct	commit_work;
+	};
 };
 
 struct io_tx_ctx {
@@ -1965,19 +1970,17 @@ static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data,
 	return __io_fill_cqe(ctx, user_data, res, cflags);
 }
 
-static void io_zc_tx_work_callback(struct work_struct *work)
+static void io_zc_tx_notifier_finish(struct callback_head *cb)
 {
-	struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
-						       commit_work);
+	struct io_tx_notifier *notifier = container_of(cb, struct io_tx_notifier,
+						       task_work);
 	struct io_ring_ctx *ctx = notifier->uarg.ctx;
 	struct percpu_ref *rsrc_refs = notifier->fixed_rsrc_refs;
 
 	spin_lock(&ctx->completion_lock);
 	io_fill_cqe_aux(ctx, notifier->tag, notifier->seq, 0);
-
 	list_add(&notifier->cache_node, &ctx->ubuf_list_locked);
 	ctx->ubuf_locked_nr++;
-
 	io_commit_cqring(ctx);
 	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
@@ -1985,6 +1988,14 @@ static void io_zc_tx_work_callback(struct work_struct *work)
 	percpu_ref_put(rsrc_refs);
 }
 
+static void io_zc_tx_work_callback(struct work_struct *work)
+{
+	struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
+						       commit_work);
+
+	io_zc_tx_notifier_finish(&notifier->task_work);
+}
+
 static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
 					  struct ubuf_info *uarg,
 					  bool success)
@@ -1994,21 +2005,39 @@ static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
 
 	if (!refcount_dec_and_test(&uarg->refcnt))
 		return;
+	if (unlikely(!notifier->task))
+		goto fallback;
 
-	if (in_interrupt()) {
-		INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
-		queue_work(system_unbound_wq, &notifier->commit_work);
-	} else {
-		io_zc_tx_work_callback(&notifier->commit_work);
+	put_task_struct(notifier->task);
+	notifier->task = NULL;
+
+	if (!in_interrupt()) {
+		io_zc_tx_notifier_finish(&notifier->task_work);
+		return;
 	}
+
+	init_task_work(&notifier->task_work, io_zc_tx_notifier_finish);
+	if (likely(!task_work_add(notifier->task, &notifier->task_work,
+				  TWA_SIGNAL)))
+		return;
+
+fallback:
+	INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
+	queue_work(system_unbound_wq, &notifier->commit_work);
 }
 
-static void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+static inline void __io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
 {
 	io_uring_tx_zerocopy_callback(NULL, &tx_ctx->notifier->uarg, true);
 	tx_ctx->notifier = NULL;
 }
 
+static inline void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+{
+	tx_ctx->notifier->task = get_task_struct(current);
+	__io_tx_kill_notification(tx_ctx);
+}
+
 static void io_notifier_splice(struct io_ring_ctx *ctx)
 {
 	spin_lock(&ctx->completion_lock);
@@ -2058,7 +2087,7 @@ static struct io_tx_notifier *io_alloc_tx_notifier(struct io_ring_ctx *ctx,
 	} else {
 		gfp_t gfp_flags = GFP_ATOMIC|GFP_KERNEL_ACCOUNT;
 
-		notifier = kmalloc(sizeof(*notifier), gfp_flags);
+		notifier = kzalloc(sizeof(*notifier), gfp_flags);
 		if (!notifier)
 			return NULL;
 		ctx->nr_tx_ctx++;
@@ -9502,7 +9531,7 @@ static void io_sqe_tx_ctx_kill_ubufs(struct io_ring_ctx *ctx)
 		tx_ctx = &ctx->tx_ctxs[i];
 
 		if (tx_ctx->notifier)
-			io_tx_kill_notification(tx_ctx);
+			__io_tx_kill_notification(tx_ctx);
 	}
 }
 
-- 
2.34.1
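
For reference, the sketch below shows the general deferral shape the patch
switches to: run the completion inline when already in task context,
otherwise punt it to the submitting task via task_work, keeping a workqueue
as a last-resort fallback. It is an illustrative sketch only, assuming the
submitting task was pinned beforehand with get_task_struct(); struct
my_notifier, my_complete() and the other names are made up and are not
identifiers from the patch.

/*
 * Illustrative sketch, not code from the patch: defer spinlock-protected
 * completion work out of hard-irq context, preferring task_work over a
 * workqueue. All names here are hypothetical.
 */
#include <linux/interrupt.h>
#include <linux/sched/task.h>
#include <linux/task_work.h>
#include <linux/workqueue.h>

struct my_notifier {
	struct task_struct *task;	/* submitter, pinned with get_task_struct() */
	union {				/* only one deferral path is used at a time */
		struct callback_head task_work;
		struct work_struct work;
	};
};

static void my_complete(struct my_notifier *n)
{
	/* take ->completion_lock (not irq-safe), post the CQE, drop refs */
}

static void my_task_work_fn(struct callback_head *cb)
{
	my_complete(container_of(cb, struct my_notifier, task_work));
}

static void my_work_fn(struct work_struct *work)
{
	my_complete(container_of(work, struct my_notifier, work));
}

/* May be invoked from hard-irq context, e.g. a TX completion interrupt. */
static void my_notify(struct my_notifier *n)
{
	if (!in_interrupt()) {
		my_complete(n);			/* already in task context */
		return;
	}

	/* Prefer finishing in the context of the submitting task. */
	init_task_work(&n->task_work, my_task_work_fn);
	if (!task_work_add(n->task, &n->task_work, TWA_SIGNAL))
		return;

	/* Task is exiting: fall back to a workqueue. */
	INIT_WORK(&n->work, my_work_fn);
	queue_work(system_unbound_wq, &n->work);
}

Compared to always bouncing through system_unbound_wq, running the
completion in the submitting task's context avoids the per-notification
workqueue overhead, which is presumably where the profile improvement
cited in the commit message comes from.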