[PATCH 5/5] io_uring: request refcounting skipping

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The intention here is to skip request refcounting in most hot cases,
and use only when it's really needed. So, it should save 2 refcount
atomics per request for IOPOLL and IRQ completions, and 1 atomic per
req for inline completions.

There are cases, however, where the refcounting is enabled back:
- Polling, including apoll. Because double poll entries takes a ref.
  Might get relaxed in the near future.
- Link timeouts, for both, the timeout and the request it's bound to,
  because they work in-parallel and we need to synchronise to cancel one
  of them on completion.
- When in io-wq, because it doesn't hold uring_lock, so the guarantees
  described below doesn't work, and there is also io_wq_free_work().

TL;DR;
Requests were always given with two references. One is called completion
and is generally put at the moment I/O is completed. The second is
submission reference, which is usually put during submission, e.g. by
__io_queue_sqe(). It was needed, because the code actually issuing a
request (e.g. fs, the block layer, etc.), may punt it for async
execution, but still access associated memory while unwinding back to
io_uring.

First, let's notice that now all requests are returned back into the
request cache and not actually kfreed(). Also, we take requests out of
it only under ->uring_lock. So, if I/O submission is also protected by
the mutex, and even if io_issue_sqe() completes a request deep down the
stack, the memory is not going to be freed and the request won't be
re-allocated until the submission stack unwinds back and we unlock
the mutex. If it's not protected by the mutex, we're in io-wq, which
takes a reference anyway resembling submission referencing.

Same with other parts that may get accessed on the way back,
->async_data deallocation is delayed up to io_alloc_req(), and
rsrc deallocation is also now protected by the mutex.

With all that we can kill off submission references. And next, in most
cases we have only one reference travelling along the execution path,
so we can replace it with a flag allowing to enable refcounting when
needed.

Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx>
---
 fs/io_uring.c | 58 ++++++++++++++++++++++++++++++++-------------------
 1 file changed, 36 insertions(+), 22 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 5a08cc967199..d65621247709 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -712,6 +712,7 @@ enum {
 	REQ_F_REISSUE_BIT,
 	REQ_F_DONT_REISSUE_BIT,
 	REQ_F_CREDS_BIT,
+	REQ_F_REFCOUNT_BIT,
 	/* keep async read/write and isreg together and in order */
 	REQ_F_NOWAIT_READ_BIT,
 	REQ_F_NOWAIT_WRITE_BIT,
@@ -767,6 +768,8 @@ enum {
 	REQ_F_ISREG		= BIT(REQ_F_ISREG_BIT),
 	/* has creds assigned */
 	REQ_F_CREDS		= BIT(REQ_F_CREDS_BIT),
+	/* skip refcounting if not set */
+	REQ_F_REFCOUNT		= BIT(REQ_F_REFCOUNT_BIT),
 };
 
 struct async_poll {
@@ -1090,26 +1093,40 @@ EXPORT_SYMBOL(io_uring_get_socket);
 
 static inline bool req_ref_inc_not_zero(struct io_kiocb *req)
 {
+	WARN_ON_ONCE(!(req->flags & REQ_F_REFCOUNT));
 	return atomic_inc_not_zero(&req->refs);
 }
 
 static inline bool req_ref_put_and_test(struct io_kiocb *req)
 {
+	if (likely(!(req->flags & REQ_F_REFCOUNT)))
+		return true;
+
 	WARN_ON_ONCE(req_ref_zero_or_close_to_overflow(req));
 	return atomic_dec_and_test(&req->refs);
 }
 
 static inline void req_ref_put(struct io_kiocb *req)
 {
+	WARN_ON_ONCE(!(req->flags & REQ_F_REFCOUNT));
 	WARN_ON_ONCE(req_ref_put_and_test(req));
 }
 
 static inline void req_ref_get(struct io_kiocb *req)
 {
+	WARN_ON_ONCE(!(req->flags & REQ_F_REFCOUNT));
 	WARN_ON_ONCE(req_ref_zero_or_close_to_overflow(req));
 	atomic_inc(&req->refs);
 }
 
+static inline void io_req_refcount(struct io_kiocb *req)
+{
+	if (!(req->flags & REQ_F_REFCOUNT)) {
+		req->flags |= REQ_F_REFCOUNT;
+		atomic_set(&req->refs, 1);
+	}
+}
+
 static inline void io_req_set_rsrc_node(struct io_kiocb *req)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -1706,7 +1723,6 @@ static inline void io_req_complete(struct io_kiocb *req, long res)
 static void io_req_complete_failed(struct io_kiocb *req, long res)
 {
 	req_set_fail(req);
-	io_put_req(req);
 	io_req_complete_post(req, res, 0);
 }
 
@@ -1761,7 +1777,14 @@ static bool io_flush_cached_reqs(struct io_ring_ctx *ctx)
 	return nr != 0;
 }
 
+/*
+ * A request might get retired back into the request caches even before opcode
+ * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
+ * Because of that, io_alloc_req() should be called only under ->uring_lock
+ * and with extra caution to not get a request that is still worked on.
+ */
 static struct io_kiocb *io_alloc_req(struct io_ring_ctx *ctx)
+	__must_hold(&req->ctx->uring_lock)
 {
 	struct io_submit_state *state = &ctx->submit_state;
 	gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
@@ -1883,8 +1906,6 @@ static void io_fail_links(struct io_kiocb *req)
 
 		trace_io_uring_fail_link(req, link);
 		io_cqring_fill_event(link->ctx, link->user_data, -ECANCELED, 0);
-
-		io_put_req(link);
 		io_put_req_deferred(link);
 		link = nxt;
 	}
@@ -2166,8 +2187,6 @@ static void io_submit_flush_completions(struct io_ring_ctx *ctx)
 	for (i = 0; i < nr; i++) {
 		struct io_kiocb *req = state->compl_reqs[i];
 
-		/* submission and completion refs */
-		io_put_req(req);
 		if (req_ref_put_and_test(req))
 			io_req_free_batch(&rb, req, &ctx->submit_state);
 	}
@@ -2272,7 +2291,6 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 		if (READ_ONCE(req->result) == -EAGAIN && resubmit &&
 		    !(req->flags & REQ_F_DONT_REISSUE)) {
 			req->iopoll_completed = 0;
-			req_ref_get(req);
 			io_req_task_queue_reissue(req);
 			continue;
 		}
@@ -2787,7 +2805,6 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
 	if (check_reissue && (req->flags & REQ_F_REISSUE)) {
 		req->flags &= ~REQ_F_REISSUE;
 		if (io_resubmit_prep(req)) {
-			req_ref_get(req);
 			io_req_task_queue_reissue(req);
 		} else {
 			int cflags = 0;
@@ -3213,9 +3230,6 @@ static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
 
 	req->rw.kiocb.ki_flags &= ~IOCB_WAITQ;
 	list_del_init(&wait->entry);
-
-	/* submit ref gets dropped, acquire a new one */
-	req_ref_get(req);
 	io_req_task_queue(req);
 	return 1;
 }
@@ -5220,6 +5234,7 @@ static int io_arm_poll_handler(struct io_kiocb *req)
 	req->apoll = apoll;
 	req->flags |= REQ_F_POLLED;
 	ipt.pt._qproc = io_async_queue_proc;
+	io_req_refcount(req);
 
 	ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask,
 					io_async_wake);
@@ -5267,10 +5282,6 @@ static bool io_poll_remove_one(struct io_kiocb *req)
 		io_cqring_fill_event(req->ctx, req->user_data, -ECANCELED, 0);
 		io_commit_cqring(req->ctx);
 		req_set_fail(req);
-
-		/* non-poll requests have submit ref still */
-		if (req->opcode != IORING_OP_POLL_ADD)
-			io_put_req(req);
 		io_put_req_deferred(req);
 	}
 	return do_complete;
@@ -5414,6 +5425,7 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe
 	if (flags & ~IORING_POLL_ADD_MULTI)
 		return -EINVAL;
 
+	io_req_refcount(req);
 	poll->events = io_poll_parse_events(sqe, flags);
 	return 0;
 }
@@ -6290,6 +6302,10 @@ static void io_wq_submit_work(struct io_wq_work *work)
 	struct io_kiocb *timeout;
 	int ret = 0;
 
+	io_req_refcount(req);
+	/* will be dropped by ->io_free_work() after returning to io-wq */
+	req_ref_get(req);
+
 	timeout = io_prep_linked_timeout(req);
 	if (timeout)
 		io_queue_linked_timeout(timeout);
@@ -6312,11 +6328,8 @@ static void io_wq_submit_work(struct io_wq_work *work)
 	}
 
 	/* avoid locking problems by failing it from a clean context */
-	if (ret) {
-		/* io-wq is going to take one down */
-		req_ref_get(req);
+	if (ret)
 		io_req_task_queue_fail(req, ret);
-	}
 }
 
 static inline struct io_fixed_file *io_fixed_file_slot(struct io_file_table *table,
@@ -6450,6 +6463,11 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
 	    nxt->opcode != IORING_OP_LINK_TIMEOUT)
 		return NULL;
 
+	/* linked timeouts should have two refs once prep'ed */
+	io_req_refcount(req);
+	io_req_refcount(nxt);
+	req_ref_get(nxt);
+
 	nxt->timeout.head = req;
 	nxt->flags |= REQ_F_LTIMEOUT_ACTIVE;
 	req->flags |= REQ_F_LINK_TIMEOUT;
@@ -6478,8 +6496,6 @@ static void __io_queue_sqe(struct io_kiocb *req)
 			state->compl_reqs[state->compl_nr++] = req;
 			if (state->compl_nr == ARRAY_SIZE(state->compl_reqs))
 				io_submit_flush_completions(ctx);
-		} else {
-			io_put_req(req);
 		}
 	} else if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
 		switch (io_arm_poll_handler(req)) {
@@ -6559,8 +6575,6 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	req->user_data = READ_ONCE(sqe->user_data);
 	req->file = NULL;
 	req->fixed_rsrc_refs = NULL;
-	/* one is dropped after submission, the other at completion */
-	atomic_set(&req->refs, 2);
 	req->task = current;
 
 	/* enforce forwards compatibility on users */
-- 
2.32.0




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux