From: Hao Xu <howeyxu@xxxxxxxxxxx>

Add a new parameter when creating new workers to indicate if users
want a normal or fixed worker.

Signed-off-by: Hao Xu <howeyxu@xxxxxxxxxxx>
---
 io_uring/io-wq.c | 33 ++++++++++++++++++++-------------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c
index bf9e9af8d9ca..048856eef4d4 100644
--- a/io_uring/io-wq.c
+++ b/io_uring/io-wq.c
@@ -137,7 +137,7 @@ struct io_cb_cancel_data {
 	bool cancel_all;
 };
 
-static bool create_io_worker(struct io_wq *wq, int index);
+static bool create_io_worker(struct io_wq *wq, int index, bool fixed);
 static void io_wq_dec_running(struct io_worker *worker);
 static bool io_acct_cancel_pending_work(struct io_wq *wq,
 					struct io_wq_acct *acct,
@@ -284,7 +284,8 @@ static bool io_wq_activate_free_worker(struct io_wq *wq,
  * We need a worker. If we find a free one, we're good. If not, and we're
  * below the max number of workers, create one.
  */
-static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
+static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct,
+				bool fixed)
 {
 	/*
 	 * Most likely an attempt to queue unbounded work on an io_wq that
@@ -302,7 +303,7 @@ static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
 	raw_spin_unlock(&wq->lock);
 	atomic_inc(&acct->nr_running);
 	atomic_inc(&wq->worker_refs);
-	return create_io_worker(wq, acct->index);
+	return create_io_worker(wq, acct->index, fixed);
 }
 
 static void io_wq_inc_running(struct io_worker *worker)
@@ -312,6 +313,11 @@ static void io_wq_inc_running(struct io_worker *worker)
 	atomic_inc(&acct->nr_running);
 }
 
+static bool is_fixed_worker(struct io_worker *worker)
+{
+	return worker->flags & IO_WORKER_F_FIXED;
+}
+
 static void create_worker_cb(struct callback_head *cb)
 {
 	struct io_worker *worker;
@@ -331,7 +337,7 @@ static void create_worker_cb(struct callback_head *cb)
 	}
 	raw_spin_unlock(&wq->lock);
 	if (do_create) {
-		create_io_worker(wq, worker->create_index);
+		create_io_worker(wq, worker->create_index, is_fixed_worker(worker));
 	} else {
 		atomic_dec(&acct->nr_running);
 		io_worker_ref_put(wq);
@@ -398,6 +404,8 @@ static void io_wq_dec_running(struct io_worker *worker)
 		return;
 	if (!io_acct_run_queue(acct))
 		return;
+	if (is_fixed_worker(worker))
+		return;
 
 	atomic_inc(&acct->nr_running);
 	atomic_inc(&wq->worker_refs);
@@ -601,11 +609,6 @@ static bool is_worker_exiting(struct io_worker *worker)
 	return worker->flags & IO_WORKER_F_EXIT;
 }
 
-static bool is_fixed_worker(struct io_worker *worker)
-{
-	return worker->flags & IO_WORKER_F_FIXED;
-}
-
 static int io_wq_worker(void *data)
 {
 	struct io_worker *worker = data;
@@ -806,7 +809,7 @@ static void io_workqueue_create(struct work_struct *work)
 		kfree(worker);
 }
 
-static bool create_io_worker(struct io_wq *wq, int index)
+static bool create_io_worker(struct io_wq *wq, int index, bool fixed)
 {
 	struct io_wq_acct *acct = &wq->acct[index];
 	struct io_worker *worker;
@@ -833,10 +836,14 @@ static bool create_io_worker(struct io_wq *wq, int index)
 	if (index == IO_WQ_ACCT_BOUND)
 		worker->flags |= IO_WORKER_F_BOUND;
 
+	if (fixed)
+		worker->flags |= IO_WORKER_F_FIXED;
+
 	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
 	if (!IS_ERR(tsk)) {
-		io_init_new_worker(wq, worker, tsk);
-	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
+		if (!fixed)
+			io_init_new_worker(wq, worker, tsk);
+	} else if (fixed || !io_should_retry_thread(PTR_ERR(tsk))) {
 		kfree(worker);
 		goto fail;
 	} else {
@@ -947,7 +954,7 @@ void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
 	    !atomic_read(&acct->nr_running))) {
 		bool did_create;
 
-		did_create = io_wq_create_worker(wq, acct);
+		did_create = io_wq_create_worker(wq, acct, false);
 		if (likely(did_create))
 			return;
 
-- 
2.25.1