[PATCH 6/9] io_uring/msg_ring: add an alloc cache for CQE entries

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



io_uring accounts the memory allocated, which is quite expensive. Wrap
the allocations and frees in the provided alloc cache framework. The
target ctx needs to be locked anyway for posting the overflow entry,
so just move the overflow alloc inside that section. Flushing the
entries has it locked as well, so io_alloc_cache_put() can be used.

In a simple test, most of the overhead of DEFER_TASKRUN message passing
ends up being accounting for allocation and free, and with this change
it's completely gone.

Signed-off-by: Jens Axboe <axboe@xxxxxxxxx>
---
 include/linux/io_uring_types.h |  7 ++++
 io_uring/io_uring.c            |  7 +++-
 io_uring/msg_ring.c            | 76 +++++++++++++++++++++++-----------
 io_uring/msg_ring.h            |  3 ++
 4 files changed, 68 insertions(+), 25 deletions(-)

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index d795f3f3a705..c7f3a330482d 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -350,6 +350,13 @@ struct io_ring_ctx {
 	struct io_alloc_cache	futex_cache;
 #endif
 
+	/*
+	 * Unlike the other caches, this one is used by the sender of messages
+	 * to this ring, not by the ring itself. As such, protection for this
+	 * cache is under ->completion_lock, not ->uring_lock.
+	 */
+	struct io_alloc_cache	msg_cache;
+
 	const struct cred	*sq_creds;	/* cred used for __io_sq_thread() */
 	struct io_sq_data	*sq_data;	/* if using sq thread polling */
 
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 499255ef62c7..3c77d96fc6d7 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -95,6 +95,7 @@
 #include "futex.h"
 #include "napi.h"
 #include "uring_cmd.h"
+#include "msg_ring.h"
 #include "memmap.h"
 
 #include "timeout.h"
@@ -316,6 +317,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	ret |= io_alloc_cache_init(&ctx->uring_cache, IO_ALLOC_CACHE_MAX,
 			    sizeof(struct uring_cache));
 	ret |= io_futex_cache_init(ctx);
+	ret |= io_msg_cache_init(ctx);
 	if (ret)
 		goto err;
 	init_completion(&ctx->ref_comp);
@@ -352,6 +354,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
 	io_alloc_cache_free(&ctx->uring_cache, kfree);
 	io_futex_cache_free(ctx);
+	io_msg_cache_free(ctx);
 	kfree(ctx->cancel_table.hbs);
 	kfree(ctx->cancel_table_locked.hbs);
 	xa_destroy(&ctx->io_bl_xa);
@@ -619,7 +622,8 @@ static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
 		}
 		list_del(&ocqe->list);
 		ctx->nr_overflow--;
-		kfree(ocqe);
+		if (!io_alloc_cache_put(&ctx->msg_cache, ocqe))
+			kfree(ocqe);
 	}
 
 	if (list_empty(&ctx->cq_overflow_list)) {
@@ -2556,6 +2560,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
 	io_alloc_cache_free(&ctx->uring_cache, kfree);
 	io_futex_cache_free(ctx);
+	io_msg_cache_free(ctx);
 	io_destroy_buffers(ctx);
 	mutex_unlock(&ctx->uring_lock);
 	if (ctx->sq_creds)
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 1ee89bdbbb5b..a33228f8c364 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -11,6 +11,7 @@
 #include "io_uring.h"
 #include "rsrc.h"
 #include "filetable.h"
+#include "alloc_cache.h"
 #include "msg_ring.h"
 
 
@@ -72,20 +73,28 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
 }
 
 static struct io_overflow_cqe *io_alloc_overflow(struct io_ring_ctx *target_ctx)
