This adds process-level block-layer throttling for async buffered writes
to io_uring. io_write() now checks whether the write needs to be
throttled. If so, it adds the request to the list of pending io requests
and starts a timer. When the timer expires, it submits the list of
pending writes.

- Add a new list called pending_ios for delayed (throttled) writes to
  struct io_uring_task. The list is protected by the task_lock spinlock.
- Add a new timer to struct io_uring_task.

Signed-off-by: Stefan Roesch <shr@xxxxxx>
---
 fs/io_uring.c | 98 +++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 91 insertions(+), 7 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 792ca4b6834d..8a48e5ee4e5e 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -461,6 +461,11 @@ struct io_ring_ctx {
 	};
 };
 
+struct pending_list {
+	struct list_head list;
+	struct io_kiocb *req;
+};
+
 struct io_uring_task {
 	/* submission side */
 	int			cached_refs;
@@ -477,6 +482,9 @@ struct io_uring_task {
 	struct io_wq_work_list	prior_task_list;
 	struct callback_head	task_work;
 	bool			task_running;
+
+	struct pending_list	pending_ios;
+	struct timer_list	timer;
 };
 
 /*
@@ -1134,13 +1142,14 @@ static void io_rsrc_put_work(struct work_struct *work);
 
 static void io_req_task_queue(struct io_kiocb *req);
 static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
-static int io_req_prep_async(struct io_kiocb *req);
+static int io_req_prep_async(struct io_kiocb *req, bool force);
 
 static int io_install_fixed_file(struct io_kiocb *req, struct file *file,
				 unsigned int issue_flags, u32 slot_index);
 static int io_close_fixed(struct io_kiocb *req, unsigned int issue_flags);
 
 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer);
+static void delayed_write_fn(struct timer_list *tmr);
 
 static struct kmem_cache *req_cachep;
 
@@ -2462,6 +2471,31 @@ static void io_req_task_queue_reissue(struct io_kiocb *req)
 	io_req_task_work_add(req, false);
 }
 
+static int io_req_task_queue_reissue_delayed(struct io_kiocb *req)
+{
+	struct io_uring_task *tctx = req->task->io_uring;
+	struct pending_list *pending = kmalloc(sizeof(struct pending_list), GFP_KERNEL);
+	bool empty;
+
+	if (!pending)
+		return -ENOMEM;
+	pending->req = req;
+
+	spin_lock_irq(&tctx->task_lock);
+	empty = list_empty(&tctx->pending_ios.list);
+	list_add_tail(&pending->list, &tctx->pending_ios.list);
+
+	if (empty) {
+		timer_setup(&tctx->timer, delayed_write_fn, 0);
+
+		tctx->timer.expires = current->bdp_pause;
+		add_timer(&tctx->timer);
+	}
+	spin_unlock_irq(&tctx->task_lock);
+
+	return 0;
+}
+
 static inline void io_queue_next(struct io_kiocb *req)
 {
 	struct io_kiocb *nxt = io_req_find_next(req);
@@ -2770,7 +2804,7 @@ static bool io_resubmit_prep(struct io_kiocb *req)
 	struct io_async_rw *rw = req->async_data;
 
 	if (!req_has_async_data(req))
-		return !io_req_prep_async(req);
+		return !io_req_prep_async(req, false);
 	iov_iter_restore(&rw->s.iter, &rw->s.iter_state);
 	return true;
 }
@@ -3751,6 +3785,38 @@ static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return io_prep_rw(req, sqe);
 }
 
+static inline unsigned long write_delay(void)
+{
+	if (likely(current->bdp_nr_dirtied_pause == -1 ||
+		   !time_before(jiffies, current->bdp_pause)))
+		return 0;
+
+	return current->bdp_pause;
+}
+
+static void delayed_write_fn(struct timer_list *tmr)
+{
+	struct io_uring_task *tctx = from_timer(tctx, tmr, timer);
+	struct list_head *curr;
+	struct list_head *next;
+	LIST_HEAD(pending_ios);
+
+	/* Move list to temporary list. */
+	spin_lock_irq(&tctx->task_lock);
+	list_splice_init(&tctx->pending_ios.list, &pending_ios);
+	spin_unlock_irq(&tctx->task_lock);
+
+	list_for_each_safe(curr, next, &pending_ios) {
+		struct pending_list *io;
+
+		io = list_entry(curr, struct pending_list, list);
+		io_req_task_queue_reissue(io->req);
+
+		list_del(curr);
+		kfree(io);
+	}
+}
+
 static int io_write(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_rw_state __s, *s = &__s;
@@ -3759,6 +3825,18 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags)
 	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
 	ssize_t ret, ret2;
 
+	/* Write throttling active? */
+	if (unlikely(write_delay()) && !(kiocb->ki_flags & IOCB_DIRECT)) {
+		int ret = io_req_prep_async(req, true);
+
+		if (unlikely(ret))
+			io_req_complete_failed(req, ret);
+		else
+			ret = io_req_task_queue_reissue_delayed(req);
+
+		return ret;
+	}
+
 	if (!req_has_async_data(req)) {
 		ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
 		if (unlikely(ret < 0))
@@ -6596,9 +6674,9 @@ static int io_req_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return -EINVAL;
 }
 
-static int io_req_prep_async(struct io_kiocb *req)
+static int io_req_prep_async(struct io_kiocb *req, bool force)
 {
-	if (!io_op_defs[req->opcode].needs_async_setup)
+	if (!force && !io_op_defs[req->opcode].needs_async_setup)
 		return 0;
 	if (WARN_ON_ONCE(req_has_async_data(req)))
 		return -EFAULT;
@@ -6608,6 +6686,10 @@ static int io_req_prep_async(struct io_kiocb *req)
 	switch (req->opcode) {
 	case IORING_OP_READV:
 		return io_rw_prep_async(req, READ);
+	case IORING_OP_WRITE:
+		if (!force)
+			break;
+		fallthrough;
 	case IORING_OP_WRITEV:
 		return io_rw_prep_async(req, WRITE);
 	case IORING_OP_SENDMSG:
@@ -6617,6 +6699,7 @@ static int io_req_prep_async(struct io_kiocb *req)
 	case IORING_OP_CONNECT:
 		return io_connect_prep_async(req);
 	}
+
 	printk_once(KERN_WARNING "io_uring: prep_async() bad opcode %d\n",
 		    req->opcode);
 	return -EFAULT;
@@ -6650,7 +6733,7 @@ static __cold void io_drain_req(struct io_kiocb *req)
 	}
 	spin_unlock(&ctx->completion_lock);
 
-	ret = io_req_prep_async(req);
+	ret = io_req_prep_async(req, false);
 	if (ret) {
 fail:
 		io_req_complete_failed(req, ret);
@@ -7145,7 +7228,7 @@ static void io_queue_sqe_fallback(struct io_kiocb *req)
 	} else if (unlikely(req->ctx->drain_active)) {
 		io_drain_req(req);
 	} else {
-		int ret = io_req_prep_async(req);
+		int ret = io_req_prep_async(req, false);
 
 		if (unlikely(ret))
 			io_req_complete_failed(req, ret);
@@ -7344,7 +7427,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 		struct io_kiocb *head = link->head;
 
 		if (!(req->flags & REQ_F_FAIL)) {
-			ret = io_req_prep_async(req);
+			ret = io_req_prep_async(req, false);
 			if (unlikely(ret)) {
 				req_fail_link_node(req, ret);
 				if (!(head->flags & REQ_F_FAIL))
@@ -8784,6 +8867,7 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
 	INIT_WQ_LIST(&tctx->task_list);
 	INIT_WQ_LIST(&tctx->prior_task_list);
 	init_task_work(&tctx->task_work, tctx_task_work);
+	INIT_LIST_HEAD(&tctx->pending_ios.list);
 	return 0;
 }
 
-- 
2.30.2
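
For context, a minimal userspace sketch of the kind of request this patch
throttles: an async buffered write (no O_DIRECT) submitted through io_uring.
This is not part of the patch; it assumes liburing is installed, and the file
name, buffer size and queue depth are purely illustrative.

#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <liburing.h>

int main(void)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	static char buf[4096];
	int fd, ret;

	memset(buf, 'a', sizeof(buf));

	/* No O_DIRECT, so the kernel sees a buffered write in io_write(). */
	fd = open("testfile", O_WRONLY | O_CREAT | O_TRUNC, 0644);
	if (fd < 0)
		return 1;

	if (io_uring_queue_init(8, &ring, 0) < 0)
		return 1;

	/* Queue one 4KB write at offset 0 and submit it. */
	sqe = io_uring_get_sqe(&ring);
	io_uring_prep_write(sqe, fd, buf, sizeof(buf), 0);
	io_uring_submit(&ring);

	/* Wait for the completion; res is the number of bytes written. */
	ret = io_uring_wait_cqe(&ring, &cqe);
	if (!ret) {
		printf("write completed, res=%d\n", cqe->res);
		io_uring_cqe_seen(&ring, cqe);
	}

	io_uring_queue_exit(&ring);
	close(fd);
	return 0;
}

Build with "gcc -o bufwrite bufwrite.c -luring" (file names illustrative).
With this patch applied, if write_delay() reports a pending pause for the
submitting task, such a request is queued on pending_ios and reissued from
delayed_write_fn() once the timer expires instead of being issued immediately.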