When we check if we have work to run, we grab the acct lock, check, drop it, and then return the result. If we do have work to run, then running the work will again grab acct->lock and get the work item. This causes us to grab acct->lock more frequently than we need to. If we have work to do, have io_acct_run_queue() return with the acct lock still acquired. io_worker_handle_work() is then always invoked with the acct lock already held. In a simple test cases that stats files (IORING_OP_STATX always hits io-wq), we see a nice reduction in locking overhead with this change: 19.32% -12.55% [kernel.kallsyms] [k] __cmpwait_case_32 20.90% -12.07% [kernel.kallsyms] [k] queued_spin_lock_slowpath Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> --- io_uring/io-wq.c | 47 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index 3e7025b9e0dd..18a049fc53ef 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -232,17 +232,25 @@ static void io_worker_exit(struct io_worker *worker) do_exit(0); } -static inline bool io_acct_run_queue(struct io_wq_acct *acct) +static inline bool __io_acct_run_queue(struct io_wq_acct *acct) { - bool ret = false; + return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) && + !wq_list_empty(&acct->work_list); +} +/* + * If there's work to do, returns true with acct->lock acquired. If not, + * returns false with no lock held. + */ +static inline bool io_acct_run_queue(struct io_wq_acct *acct) + __acquires(&acct->lock) +{ raw_spin_lock(&acct->lock); - if (!wq_list_empty(&acct->work_list) && - !test_bit(IO_ACCT_STALLED_BIT, &acct->flags)) - ret = true; - raw_spin_unlock(&acct->lock); + if (__io_acct_run_queue(acct)) + return true; - return ret; + raw_spin_unlock(&acct->lock); + return false; } /* @@ -397,6 +405,7 @@ static void io_wq_dec_running(struct io_worker *worker) if (!io_acct_run_queue(acct)) return; + raw_spin_unlock(&acct->lock); atomic_inc(&acct->nr_running); atomic_inc(&wq->worker_refs); io_queue_worker_create(worker, acct, create_worker_cb); @@ -521,9 +530,13 @@ static void io_assign_current_work(struct io_worker *worker, raw_spin_unlock(&worker->lock); } -static void io_worker_handle_work(struct io_worker *worker) +/* + * Called with acct->lock held, drops it before returning + */ +static void io_worker_handle_work(struct io_wq_acct *acct, + struct io_worker *worker) + __releases(&acct->lock) { - struct io_wq_acct *acct = io_wq_get_acct(worker); struct io_wq *wq = worker->wq; bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state); @@ -537,7 +550,6 @@ static void io_worker_handle_work(struct io_worker *worker) * can't make progress, any work completion or insertion will * clear the stalled flag. */ - raw_spin_lock(&acct->lock); work = io_get_next_work(acct, worker); raw_spin_unlock(&acct->lock); if (work) { @@ -591,6 +603,10 @@ static void io_worker_handle_work(struct io_worker *worker) wake_up(&wq->hash->wait); } } while (work); + + if (!__io_acct_run_queue(acct)) + break; + raw_spin_lock(&acct->lock); } while (1); } @@ -611,8 +627,13 @@ static int io_wq_worker(void *data) long ret; set_current_state(TASK_INTERRUPTIBLE); + + /* + * If we have work to do, io_acct_run_queue() returns with + * the acct->lock held. If not, it will drop it. + */ while (io_acct_run_queue(acct)) - io_worker_handle_work(worker); + io_worker_handle_work(acct, worker); raw_spin_lock(&wq->lock); /* @@ -645,8 +666,8 @@ static int io_wq_worker(void *data) } } - if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) - io_worker_handle_work(worker); + if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct)) + io_worker_handle_work(acct, worker); io_worker_exit(worker); return 0; -- 2.40.1