+	__acquires(&target_ctx->completion_lock)
 {
-	bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
-	size_t cqe_size = sizeof(struct io_overflow_cqe);
 	struct io_overflow_cqe *ocqe;
 
-	if (is_cqe32)
-		cqe_size += sizeof(struct io_uring_cqe);
+	spin_lock(&target_ctx->completion_lock);
+
+	ocqe = io_alloc_cache_get(&target_ctx->msg_cache);
+	if (!ocqe) {
+		bool is_cqe32 = target_ctx->flags & IORING_SETUP_CQE32;
+		size_t cqe_size = sizeof(struct io_overflow_cqe);
 
-	ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
-	if (!ocqe)
-		return NULL;
+		if (is_cqe32)
+			cqe_size += sizeof(struct io_uring_cqe);
+
+		ocqe = kmalloc(cqe_size, GFP_ATOMIC | __GFP_ACCOUNT);
+		if (!ocqe)
+			return NULL;
 
-	if (is_cqe32)
-		ocqe->cqe.big_cqe[0] = ocqe->cqe.big_cqe[1] = 0;
+		/* just init at alloc time, won't change */
+		if (is_cqe32)
+			ocqe->cqe.big_cqe[0] = ocqe->cqe.big_cqe[1] = 0;
+	}
 
 	return ocqe;
 }
@@ -121,12 +130,13 @@ static int io_msg_fill_remote(struct io_msg *msg, unsigned int issue_flags,
 	struct io_overflow_cqe *ocqe;
 
 	ocqe = io_alloc_overflow(target_ctx);
-	if (!ocqe)
-		return -ENOMEM;
+	if (ocqe) {
+		io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
+		return 0;
+	}
 
-	spin_lock(&target_ctx->completion_lock);
-	io_msg_add_overflow(msg, target_ctx, ocqe, msg->len, flags);
-	return 0;
+	spin_unlock(&target_ctx->completion_lock);
+	return -ENOMEM;
 }
 
 static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
@@ -216,17 +226,17 @@ static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
 {
 	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
 	struct io_overflow_cqe *ocqe = NULL;
-	int ret;
+	int ret = -ENOMEM;
+
+	if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
+		return -EAGAIN;
 
 	if (!(msg->flags & IORING_MSG_RING_CQE_SKIP)) {
 		ocqe = io_alloc_overflow(target_ctx);
-		if (!ocqe)
-			return -ENOMEM;
-	}
-
-	if (unlikely(io_double_lock_ctx(target_ctx, issue_flags))) {
-		kfree(ocqe);
-		return -EAGAIN;
+		if (unlikely(!ocqe)) {
+			mutex_unlock(&target_ctx->uring_lock);
+			goto err;
+		}
 	}
 
 	ret = __io_fixed_fd_install(target_ctx, msg->src_file, msg->dst_fd);
@@ -236,12 +246,15 @@ static int io_msg_install_remote(struct io_kiocb *req, unsigned int issue_flags,
 		msg->src_file = NULL;
 		req->flags &= ~REQ_F_NEED_CLEANUP;
 		if (ocqe) {
-			spin_lock(&target_ctx->completion_lock);
 			io_msg_add_overflow(msg, target_ctx, ocqe, ret, 0);
 			return 0;
 		}
 	}
-	kfree(ocqe);
+	if (ocqe) {
+err:
+		spin_unlock(&target_ctx->completion_lock);
+		kfree(ocqe);
+	}
 	return ret;
 }
 
@@ -321,3 +334,18 @@ int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
 	io_req_set_res(req, ret, 0);
 	return IOU_OK;
 }
+
+int io_msg_cache_init(struct io_ring_ctx *ctx)
+{
+	size_t size = sizeof(struct io_overflow_cqe);
+
+	if (ctx->flags & IORING_SETUP_CQE32)
+		size += sizeof(struct io_uring_cqe);
+
+	return io_alloc_cache_init(&ctx->msg_cache, IO_ALLOC_CACHE_MAX, size);
+}
+
+void io_msg_cache_free(struct io_ring_ctx *ctx)
+{
+	io_alloc_cache_free(&ctx->msg_cache, kfree);
+}
diff --git a/io_uring/msg_ring.h b/io_uring/msg_ring.h
index 3987ee6c0e5f..94f5716d522e 100644
--- a/io_uring/msg_ring.h
+++ b/io_uring/msg_ring.h
@@ -3,3 +3,6 @@
 int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags);
 void io_msg_ring_cleanup(struct io_kiocb *req);
+
+int io_msg_cache_init(struct io_ring_ctx *ctx);
+void io_msg_cache_free(struct io_ring_ctx *ctx);
-- 
2.43.0





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux