On 12/7/21 21:01, Pavel Begunkov wrote:
On 12/7/21 09:39, Hao Xu wrote:
In previous patches, we have already gathered some tw (task work) items with
io_req_task_complete() as the callback in prior_task_list. Let's complete
them in batch even while we cannot grab the uring lock. In this way, we batch
the req_complete_post path.
[...]
+		if (likely(*uring_locked))
+			req->io_task_work.func(req, uring_locked);
+		else
+			__io_req_complete_post(req, req->result, io_put_kbuf(req));
I think there is the same issue as last time: the first iteration of tctx_task_work()
sets ctx but doesn't get the uring_lock. Then you go here, find a request with the
same ctx, and end up calling __io_req_complete_post() without the completion_lock held.
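To spell out the state I mean (my rough reading of the current patch, untested):
handle_tw_list() from a previous round may have set *ctx but failed
mutex_trylock(&(*ctx)->uring_lock), leaving *uring_locked == false and
*compl_locked == false. handle_prior_tw_list() then sees req->ctx == *ctx, skips
the whole "if (req->ctx != *ctx)" block, so ->completion_lock is never taken, and
still falls through to:

		if (likely(*uring_locked))
			req->io_task_work.func(req, uring_locked);
		else
			/* ->completion_lock is not held here */
			__io_req_complete_post(req, req->result, io_put_kbuf(req));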
Maybe something like below on top? Totally untested. We basically always
want *uring_locked != *compl_locked, so we don't even need to store
both vars.
diff --git a/fs/io_uring.c b/fs/io_uring.c
index f224f8df77a1..dfa226bf2c53 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -2233,27 +2233,28 @@ static inline void ctx_commit_and_unlock(struct io_ring_ctx *ctx)
 }
 
 static void handle_prior_tw_list(struct io_wq_work_node *node, struct io_ring_ctx **ctx,
-				 bool *uring_locked, bool *compl_locked)
+				 bool *uring_locked)
 {
+	if (*ctx && !*uring_locked)
+		spin_lock(&(*ctx)->completion_lock);
+
 	do {
 		struct io_wq_work_node *next = node->next;
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 						    io_task_work.node);
 
 		if (req->ctx != *ctx) {
-			if (unlikely(*compl_locked)) {
+			if (unlikely(!*uring_locked && *ctx))
 				ctx_commit_and_unlock(*ctx);
-				*compl_locked = false;
-			}
+
 			ctx_flush_and_put(*ctx, uring_locked);
 			*ctx = req->ctx;
 			/* if not contended, grab and improve batching */
 			*uring_locked = mutex_trylock(&(*ctx)->uring_lock);
-			percpu_ref_get(&(*ctx)->refs);
-			if (unlikely(!*uring_locked)) {
+			if (unlikely(!*uring_locked))
 				spin_lock(&(*ctx)->completion_lock);
-				*compl_locked = true;
-			}
+
+			percpu_ref_get(&(*ctx)->refs);
 		}
 		if (likely(*uring_locked))
 			req->io_task_work.func(req, uring_locked);
@@ -2262,10 +2263,8 @@ static void handle_prior_tw_list(struct io_wq_work_node *node, struct io_ring_ct
 		node = next;
 	} while (node);
 
-	if (unlikely(*compl_locked)) {
+	if (unlikely(!*uring_locked))
 		ctx_commit_and_unlock(*ctx);
-		*compl_locked = false;
-	}
 }
 
 static void handle_tw_list(struct io_wq_work_node *node, struct io_ring_ctx **ctx, bool *locked)
@@ -2289,7 +2288,7 @@ static void handle_tw_list(struct io_wq_work_node *node, struct io_ring_ctx **ct
 
 static void tctx_task_work(struct callback_head *cb)
 {
-	bool uring_locked = false, compl_locked = false;
+	bool uring_locked = false;
 	struct io_ring_ctx *ctx = NULL;
 	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
 						  task_work);
@@ -2313,7 +2312,7 @@ static void tctx_task_work(struct callback_head *cb)
 			break;
 
 		if (node1)
-			handle_prior_tw_list(node1, &ctx, &uring_locked, &compl_locked);
+			handle_prior_tw_list(node1, &ctx, &uring_locked);
 
 		if (node2)
 			handle_tw_list(node2, &ctx, &uring_locked);
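And to double-check myself, this is how handle_prior_tw_list() should read with
the above applied (I just applied the diff by hand and added a couple of comments,
so don't take it as gospel):

static void handle_prior_tw_list(struct io_wq_work_node *node, struct io_ring_ctx **ctx,
				 bool *uring_locked)
{
	/* *ctx may be set from a previous handler without the uring_lock held */
	if (*ctx && !*uring_locked)
		spin_lock(&(*ctx)->completion_lock);

	do {
		struct io_wq_work_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		if (req->ctx != *ctx) {
			if (unlikely(!*uring_locked && *ctx))
				ctx_commit_and_unlock(*ctx);

			ctx_flush_and_put(*ctx, uring_locked);
			*ctx = req->ctx;
			/* if not contended, grab and improve batching */
			*uring_locked = mutex_trylock(&(*ctx)->uring_lock);
			if (unlikely(!*uring_locked))
				spin_lock(&(*ctx)->completion_lock);

			percpu_ref_get(&(*ctx)->refs);
		}
		if (likely(*uring_locked))
			req->io_task_work.func(req, uring_locked);
		else
			__io_req_complete_post(req, req->result, io_put_kbuf(req));
		node = next;
	} while (node);

	/* commit and unlock if the last ctx was handled under ->completion_lock */
	if (unlikely(!*uring_locked))
		ctx_commit_and_unlock(*ctx);
}

That keeps the invariant in one place: inside the loop, whenever we don't hold
the current ctx's uring_lock we hold its ->completion_lock, which is exactly
what compl_locked was tracking.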