[RFC v2 11/19] io_uring: infrastructure for send zc notifications

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Add a new ubuf_info callback io_uring_tx_zerocopy_callback(), which
should post an CQE when it completes. Also, implement some
infrastructuire for allocating and managing struct ubuf_info.

Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx>
---
 fs/io_uring.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 108 insertions(+), 6 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index a01f91e70fa5..92190679f3f6 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -329,6 +329,11 @@ struct io_submit_state {
 };
 
 struct io_tx_notifier {
+	struct ubuf_info	uarg;
+	struct work_struct	commit_work;
+	struct percpu_ref	*fixed_rsrc_refs;
+	u64			tag;
+	u32			seq;
 };
 
 struct io_tx_ctx {
@@ -1275,15 +1280,20 @@ static void io_rsrc_refs_refill(struct io_ring_ctx *ctx)
 	percpu_ref_get_many(&ctx->rsrc_node->refs, IO_RSRC_REF_BATCH);
 }
 
+static inline void io_set_rsrc_node(struct percpu_ref **rsrc_refs,
+				    struct io_ring_ctx *ctx)
+{
+	*rsrc_refs = &ctx->rsrc_node->refs;
+	ctx->rsrc_cached_refs--;
+	if (unlikely(ctx->rsrc_cached_refs < 0))
+		io_rsrc_refs_refill(ctx);
+}
+
 static inline void io_req_set_rsrc_node(struct io_kiocb *req,
 					struct io_ring_ctx *ctx)
 {
-	if (!req->fixed_rsrc_refs) {
-		req->fixed_rsrc_refs = &ctx->rsrc_node->refs;
-		ctx->rsrc_cached_refs--;
-		if (unlikely(ctx->rsrc_cached_refs < 0))
-			io_rsrc_refs_refill(ctx);
-	}
+	if (!req->fixed_rsrc_refs)
+		io_set_rsrc_node(&req->fixed_rsrc_refs, ctx);
 }
 
 static void io_refs_resurrect(struct percpu_ref *ref, struct completion *compl)
@@ -1930,6 +1940,76 @@ static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data,
 	return __io_fill_cqe(ctx, user_data, res, cflags);
 }
 
+static void io_zc_tx_work_callback(struct work_struct *work)
+{
+	struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
+						       commit_work);
+	struct io_ring_ctx *ctx = notifier->uarg.ctx;
+
+	spin_lock(&ctx->completion_lock);
+	io_fill_cqe_aux(ctx, notifier->tag, notifier->seq, 0);
+	io_commit_cqring(ctx);
+	spin_unlock(&ctx->completion_lock);
+	io_cqring_ev_posted(ctx);
+
+	percpu_ref_put(notifier->fixed_rsrc_refs);
+	percpu_ref_put(&ctx->refs);
+	kfree(notifier);
+}
+
+static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
+					  struct ubuf_info *uarg,
+					  bool success)
+{
+	struct io_tx_notifier *notifier = container_of(uarg,
+						struct io_tx_notifier, uarg);
+
+	if (!refcount_dec_and_test(&uarg->refcnt))
+		return;
+
+	if (in_interrupt()) {
+		INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
+		queue_work(system_unbound_wq, &notifier->commit_work);
+	} else {
+		io_zc_tx_work_callback(&notifier->commit_work);
+	}
+}
+
+static struct io_tx_notifier *io_alloc_tx_notifier(struct io_ring_ctx *ctx,
+						   struct io_tx_ctx *tx_ctx)
+{
+	struct io_tx_notifier *notifier;
+	struct ubuf_info *uarg;
+
+	notifier = kmalloc(sizeof(*notifier), GFP_ATOMIC);
+	if (!notifier)
+		return NULL;
+
+	WARN_ON_ONCE(!current->io_uring);
+	notifier->seq = tx_ctx->seq++;
+	notifier->tag = tx_ctx->tag;
+	io_set_rsrc_node(&notifier->fixed_rsrc_refs, ctx);
+
+	uarg = &notifier->uarg;
+	uarg->ctx = ctx;
+	uarg->flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN;
+	uarg->callback = io_uring_tx_zerocopy_callback;
+	refcount_set(&uarg->refcnt, 1);
+	percpu_ref_get(&ctx->refs);
+	return notifier;
+}
+
+__attribute__((unused))
+static inline struct io_tx_notifier *io_get_tx_notifier(struct io_ring_ctx *ctx,
+							struct io_tx_ctx *tx_ctx)
+{
+	if (tx_ctx->notifier)
+		return tx_ctx->notifier;
+
+	tx_ctx->notifier = io_alloc_tx_notifier(ctx, tx_ctx);
+	return tx_ctx->notifier;
+}
+
 static void io_req_complete_post(struct io_kiocb *req, s32 res,
 				 u32 cflags)
 {
@@ -9212,11 +9292,27 @@ static int io_buffer_validate(struct iovec *iov)
 	return 0;
 }
 
+static void io_sqe_tx_ctx_kill_ubufs(struct io_ring_ctx *ctx)
+{
+	struct io_tx_ctx *tx_ctx;
+	int i;
+
+	for (i = 0; i < ctx->nr_tx_ctxs; i++) {
+		tx_ctx = &ctx->tx_ctxs[i];
+		if (!tx_ctx->notifier)
+			continue;
+		io_uring_tx_zerocopy_callback(NULL, &tx_ctx->notifier->uarg,
+					      true);
+		tx_ctx->notifier = NULL;
+	}
+}
+
 static int io_sqe_tx_ctx_unregister(struct io_ring_ctx *ctx)
 {
 	if (!ctx->nr_tx_ctxs)
 		return -ENXIO;
 
+	io_sqe_tx_ctx_kill_ubufs(ctx);
 	kvfree(ctx->tx_ctxs);
 	ctx->tx_ctxs = NULL;
 	ctx->nr_tx_ctxs = 0;
@@ -9608,6 +9704,12 @@ static __cold void io_ring_exit_work(struct work_struct *work)
 			io_sq_thread_unpark(sqd);
 		}
 
+		if (READ_ONCE(ctx->nr_tx_ctxs)) {
+			mutex_lock(&ctx->uring_lock);
+			io_sqe_tx_ctx_kill_ubufs(ctx);
+			mutex_unlock(&ctx->uring_lock);
+		}
+
 		io_req_caches_free(ctx);
 
 		if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
-- 
2.34.1




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux