Use the exported helper for queueing task_work for message passing, rather than rolling our own. Note that this is only done for strict data messages for now; file descriptor passing messages still rely on the kernel task_work. It could get converted at some point if it's performance critical. This improves peak performance of message passing by about 5x in some basic testing, with 2 threads just sending messages to each other. Before this change, it was capped at around 700K/sec; with the change, it's at over 4M/sec. Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> --- io_uring/msg_ring.c | 90 +++++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c index 9fdb0cc19bfd..ad7d67d44461 100644 --- a/io_uring/msg_ring.c +++ b/io_uring/msg_ring.c @@ -13,7 +13,6 @@ #include "filetable.h" #include "msg_ring.h" - /* All valid masks for MSG_RING */ #define IORING_MSG_RING_MASK (IORING_MSG_RING_CQE_SKIP | \ IORING_MSG_RING_FLAGS_PASS) @@ -71,54 +70,43 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx) return target_ctx->task_complete; } -static int io_msg_exec_remote(struct io_kiocb *req, task_work_func_t func) +static void io_msg_tw_complete(struct io_kiocb *req, struct io_tw_state *ts) { - struct io_ring_ctx *ctx = req->file->private_data; - struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); - struct task_struct *task = READ_ONCE(ctx->submitter_task); - - if (unlikely(!task)) - return -EOWNERDEAD; + struct io_ring_ctx *ctx = req->ctx; - init_task_work(&msg->tw, func); - if (task_work_add(task, &msg->tw, TWA_SIGNAL)) - return -EOWNERDEAD; + io_add_aux_cqe(ctx, req->cqe.user_data, req->cqe.res, req->cqe.flags); + kmem_cache_free(req_cachep, req); + percpu_ref_put(&ctx->refs); +} - return IOU_ISSUE_SKIP_COMPLETE; +static void io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req, + int res, u32 cflags, u64 user_data) +{ + req->cqe.user_data = 
user_data; + io_req_set_res(req, res, cflags); + percpu_ref_get(&ctx->refs); + req->ctx = ctx; + req->task = READ_ONCE(ctx->submitter_task); + req->io_task_work.func = io_msg_tw_complete; + io_req_task_work_add_remote(req, ctx, IOU_F_TWQ_LAZY_WAKE); } -static void io_msg_tw_complete(struct callback_head *head) +static int io_msg_data_remote(struct io_kiocb *req) { - struct io_msg *msg = container_of(head, struct io_msg, tw); - struct io_kiocb *req = cmd_to_io_kiocb(msg); struct io_ring_ctx *target_ctx = req->file->private_data; - int ret = 0; - - if (current->flags & PF_EXITING) { - ret = -EOWNERDEAD; - } else { - u32 flags = 0; - - if (msg->flags & IORING_MSG_RING_FLAGS_PASS) - flags = msg->cqe_flags; - - /* - * If the target ring is using IOPOLL mode, then we need to be - * holding the uring_lock for posting completions. Other ring - * types rely on the regular completion locking, which is - * handled while posting. - */ - if (target_ctx->flags & IORING_SETUP_IOPOLL) - mutex_lock(&target_ctx->uring_lock); - if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags)) - ret = -EOVERFLOW; - if (target_ctx->flags & IORING_SETUP_IOPOLL) - mutex_unlock(&target_ctx->uring_lock); - } + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + struct io_kiocb *target; + u32 flags = 0; - if (ret < 0) - req_set_fail(req); - io_req_queue_tw_complete(req, ret); + target = kmem_cache_alloc(req_cachep, GFP_KERNEL); + if (unlikely(!target)) + return -ENOMEM; + + if (msg->flags & IORING_MSG_RING_FLAGS_PASS) + flags = msg->cqe_flags; + + io_msg_remote_post(target_ctx, target, msg->len, flags, msg->user_data); + return 0; } static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags) @@ -136,7 +124,7 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags) return -EBADFD; if (io_msg_need_remote(target_ctx)) - return io_msg_exec_remote(req, io_msg_tw_complete); + return io_msg_data_remote(req); if (msg->flags & 
IORING_MSG_RING_FLAGS_PASS) flags = msg->cqe_flags; @@ -216,6 +204,22 @@ static void io_msg_tw_fd_complete(struct callback_head *head) io_req_queue_tw_complete(req, ret); } +static int io_msg_fd_remote(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->file->private_data; + struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg); + struct task_struct *task = READ_ONCE(ctx->submitter_task); + + if (unlikely(!task)) + return -EOWNERDEAD; + + init_task_work(&msg->tw, io_msg_tw_fd_complete); + if (task_work_add(task, &msg->tw, TWA_SIGNAL)) + return -EOWNERDEAD; + + return IOU_ISSUE_SKIP_COMPLETE; +} + static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) { struct io_ring_ctx *target_ctx = req->file->private_data; @@ -238,7 +242,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) } if (io_msg_need_remote(target_ctx)) - return io_msg_exec_remote(req, io_msg_tw_fd_complete); + return io_msg_fd_remote(req); return io_msg_install_complete(req, issue_flags); } -- 2.43.0