When only one task submits requests, most CQEs are expected to be filled from that task's context, so we have natural serialisation. That means that in those cases we don't need spinlocking around CQE posting. One downside is that it also means that io-wq workers can't emit CQEs directly but have to do it through the original task context using task_works. That may hurt latency and performance and might matter a lot to some workloads, but it's not a huge deal in general as io-wq is a slow path and there is some additional merit from tw completion batching. The feature is opt-in: userspace enables it by setting a new IORING_SETUP_PRIVATE_CQ flag. It doesn't work with IOPOLL, and also for now only the task that created a ring can submit requests to it. Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- fs/io_uring.c | 61 +++++++++++++++++++++++++++++++---- include/uapi/linux/io_uring.h | 1 + 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 5f895ad910b6..52a15b29f6e4 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -2109,6 +2109,12 @@ static void io_req_complete_post(struct io_kiocb *req, s32 res, { struct io_ring_ctx *ctx = req->ctx; + /* todo cflags */ + if (ctx->flags & IORING_SETUP_PRIVATE_CQ) { + io_req_tw_queue_complete(req, res); + return; + } + spin_lock(&ctx->completion_lock); __io_req_complete_post(req, res, cflags); io_commit_cqring(ctx); @@ -2593,8 +2599,14 @@ static void io_req_task_complete(struct io_kiocb *req, bool *locked) io_req_complete_state(req, res, io_put_kbuf(req, 0)); io_req_add_compl_list(req); } else { - io_req_complete_post(req, res, + struct io_ring_ctx *ctx = req->ctx; + + spin_lock(&ctx->completion_lock); + __io_req_complete_post(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED)); + io_commit_cqring(ctx); + spin_unlock(&ctx->completion_lock); + io_cqring_ev_posted(ctx); } } @@ -2686,7 +2698,9 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct
io_submit_state *state = &ctx->submit_state; if (state->flush_cqes) { - spin_lock(&ctx->completion_lock); + if (!(ctx->flags & IORING_SETUP_PRIVATE_CQ)) + spin_lock(&ctx->completion_lock); + wq_list_for_each(node, prev, &state->compl_reqs) { struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); @@ -2705,7 +2719,9 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) } io_commit_cqring(ctx); - spin_unlock(&ctx->completion_lock); + if (!(ctx->flags & IORING_SETUP_PRIVATE_CQ)) + spin_unlock(&ctx->completion_lock); + io_cqring_ev_posted(ctx); state->flush_cqes = false; } @@ -5895,8 +5911,10 @@ static int io_poll_check_events(struct io_kiocb *req) int v; /* req->task == current here, checking PF_EXITING is safe */ - if (unlikely(req->task->flags & PF_EXITING)) + if (unlikely(req->task->flags & PF_EXITING)) { io_poll_mark_cancelled(req); + return -ECANCELED; + } do { v = atomic_read(&req->poll_refs); @@ -9165,6 +9183,8 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, unsigned int done; bool needs_switch = false; + if (tags && (ctx->flags & IORING_SETUP_PRIVATE_CQ)) + return -EINVAL; if (!ctx->file_data) return -ENXIO; if (up->offset + nr_args > ctx->nr_user_files) @@ -9845,6 +9865,9 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx, __u32 done; int i, err; + if (tags && (ctx->flags & IORING_SETUP_PRIVATE_CQ)) + return -EINVAL; + if (!ctx->buf_data) return -ENXIO; if (up->offset + nr_args > ctx->nr_user_bufs) @@ -10389,6 +10412,23 @@ static __cold void io_uring_try_cancel_requests(struct io_ring_ctx *ctx, } } +static __cold int io_uring_check_ctxs(struct io_ring_ctx *ctx) +{ + int nr_tctxs = 0, max_tctxs = 1; + struct list_head *pos; + + if (!(ctx->flags & IORING_SETUP_PRIVATE_CQ)) + return 0; + + if (ctx->flags & IORING_SETUP_IOPOLL) + return -EINVAL; + if (ctx->flags & IORING_SETUP_SQPOLL) + max_tctxs++; + list_for_each(pos, &ctx->tctx_list) + nr_tctxs++; + return nr_tctxs < max_tctxs ? 
0 : -EINVAL; +} + static int __io_uring_add_tctx_node(struct io_ring_ctx *ctx) { struct io_uring_task *tctx = current->io_uring; @@ -10417,14 +10457,18 @@ static int __io_uring_add_tctx_node(struct io_ring_ctx *ctx) node->ctx = ctx; node->task = current; - ret = xa_err(xa_store(&tctx->xa, (unsigned long)ctx, + mutex_lock(&ctx->uring_lock); + ret = io_uring_check_ctxs(ctx); + if (!ret) { + ret = xa_err(xa_store(&tctx->xa, (unsigned long)ctx, node, GFP_KERNEL)); + } if (ret) { kfree(node); + mutex_unlock(&ctx->uring_lock); return ret; } - mutex_lock(&ctx->uring_lock); list_add(&node->ctx_node, &ctx->tctx_list); mutex_unlock(&ctx->uring_lock); } @@ -11349,7 +11393,8 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params) if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE | IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ | - IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL)) + IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL | + IORING_SETUP_PRIVATE_CQ)) return -EINVAL; return io_uring_create(entries, &p, params); @@ -11561,6 +11606,8 @@ static __cold int io_register_rsrc(struct io_ring_ctx *ctx, void __user *arg, /* keep it extendible */ if (size != sizeof(rr)) return -EINVAL; + if (rr.tags && (ctx->flags & IORING_SETUP_PRIVATE_CQ)) + return -EINVAL; memset(&rr, 0, sizeof(rr)); if (copy_from_user(&rr, arg, size)) diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index d2be4eb22008..342fab169b83 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -102,6 +102,7 @@ enum { #define IORING_SETUP_ATTACH_WQ (1U << 5) /* attach to existing wq */ #define IORING_SETUP_R_DISABLED (1U << 6) /* start with ring disabled */ #define IORING_SETUP_SUBMIT_ALL (1U << 7) /* continue submit on error */ +#define IORING_SETUP_PRIVATE_CQ (1U << 8) enum { IORING_OP_NOP, -- 2.35.1