[RFC v2 18/19] io_uring: task_work for notification delivery

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



workqueues are way too heavy for tx notification delivery. We still
need some non-irq context because ->completion_lock is not irq-safe, so
use task_work instead. Expectedly, performance for test cases with real
hardware and juggling lots of notifications the perfomance is
drastically better, e.g. profiles percetage of relevant parts drops
from 30% to less than 3%

Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx>
---
 fs/io_uring.c | 57 ++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 43 insertions(+), 14 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8cfa8ea161e4..ee496b463462 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -330,11 +330,16 @@ struct io_submit_state {
 
 struct io_tx_notifier {
 	struct ubuf_info	uarg;
-	struct work_struct	commit_work;
 	struct percpu_ref	*fixed_rsrc_refs;
 	u64			tag;
 	u32			seq;
 	struct list_head	cache_node;
+	struct task_struct	*task;
+
+	union {
+		struct callback_head	task_work;
+		struct work_struct	commit_work;
+	};
 };
 
 struct io_tx_ctx {
@@ -1965,19 +1970,17 @@ static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data,
 	return __io_fill_cqe(ctx, user_data, res, cflags);
 }
 
-static void io_zc_tx_work_callback(struct work_struct *work)
+static void io_zc_tx_notifier_finish(struct callback_head *cb)
 {
-	struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
-						       commit_work);
+	struct io_tx_notifier *notifier = container_of(cb, struct io_tx_notifier,
+						       task_work);
 	struct io_ring_ctx *ctx = notifier->uarg.ctx;
 	struct percpu_ref *rsrc_refs = notifier->fixed_rsrc_refs;
 
 	spin_lock(&ctx->completion_lock);
 	io_fill_cqe_aux(ctx, notifier->tag, notifier->seq, 0);
-
 	list_add(&notifier->cache_node, &ctx->ubuf_list_locked);
 	ctx->ubuf_locked_nr++;
-
 	io_commit_cqring(ctx);
 	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
@@ -1985,6 +1988,14 @@ static void io_zc_tx_work_callback(struct work_struct *work)
 	percpu_ref_put(rsrc_refs);
 }
 
+static void io_zc_tx_work_callback(struct work_struct *work)
+{
+	struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
+						       commit_work);
+
+	io_zc_tx_notifier_finish(&notifier->task_work);
+}
+
 static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
 					  struct ubuf_info *uarg,
 					  bool success)
@@ -1994,21 +2005,39 @@ static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
 
 	if (!refcount_dec_and_test(&uarg->refcnt))
 		return;
+	if (unlikely(!notifier->task))
+		goto fallback;
 
-	if (in_interrupt()) {
-		INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
-		queue_work(system_unbound_wq, &notifier->commit_work);
-	} else {
-		io_zc_tx_work_callback(&notifier->commit_work);
+	put_task_struct(notifier->task);
+	notifier->task = NULL;
+
+	if (!in_interrupt()) {
+		io_zc_tx_notifier_finish(&notifier->task_work);
+		return;
 	}
+
+	init_task_work(&notifier->task_work, io_zc_tx_notifier_finish);
+	if (likely(!task_work_add(notifier->task, &notifier->task_work,
+				  TWA_SIGNAL)))
+		return;
+
+fallback:
+	INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
+	queue_work(system_unbound_wq, &notifier->commit_work);
 }
 
-static void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+static inline void __io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
 {
 	io_uring_tx_zerocopy_callback(NULL, &tx_ctx->notifier->uarg, true);
 	tx_ctx->notifier = NULL;
 }
 
+static inline void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+{
+	tx_ctx->notifier->task = get_task_struct(current);
+	__io_tx_kill_notification(tx_ctx);
+}
+
 static void io_notifier_splice(struct io_ring_ctx *ctx)
 {
 	spin_lock(&ctx->completion_lock);
@@ -2058,7 +2087,7 @@ static struct io_tx_notifier *io_alloc_tx_notifier(struct io_ring_ctx *ctx,
 	} else {
 		gfp_t gfp_flags = GFP_ATOMIC|GFP_KERNEL_ACCOUNT;
 
-		notifier = kmalloc(sizeof(*notifier), gfp_flags);
+		notifier = kzalloc(sizeof(*notifier), gfp_flags);
 		if (!notifier)
 			return NULL;
 		ctx->nr_tx_ctx++;
@@ -9502,7 +9531,7 @@ static void io_sqe_tx_ctx_kill_ubufs(struct io_ring_ctx *ctx)
 		tx_ctx = &ctx->tx_ctxs[i];
 
 		if (tx_ctx->notifier)
-			io_tx_kill_notification(tx_ctx);
+			__io_tx_kill_notification(tx_ctx);
 	}
 }
 
-- 
2.34.1




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux