RFC patch for now - I've tested it, works for me. Basically it allows io-wq to
grab a whole chain of identically hashed writes, avoiding hammering on the
wqe->lock for detaching hashed work.

diff --git a/fs/io-wq.c b/fs/io-wq.c
index 9541df2729de..d9a50670d47b 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -380,32 +380,44 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
 	return work->flags >> IO_WQ_HASH_SHIFT;
 }
 
-static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
+static struct io_wq_work *io_get_next_work(struct io_wqe *wqe,
+					   struct io_wq_work_list *list)
 	__must_hold(wqe->lock)
 {
-	struct io_wq_work_node *node, *prev;
-	struct io_wq_work *work;
-	unsigned int hash;
+	struct io_wq_work *work, *ret;
+	unsigned int new_hash, hash = -1U;
 
-	wq_list_for_each(node, prev, &wqe->work_list) {
-		work = container_of(node, struct io_wq_work, list);
+	ret = NULL;
+	while (!wq_list_empty(&wqe->work_list)) {
+		work = container_of(wqe->work_list.first, struct io_wq_work,
+				    list);
 
 		/* not hashed, can run anytime */
 		if (!io_wq_is_hashed(work)) {
-			wq_node_del(&wqe->work_list, node, prev);
-			return work;
+			if (hash != -1U)
+				break;
+			wq_node_del(&wqe->work_list, &work->list, NULL);
+			ret = work;
+			break;
 		}
 
 		/* hashed, can run if not already running */
-		hash = io_get_work_hash(work);
-		if (!(wqe->hash_map & BIT(hash))) {
-			wqe->hash_map |= BIT(hash);
-			wq_node_del(&wqe->work_list, node, prev);
-			return work;
-		}
+		new_hash = io_get_work_hash(work);
+		if (hash == -1U)
+			hash = new_hash;
+		else if (hash != new_hash)
+			break;
+
+		wqe->hash_map |= BIT(hash);
+		wq_node_del(&wqe->work_list, &work->list, NULL);
+		/* return first node, add subsequent same hash to the list */
+		if (ret)
+			wq_list_add_tail(&work->list, list);
+		else
+			ret = work;
 	}
 
-	return NULL;
+	return ret;
 }
 
 static void io_wq_switch_mm(struct io_worker *worker, struct io_wq_work *work)
@@ -481,6 +493,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 static void io_worker_handle_work(struct io_worker *worker)
 	__releases(wqe->lock)
 {
+	struct io_wq_work_list list = { .first = NULL, .last = NULL };
 	struct io_wqe *wqe = worker->wqe;
 	struct io_wq *wq = wqe->wq;
 
@@ -495,7 +508,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
-		work = io_get_next_work(wqe);
+		work = io_get_next_work(wqe, &list);
 		if (work)
 			__io_worker_busy(wqe, worker, work);
 		else if (!wq_list_empty(&wqe->work_list))
@@ -504,6 +517,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		spin_unlock_irq(&wqe->lock);
 		if (!work)
 			break;
+got_work:
 		io_assign_current_work(worker, work);
 
 		/* handle a whole dependent link */
@@ -530,6 +544,14 @@ static void io_worker_handle_work(struct io_worker *worker)
 				work = NULL;
 			}
 			if (hash != -1U) {
+				if (!wq_list_empty(&list)) {
+					work = container_of(list.first,
+							    struct io_wq_work,
+							    list);
+					wq_node_del(&list, &work->list, NULL);
+					goto got_work;
+				}
+
 				spin_lock_irq(&wqe->lock);
 				wqe->hash_map &= ~BIT_ULL(hash);
 				wqe->flags &= ~IO_WQE_FLAG_STALLED;

-- 
Jens Axboe
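
[Editor's sketch, not part of the patch] For readers who don't have fs/io-wq.c
in front of them, here is a minimal standalone sketch of the idea the patch
implements: take the head of the work list and, in the same pass, every
immediately following entry with the same hash, so the lock protecting the
list only has to be taken once per run rather than once per entry. The struct
and the take_hashed_run() helper are invented for illustration; they are not
the kernel's io_wq_work_list API, and the sketch leaves out the hash_map
bookkeeping and locking that the real code does.

#include <stdio.h>
#include <stdlib.h>

struct work {
	unsigned int hash;
	struct work *next;
};

/*
 * Detach the head entry of *head plus every directly following entry
 * that carries the same hash. The caller gets the whole run back as a
 * NULL-terminated chain, so the list lock would only be needed for
 * this single call instead of once per entry.
 */
static struct work *take_hashed_run(struct work **head)
{
	struct work *first = *head, *last;

	if (!first)
		return NULL;

	last = first;
	while (last->next && last->next->hash == first->hash)
		last = last->next;

	*head = last->next;	/* list now begins after the run */
	last->next = NULL;	/* terminate the detached run */
	return first;
}

int main(void)
{
	unsigned int hashes[] = { 3, 3, 3, 7, 3 };
	struct work *list = NULL, **tail = &list, *run;
	size_t i;

	/* build a small work list: three hash-3 writes, one hash-7, one hash-3 */
	for (i = 0; i < sizeof(hashes) / sizeof(hashes[0]); i++) {
		struct work *w = calloc(1, sizeof(*w));

		if (!w)
			return 1;
		w->hash = hashes[i];
		*tail = w;
		tail = &w->next;
	}

	/* one grab yields the whole run of hash-3 entries at the head */
	run = take_hashed_run(&list);
	while (run) {
		struct work *next = run->next;

		printf("processing work with hash %u\n", run->hash);
		free(run);
		run = next;
	}

	/* the remaining entries (hash 7, then hash 3) are still queued */
	while (list) {
		struct work *next = list->next;

		free(list);
		list = next;
	}
	return 0;
}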