The improvements in this patch are as follows: - Move the io_sqring_entries() definition up such that this function can be used in the io_sqring_full() implementation. - Introduce the new function io_cqring_full(). - Change several memory barriers into load acquire / store release instructions since the latter are faster on non-x86 CPUs. Note: on x86 CPUs smp_rmb() and smp_wmb() only involve a compiler barrier. - Ensure completions have been reaped from use space before these are overwritten by using smp_load_acquire() in __io_cqring_events(). Preceding __io_cqring_events() with smp_rmb() is not sufficient because the CPU may reorder READ_ONCE() in __io_cqring_events() with later memory accesses. This patch has been tested by running the liburing test suite on an x86 system. Cc: Pavel Begunkov <asml.silence@xxxxxxxxx> Cc: Xiaoguang Wang <xiaoguang.wang@xxxxxxxxxxxxxxxxx> Signed-off-by: Bart Van Assche <bvanassche@xxxxxxx> --- fs/io_uring.c | 72 +++++++++++++++++++++++---------------------------- 1 file changed, 32 insertions(+), 40 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 03748faa5295..287ad97ff992 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1699,16 +1699,31 @@ static void io_commit_cqring(struct io_ring_ctx *ctx) __io_queue_deferred(ctx); } +static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) +{ + struct io_rings *rings = ctx->rings; + + /* make sure SQ entry isn't read before tail */ + return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; +} + static inline bool io_sqring_full(struct io_ring_ctx *ctx) { struct io_rings *r = ctx->rings; - return READ_ONCE(r->sq.tail) - ctx->cached_sq_head == r->sq_ring_entries; + return io_sqring_entries(ctx) == r->sq_ring_entries; } -static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx) +static inline unsigned int io_cqring_events(struct io_ring_ctx *ctx) { - return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head); + return ctx->cached_cq_tail - smp_load_acquire(&ctx->rings->cq.head); +} + +static inline bool io_cqring_full(struct io_ring_ctx *ctx) +{ + struct io_rings *rings = ctx->rings; + + return io_cqring_events(ctx) == rings->cq_ring_entries; } static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) @@ -1717,11 +1732,12 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) unsigned tail; /* - * writes to the cq entry need to come after reading head; the - * control dependency is enough as we're using WRITE_ONCE to - * fill the cq entry + * Writes to the CQ entry must happen after reading the CQ head. The + * load-acquire in io_cqring_events() combined with the store-release + * in liburing guarantee that filling the CQ entry will happen after + * the cq entry has been read from user space. */ - if (__io_cqring_events(ctx) == rings->cq_ring_entries) + if (io_cqring_full(ctx)) return NULL; tail = ctx->cached_cq_tail++; @@ -1778,14 +1794,13 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, struct task_struct *tsk, struct files_struct *files) { - struct io_rings *rings = ctx->rings; struct io_kiocb *req, *tmp; struct io_uring_cqe *cqe; unsigned long flags; bool all_flushed, posted; LIST_HEAD(list); - if (!force && __io_cqring_events(ctx) == rings->cq_ring_entries) + if (!force && io_cqring_full(ctx)) return false; posted = false; @@ -2392,21 +2407,6 @@ static void io_double_put_req(struct io_kiocb *req) io_free_req(req); } -static unsigned io_cqring_events(struct io_ring_ctx *ctx) -{ - /* See comment at the top of this file */ - smp_rmb(); - return __io_cqring_events(ctx); -} - -static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx) -{ - struct io_rings *rings = ctx->rings; - - /* make sure SQ entry isn't read before tail */ - return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head; -} - static unsigned int io_put_kbuf(struct io_kiocb *req, struct io_buffer *kbuf) { unsigned int cflags; @@ -2464,15 +2464,12 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, struct io_kiocb *req; LIST_HEAD(again); - /* order with ->result store in io_complete_rw_iopoll() */ - smp_rmb(); - io_init_req_batch(&rb); while (!list_empty(done)) { int cflags = 0; req = list_first_entry(done, struct io_kiocb, inflight_entry); - if (READ_ONCE(req->result) == -EAGAIN) { + if (req->result == -EAGAIN) { req->result = 0; req->iopoll_completed = 0; list_move_tail(&req->inflight_entry, &again); @@ -2521,7 +2518,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, * If we find a request that requires polling, break out * and complete those lists first, if we have entries there. */ - if (READ_ONCE(req->iopoll_completed)) { + if (smp_load_acquire(&req->iopoll_completed)) { list_move_tail(&req->inflight_entry, &done); continue; } @@ -2533,7 +2530,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, break; /* iopoll may have completed current req */ - if (READ_ONCE(req->iopoll_completed)) + if (smp_load_acquire(&req->iopoll_completed)) list_move_tail(&req->inflight_entry, &done); if (ret && spin) @@ -2768,10 +2765,9 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) if (res != -EAGAIN && res != req->result) req_set_fail_links(req); - WRITE_ONCE(req->result, res); - /* order with io_poll_complete() checking ->result */ - smp_wmb(); - WRITE_ONCE(req->iopoll_completed, 1); + req->result = res; + /* order with io_poll_complete() checking ->iopoll_completed */ + smp_store_release(&req->iopoll_completed, 1); } /* @@ -2804,7 +2800,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, bool in_async) * For fast devices, IO may have already completed. If it has, add * it to the front so we find it first. */ - if (READ_ONCE(req->iopoll_completed)) + if (smp_load_acquire(&req->iopoll_completed)) list_add(&req->inflight_entry, &ctx->iopoll_list); else list_add_tail(&req->inflight_entry, &ctx->iopoll_list); @@ -6749,11 +6745,7 @@ static void io_commit_sqring(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; - /* - * Ensure any loads from the SQEs are done at this point, - * since once we write the new head, the application could - * write new data to them. - */ + /* Ensure SQE reads happen before user space sees the new SQ head. */ smp_store_release(&rings->sq.head, ctx->cached_sq_head); }