From: Hao Xu <haoxu@xxxxxxxxxxxxxxxxx> [ Upstream commit 3d4e4face9c1548752a2891e98b38b100feee336 ] There is an acct->nr_worker visit without lock protection. Think about the case: two callers call io_wqe_wake_worker(), one is the original context and the other one is an io-worker(by calling io_wqe_enqueue(wqe, linked)), on two cpus paralelly, this may cause nr_worker to be larger than max_worker. Let's fix it by adding lock for it, and let's do nr_workers++ before create_io_worker. There may be a edge cause that the first caller fails to create an io-worker, but the second caller doesn't know it and then quit creating io-worker as well: say nr_worker = max_worker - 1 cpu 0 cpu 1 io_wqe_wake_worker() io_wqe_wake_worker() nr_worker < max_worker nr_worker++ create_io_worker() nr_worker == max_worker failed return return But the chance of this case is very slim. Fixes: 685fe7feedb9 ("io-wq: eliminate the need for a manager thread") Signed-off-by: Hao Xu <haoxu@xxxxxxxxxxxxxxxxx> [axboe: fix unconditional create_io_worker() call] Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> Signed-off-by: Sasha Levin <sashal@xxxxxxxxxx> --- fs/io-wq.c | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/fs/io-wq.c b/fs/io-wq.c index 9efecdf025b9..e00ac0969470 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -248,10 +248,19 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) ret = io_wqe_activate_free_worker(wqe); rcu_read_unlock(); - if (!ret && acct->nr_workers < acct->max_workers) { - atomic_inc(&acct->nr_running); - atomic_inc(&wqe->wq->worker_refs); - create_io_worker(wqe->wq, wqe, acct->index); + if (!ret) { + bool do_create = false; + + raw_spin_lock_irq(&wqe->lock); + if (acct->nr_workers < acct->max_workers) { + atomic_inc(&acct->nr_running); + atomic_inc(&wqe->wq->worker_refs); + acct->nr_workers++; + do_create = true; + } + raw_spin_unlock_irq(&wqe->lock); + if (do_create) + create_io_worker(wqe->wq, wqe, acct->index); } } @@ -640,6 +649,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) kfree(worker); fail: atomic_dec(&acct->nr_running); + raw_spin_lock_irq(&wqe->lock); + acct->nr_workers--; + raw_spin_unlock_irq(&wqe->lock); io_worker_ref_put(wq); return; } @@ -655,9 +667,8 @@ fail: worker->flags |= IO_WORKER_F_FREE; if (index == IO_WQ_ACCT_BOUND) worker->flags |= IO_WORKER_F_BOUND; - if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND)) + if ((acct->nr_workers == 1) && (worker->flags & IO_WORKER_F_BOUND)) worker->flags |= IO_WORKER_F_FIXED; - acct->nr_workers++; raw_spin_unlock_irq(&wqe->lock); wake_up_new_task(tsk); } -- 2.30.2