Allow to pass CQ index from SQE to the end CQE generators, but support only one CQ for now. Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- fs/io_uring.c | 113 ++++++++++++++++++++++------------ include/uapi/linux/io_uring.h | 1 + 2 files changed, 75 insertions(+), 39 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 4fecd9da689e..356a5dc90f46 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -90,6 +90,8 @@ #define IORING_MAX_ENTRIES 32768 #define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES) +#define IO_DEFAULT_CQ 0 + /* * Shift of 9 is 512 entries, or exactly one page on 64-bit archs */ @@ -416,6 +418,7 @@ struct io_ring_ctx { unsigned cq_extra; struct wait_queue_head cq_wait; struct io_cqring cqs[1]; + unsigned int cq_nr; struct { spinlock_t completion_lock; @@ -832,6 +835,7 @@ struct io_kiocb { struct io_kiocb *link; struct percpu_ref *fixed_rsrc_refs; + u16 cq_idx; /* used with ctx->iopoll_list with reads/writes */ struct list_head inflight_entry; @@ -1034,7 +1038,8 @@ static void io_uring_cancel_sqpoll(struct io_sq_data *sqd); static struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx); static bool io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data, - long res, unsigned int cflags); + long res, unsigned int cflags, + unsigned int cq_idx); static void io_put_req(struct io_kiocb *req); static void io_put_req_deferred(struct io_kiocb *req, int nr); static void io_dismantle_req(struct io_kiocb *req); @@ -1207,13 +1212,15 @@ static void io_account_cq_overflow(struct io_ring_ctx *ctx) static bool req_need_defer(struct io_kiocb *req, u32 seq) { - if (unlikely(req->flags & REQ_F_IO_DRAIN)) { - struct io_ring_ctx *ctx = req->ctx; - - return seq + READ_ONCE(ctx->cq_extra) != ctx->cqs[0].cached_tail; - } + struct io_ring_ctx *ctx = req->ctx; + u32 cnt = 0; + int i; - return false; + if (!(req->flags & REQ_F_IO_DRAIN)) + return false; + for (i = 0; i < ctx->cq_nr; i++) + cnt += ctx->cqs[i].cached_tail; + return seq + READ_ONCE(ctx->cq_extra) != cnt; } static void io_req_track_inflight(struct io_kiocb *req) @@ -1289,7 +1296,8 @@ static void io_kill_timeout(struct io_kiocb *req, int status) atomic_set(&req->ctx->cq_timeouts, atomic_read(&req->ctx->cq_timeouts) + 1); list_del_init(&req->timeout.list); - io_cqring_fill_event(req->ctx, req->user_data, status, 0); + io_cqring_fill_event(req->ctx, req->user_data, status, 0, + req->cq_idx); io_put_req_deferred(req, 1); } } @@ -1346,10 +1354,13 @@ static void io_flush_timeouts(struct io_ring_ctx *ctx) static void io_commit_cqring(struct io_ring_ctx *ctx) { + int i; + io_flush_timeouts(ctx); /* order cqe stores with ring update */ - smp_store_release(&ctx->rings->cq.tail, ctx->cqs[0].cached_tail); + for (i = 0; i < ctx->cq_nr; i++) + smp_store_release(&ctx->cqs[i].rings->cq.tail, ctx->cqs[i].cached_tail); if (unlikely(!list_empty(&ctx->defer_list))) __io_queue_deferred(ctx); @@ -1362,25 +1373,27 @@ static inline bool io_sqring_full(struct io_ring_ctx *ctx) return READ_ONCE(r->sq.tail) - ctx->cached_sq_head == ctx->sq_entries; } -static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx) +static inline unsigned int __io_cqring_events(struct io_cqring *cq) { - return ctx->cqs[0].cached_tail - READ_ONCE(ctx->rings->cq.head); + return cq->cached_tail - READ_ONCE(cq->rings->cq.head); } -static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx) +static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx, + unsigned int idx) { - struct io_rings *rings = ctx->rings; - unsigned tail, mask = ctx->cqs[0].entries - 1; + struct io_cqring *cq = &ctx->cqs[idx]; + struct io_rings *rings = cq->rings; + unsigned tail, mask = cq->entries - 1; /* * writes to the cq entry need to come after reading head; the * control dependency is enough as we're using WRITE_ONCE to * fill the cq entry */ - if (__io_cqring_events(ctx) == ctx->cqs[0].entries) + if (__io_cqring_events(cq) == cq->entries) return NULL; - tail = ctx->cqs[0].cached_tail++; + tail = cq->cached_tail++; return &rings->cqes[tail & mask]; } @@ -1432,16 +1445,18 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) { unsigned long flags; bool all_flushed, posted; + struct io_cqring *cq = &ctx->cqs[IO_DEFAULT_CQ]; - if (!force && __io_cqring_events(ctx) == ctx->cqs[0].entries) + if (!force && __io_cqring_events(cq) == cq->entries) return false; posted = false; spin_lock_irqsave(&ctx->completion_lock, flags); while (!list_empty(&ctx->cq_overflow_list)) { - struct io_uring_cqe *cqe = io_get_cqe(ctx); + struct io_uring_cqe *cqe = io_get_cqe(ctx, IO_DEFAULT_CQ); struct io_overflow_cqe *ocqe; + if (!cqe && !force) break; ocqe = list_first_entry(&ctx->cq_overflow_list, @@ -1523,12 +1538,17 @@ static inline void req_ref_get(struct io_kiocb *req) } static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, - long res, unsigned int cflags) + long res, unsigned int cflags, + unsigned int cq_idx) { struct io_overflow_cqe *ocqe; + if (cq_idx != IO_DEFAULT_CQ) + goto overflow; + ocqe = kmalloc(sizeof(*ocqe), GFP_ATOMIC | __GFP_ACCOUNT); if (!ocqe) { +overflow: /* * If we're in ring overflow flush mode, or in task cancel mode, * or cannot allocate an overflow entry, then we need to drop it @@ -1550,7 +1570,8 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data, } static inline bool __io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data, - long res, unsigned int cflags) + long res, unsigned int cflags, + unsigned int cq_idx) { struct io_uring_cqe *cqe; @@ -1561,21 +1582,22 @@ static inline bool __io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data * submission (by quite a lot). Increment the overflow count in * the ring. */ - cqe = io_get_cqe(ctx); + cqe = io_get_cqe(ctx, cq_idx); if (likely(cqe)) { WRITE_ONCE(cqe->user_data, user_data); WRITE_ONCE(cqe->res, res); WRITE_ONCE(cqe->flags, cflags); return true; } - return io_cqring_event_overflow(ctx, user_data, res, cflags); + return io_cqring_event_overflow(ctx, user_data, res, cflags, cq_idx); } /* not as hot to bloat with inlining */ static noinline bool io_cqring_fill_event(struct io_ring_ctx *ctx, u64 user_data, - long res, unsigned int cflags) + long res, unsigned int cflags, + unsigned int cq_idx) { - return __io_cqring_fill_event(ctx, user_data, res, cflags); + return __io_cqring_fill_event(ctx, user_data, res, cflags, cq_idx); } static void io_req_complete_post(struct io_kiocb *req, long res, @@ -1585,7 +1607,7 @@ static void io_req_complete_post(struct io_kiocb *req, long res, unsigned long flags; spin_lock_irqsave(&ctx->completion_lock, flags); - __io_cqring_fill_event(ctx, req->user_data, res, cflags); + __io_cqring_fill_event(ctx, req->user_data, res, cflags, req->cq_idx); /* * If we're the last reference to this request, add to our locked * free_list cache. @@ -1797,7 +1819,7 @@ static bool io_kill_linked_timeout(struct io_kiocb *req) link->timeout.head = NULL; if (hrtimer_try_to_cancel(&io->timer) != -1) { io_cqring_fill_event(link->ctx, link->user_data, - -ECANCELED, 0); + -ECANCELED, 0, link->cq_idx); io_put_req_deferred(link, 1); return true; } @@ -1816,7 +1838,8 @@ static void io_fail_links(struct io_kiocb *req) link->link = NULL; trace_io_uring_fail_link(req, link); - io_cqring_fill_event(link->ctx, link->user_data, -ECANCELED, 0); + io_cqring_fill_event(link->ctx, link->user_data, -ECANCELED, 0, + link->cq_idx); io_put_req_deferred(link, 2); link = nxt; } @@ -2138,7 +2161,7 @@ static void io_submit_flush_completions(struct io_comp_state *cs, for (i = 0; i < nr; i++) { req = cs->reqs[i]; __io_cqring_fill_event(ctx, req->user_data, req->result, - req->compl.cflags); + req->compl.cflags, req->cq_idx); } io_commit_cqring(ctx); spin_unlock_irq(&ctx->completion_lock); @@ -2201,7 +2224,7 @@ static unsigned io_cqring_events(struct io_ring_ctx *ctx) { /* See comment at the top of this file */ smp_rmb(); - return __io_cqring_events(ctx); + return __io_cqring_events(&ctx->cqs[IO_DEFAULT_CQ]); } static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) @@ -2278,7 +2301,8 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, if (req->flags & REQ_F_BUFFER_SELECTED) cflags = io_put_rw_kbuf(req); - __io_cqring_fill_event(ctx, req->user_data, req->result, cflags); + __io_cqring_fill_event(ctx, req->user_data, req->result, cflags, + req->cq_idx); (*nr_events)++; if (req_ref_put_and_test(req)) @@ -4911,7 +4935,7 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask) } if (req->poll.events & EPOLLONESHOT) flags = 0; - if (!io_cqring_fill_event(ctx, req->user_data, error, flags)) { + if (!io_cqring_fill_event(ctx, req->user_data, error, flags, req->cq_idx)) { io_poll_remove_waitqs(req); req->poll.done = true; flags = 0; @@ -5242,7 +5266,8 @@ static bool io_poll_remove_one(struct io_kiocb *req) do_complete = io_poll_remove_waitqs(req); if (do_complete) { - io_cqring_fill_event(req->ctx, req->user_data, -ECANCELED, 0); + io_cqring_fill_event(req->ctx, req->user_data, -ECANCELED, 0, + req->cq_idx); io_commit_cqring(req->ctx); req_set_fail_links(req); io_put_req_deferred(req, 1); @@ -5494,7 +5519,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) atomic_set(&req->ctx->cq_timeouts, atomic_read(&req->ctx->cq_timeouts) + 1); - io_cqring_fill_event(ctx, req->user_data, -ETIME, 0); + io_cqring_fill_event(ctx, req->user_data, -ETIME, 0, req->cq_idx); io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); @@ -5536,7 +5561,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data) return PTR_ERR(req); req_set_fail_links(req); - io_cqring_fill_event(ctx, req->user_data, -ECANCELED, 0); + io_cqring_fill_event(ctx, req->user_data, -ECANCELED, 0, req->cq_idx); io_put_req_deferred(req, 1); return 0; } @@ -5609,7 +5634,7 @@ static int io_timeout_remove(struct io_kiocb *req, unsigned int issue_flags) ret = io_timeout_update(ctx, tr->addr, &tr->ts, io_translate_timeout_mode(tr->flags)); - io_cqring_fill_event(ctx, req->user_data, ret, 0); + io_cqring_fill_event(ctx, req->user_data, ret, 0, req->cq_idx); io_commit_cqring(ctx); spin_unlock_irq(&ctx->completion_lock); io_cqring_ev_posted(ctx); @@ -5761,7 +5786,7 @@ static void io_async_find_and_cancel(struct io_ring_ctx *ctx, done: if (!ret) ret = success_ret; - io_cqring_fill_event(ctx, req->user_data, ret, 0); + io_cqring_fill_event(ctx, req->user_data, ret, 0, req->cq_idx); io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); io_cqring_ev_posted(ctx); @@ -5818,7 +5843,7 @@ static int io_async_cancel(struct io_kiocb *req, unsigned int issue_flags) spin_lock_irq(&ctx->completion_lock); done: - io_cqring_fill_event(ctx, req->user_data, ret, 0); + io_cqring_fill_event(ctx, req->user_data, ret, 0, req->cq_idx); io_commit_cqring(ctx); spin_unlock_irq(&ctx->completion_lock); io_cqring_ev_posted(ctx); @@ -6516,6 +6541,11 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, req->result = 0; req->work.creds = NULL; + req->cq_idx = READ_ONCE(sqe->cq_idx); + if (unlikely(req->cq_idx >= ctx->cq_nr)) { + req->cq_idx = IO_DEFAULT_CQ; + return -EINVAL; + } /* enforce forwards compatibility on users */ if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) return -EINVAL; @@ -7548,7 +7578,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node) io_ring_submit_lock(ctx, lock_ring); spin_lock_irqsave(&ctx->completion_lock, flags); - io_cqring_fill_event(ctx, prsrc->tag, 0, 0); + io_cqring_fill_event(ctx, prsrc->tag, 0, 0, IO_DEFAULT_CQ); ctx->cq_extra++; io_commit_cqring(ctx); spin_unlock_irqrestore(&ctx->completion_lock, flags); @@ -9484,7 +9514,6 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, /* make sure these are sane, as we already accounted them */ ctx->sq_entries = p->sq_entries; - ctx->cqs[0].entries = p->cq_entries; size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset); if (size == SIZE_MAX) @@ -9501,6 +9530,11 @@ static int io_allocate_scq_urings(struct io_ring_ctx *ctx, rings->sq_ring_entries = p->sq_entries; rings->cq_ring_entries = p->cq_entries; + ctx->cqs[0].cached_tail = 0; + ctx->cqs[0].rings = rings; + ctx->cqs[0].entries = p->cq_entries; + ctx->cq_nr = 1; + size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); if (size == SIZE_MAX) { io_mem_free(ctx->rings); @@ -10164,6 +10198,7 @@ static int __init io_uring_init(void) BUILD_BUG_SQE_ELEM(40, __u16, buf_index); BUILD_BUG_SQE_ELEM(42, __u16, personality); BUILD_BUG_SQE_ELEM(44, __s32, splice_fd_in); + BUILD_BUG_SQE_ELEM(48, __u16, cq_idx); BUILD_BUG_ON(sizeof(struct io_uring_files_update) != sizeof(struct io_uring_rsrc_update)); diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index e1ae46683301..c2dfb179360a 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -58,6 +58,7 @@ struct io_uring_sqe { /* personality to use, if used */ __u16 personality; __s32 splice_fd_in; + __u16 cq_idx; }; __u64 __pad2[3]; }; -- 2.31.1