On 3/18/20 12:30 PM, Jens Axboe wrote:
> RFC patch for now - I've tested it, works for me. Basically it allows
> io-wq to grab a whole chain of identically hashed writes, avoiding
> hammering on the wqe->lock for detaching hashed work.

Here's a v2. Changes:

- Fix overlapped hashed work, if we restarted
- Wake new worker if we have mixed hashed/unhashed. Unhashed work can
  always proceed, and if the hashed work is all done without needing IO,
  then unhashed should not have to wait.
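To make the batching idea above concrete, here is a minimal user-space
sketch. It is only an illustration, not the io-wq code: the struct
work / work_list types and the get_next_work() helper are made-up
stand-ins, and it leaves out the wqe->lock, the hash_map bookkeeping,
and the nr_running accounting/wakeup that the real patch handles.

#include <stdio.h>

struct work {
	int hash;			/* -1 means unhashed, can run anytime */
	struct work *next;
};

struct work_list {
	struct work *first;
};

/*
 * Pop the head work item; if it is hashed, also detach the run of
 * consecutive items sharing the same hash and append them to 'batch'
 * so they can be processed without going back to the shared list
 * (and, in the kernel, the lock protecting it) for each one.
 */
static struct work *get_next_work(struct work_list *list,
				  struct work_list *batch)
{
	struct work *first = list->first;
	struct work **tailp = &batch->first;
	struct work *w;

	if (!first)
		return NULL;
	list->first = first->next;
	first->next = NULL;
	if (first->hash == -1)		/* unhashed: run it on its own */
		return first;

	while (*tailp)			/* find the end of the batch */
		tailp = &(*tailp)->next;

	/* grab the whole chain of identically hashed items in one pass */
	while ((w = list->first) != NULL && w->hash == first->hash) {
		list->first = w->next;
		w->next = NULL;
		*tailp = w;
		tailp = &w->next;
	}
	return first;
}

int main(void)
{
	struct work items[] = {
		{ .hash = 1, .next = &items[1] },
		{ .hash = 1, .next = &items[2] },
		{ .hash = 1, .next = &items[3] },
		{ .hash = -1, .next = NULL },	/* unhashed, left for later */
	};
	struct work_list list = { .first = &items[0] };
	struct work_list batch = { .first = NULL };
	struct work *head = get_next_work(&list, &batch);
	struct work *w;
	int batched = 0;

	for (w = batch.first; w; w = w->next)
		batched++;
	printf("popped hash %d, batched %d extra same-hash item(s)\n",
	       head ? head->hash : -1, batched);
	return 0;
}

In the patch itself the same walk happens under wqe->lock in
io_get_next_work(), and io_worker_handle_work() drains the private
list via the got_work label before clearing the hash bit, so the
chain runs back to back without re-taking the lock per item.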
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 9541df2729de..674be5a3841b 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -380,32 +380,56 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
 	return work->flags >> IO_WQ_HASH_SHIFT;
 }
 
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
+static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+					   struct io_wq_work_list *list)
 	__must_hold(wqe->lock)
 {
-	struct io_wq_work_node *node, *prev;
-	struct io_wq_work *work;
-	unsigned int hash;
+	struct io_wq_work *work, *ret;
+	unsigned int new_hash, hash = -1U;
 
-	wq_list_for_each(node, prev, &wqe->work_list) {
-		work = container_of(node, struct io_wq_work, list);
+	ret = NULL;
+	while (!wq_list_empty(&wqe->work_list)) {
+		work = container_of(wqe->work_list.first, struct io_wq_work,
+					list);
 
 		/* not hashed, can run anytime */
 		if (!io_wq_is_hashed(work)) {
-			wq_node_del(&wqe->work_list, node, prev);
-			return work;
+			/* already have hashed work, let new worker get this */
+			if (ret) {
+				struct io_wqe_acct *acct;
+
+				/* get new worker for unhashed, if none now */
+				acct = io_work_get_acct(wqe, work);
+				if (!atomic_read(&acct->nr_running))
+					io_wqe_wake_worker(wqe, acct);
+				break;
+			}
+			wq_node_del(&wqe->work_list, &work->list, NULL);
+			ret = work;
+			break;
 		}
 
 		/* hashed, can run if not already running */
-		hash = io_get_work_hash(work);
-		if (!(wqe->hash_map & BIT(hash))) {
+		new_hash = io_get_work_hash(work);
+		if (wqe->hash_map & BIT(new_hash))
+			break;
+
+		if (hash == -1U) {
+			hash = new_hash;
 			wqe->hash_map |= BIT(hash);
-			wq_node_del(&wqe->work_list, node, prev);
-			return work;
+		} else if (hash != new_hash) {
+			break;
 		}
+
+		wq_node_del(&wqe->work_list, &work->list, NULL);
+		/* return first node, add subsequent same hash to the list */
+		if (ret)
+			wq_list_add_tail(&work->list, list);
+		else
+			ret = work;
 	}
 
-	return NULL;
+	return ret;
 }
 
 static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
@@ -481,6 +505,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 static void io_worker_handle_work(struct io_worker *worker)
 	__releases(wqe->lock)
 {
+	struct io_wq_work_list list = { .first = NULL, .last = NULL };
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 
@@ -495,7 +520,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
-		work = io_get_next_work(wqe);
+		work = io_get_next_work(wqe, &list);
 		if (work)
 			__io_worker_busy(wqe, worker, work);
 		else if (!wq_list_empty(&wqe->work_list))
@@ -504,6 +529,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		spin_unlock_irq(&wqe->lock);
 		if (!work)
 			break;
+got_work:
 		io_assign_current_work(worker, work);
 
 		/* handle a whole dependent link */
@@ -530,6 +556,14 @@ static void io_worker_handle_work(struct io_worker *worker)
 				work = NULL;
 			}
 			if (hash != -1U) {
+				if (!wq_list_empty(&list)) {
+					work = container_of(list.first,
+							    struct io_wq_work,
+							    list);
+					wq_node_del(&list, &work->list, NULL);
+					goto got_work;
+				}
+
 				spin_lock_irq(&wqe->lock);
 				wqe->hash_map &= ~BIT_ULL(hash);
 				wqe->flags &= ~IO_WQE_FLAG_STALLED;

--
Jens Axboe