From: Hao Xu <howeyxu@xxxxxxxxxxx> In uringlet mode, we allow exact one worker to submit sqes at the same time. nr_running is not a good choice to aim that. Add an member wq->owner and its lock to achieve that, this avoids race condition between workers. Signed-off-by: Hao Xu <howeyxu@xxxxxxxxxxx> --- io_uring/io-wq.c | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/io_uring/io-wq.c b/io_uring/io-wq.c index 00a1cdefb787..9fcaeea7a478 100644 --- a/io_uring/io-wq.c +++ b/io_uring/io-wq.c @@ -96,6 +96,9 @@ struct io_wq { void *private; + raw_spinlock_t lock; + struct io_worker *owner; + struct io_wqe *wqes[]; }; @@ -381,6 +384,8 @@ static inline bool io_worker_test_submit(struct io_worker *worker) return worker->flags & IO_WORKER_F_SUBMIT; } +#define IO_WQ_OWNER_TRANSMIT ((struct io_worker *)-1) + static void io_wqe_dec_running(struct io_worker *worker) { struct io_wqe_acct *acct = io_wqe_get_acct(worker); @@ -401,6 +406,10 @@ static void io_wqe_dec_running(struct io_worker *worker) io_uringlet_end(wq->private); io_worker_set_scheduled(worker); + raw_spin_lock(&wq->lock); + wq->owner = IO_WQ_OWNER_TRANSMIT; + raw_spin_unlock(&wq->lock); + raw_spin_lock(&wqe->lock); rcu_read_lock(); activated = io_wqe_activate_free_worker(wqe, acct); @@ -674,6 +683,17 @@ static void io_wqe_worker_let(struct io_worker *worker) while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { unsigned int empty_count = 0; + struct io_worker *owner; + + raw_spin_lock(&wq->lock); + owner = wq->owner; + if (owner && owner != IO_WQ_OWNER_TRANSMIT && owner != worker) { + raw_spin_unlock(&wq->lock); + set_current_state(TASK_INTERRUPTIBLE); + goto sleep; + } + wq->owner = worker; + raw_spin_unlock(&wq->lock); __io_worker_busy(wqe, worker); set_current_state(TASK_INTERRUPTIBLE); @@ -697,6 +717,7 @@ static void io_wqe_worker_let(struct io_worker *worker) cond_resched(); } while (1); +sleep: raw_spin_lock(&wqe->lock); __io_worker_idle(wqe, worker); raw_spin_unlock(&wqe->lock); @@ -780,6 +801,14 @@ int io_uringlet_offload(struct io_wq *wq) struct io_wqe_acct *acct = io_get_acct(wqe, true); bool waken; + raw_spin_lock(&wq->lock); + if (wq->owner) { + raw_spin_unlock(&wq->lock); + return 0; + } + wq->owner = IO_WQ_OWNER_TRANSMIT; + raw_spin_unlock(&wq->lock); + raw_spin_lock(&wqe->lock); rcu_read_lock(); waken = io_wqe_activate_free_worker(wqe, acct); @@ -1248,6 +1277,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) wq->free_work = data->free_work; wq->do_work = data->do_work; wq->private = data->private; + raw_spin_lock_init(&wq->lock); ret = -ENOMEM; for_each_node(node) { -- 2.25.1