On 15/11/2019 22:34, Jens Axboe wrote: > How about something like this? Should work (and be valid) to have any > sequence of timeout links, as long as there's something in front of it. > Commit message has more details. If you don't mind, I'll give rewriting this a try. A bit tight on time, so hopefully by this Sunday. In any case, there are enough edge cases that I need to spend some time reviewing and checking it. > > > commit 437fd0500d08f32444747b861c72cd58a4b833d5 > Author: Jens Axboe <axboe@xxxxxxxxx> > Date: Thu Nov 14 19:39:52 2019 -0700 > > io_uring: fix sequencing issues with linked timeouts > > We have an issue with timeout links that are deeper in the submit chain, > because we only handle it upfront, not from later submissions. Move the > prep + issue of the timeout link to the async work prep handler, and do > it normally for non-async queue. If we validate and prepare the timeout > links upfront when we first see them, there's nothing stopping us from > supporting any sort of nesting. > > Ensure that we handle the sequencing of linked timeouts correctly as > well. The loop in io_req_link_next() isn't necessary, and it will never > do anything, because we can't have both REQ_F_LINK_TIMEOUT set and have > dependent links. REQ1 -> LINKED_TIMEOUT -> REQ2 -> REQ3 Is this a valid case? Just to check that I got this "can't have both" right. If not, why? I think there are a lot of use cases for this. 
> > Reported-by: Pavel Begunkov <asml.silence@xxxxxxxxx> > Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> > > diff --git a/fs/io_uring.c b/fs/io_uring.c > index a0204b48866f..47d61899b8ba 100644 > --- a/fs/io_uring.c > +++ b/fs/io_uring.c > @@ -352,6 +352,7 @@ struct io_kiocb { > #define REQ_F_MUST_PUNT 4096 /* must be punted even for NONBLOCK */ > #define REQ_F_INFLIGHT 8192 /* on inflight list */ > #define REQ_F_COMP_LOCKED 16384 /* completion under lock */ > +#define REQ_F_FREE_SQE 32768 > u64 user_data; > u32 result; > u32 sequence; > @@ -390,6 +391,8 @@ static void __io_free_req(struct io_kiocb *req); > static void io_put_req(struct io_kiocb *req); > static void io_double_put_req(struct io_kiocb *req); > static void __io_double_put_req(struct io_kiocb *req); > +static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req); > +static void io_queue_linked_timeout(struct io_kiocb *req); > > static struct kmem_cache *req_cachep; > > @@ -524,7 +527,8 @@ static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) > opcode == IORING_OP_WRITE_FIXED); > } > > -static inline bool io_prep_async_work(struct io_kiocb *req) > +static inline bool io_prep_async_work(struct io_kiocb *req, > + struct io_kiocb **link) > { > bool do_hashed = false; > > @@ -553,13 +557,17 @@ static inline bool io_prep_async_work(struct io_kiocb *req) > req->work.flags |= IO_WQ_WORK_NEEDS_USER; > } > > + *link = io_prep_linked_timeout(req); > return do_hashed; > } > > static inline void io_queue_async_work(struct io_kiocb *req) > { > - bool do_hashed = io_prep_async_work(req); > struct io_ring_ctx *ctx = req->ctx; > + struct io_kiocb *link; > + bool do_hashed; > + > + do_hashed = io_prep_async_work(req, &link); > > trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work, > req->flags); > @@ -569,6 +577,9 @@ static inline void io_queue_async_work(struct io_kiocb *req) > io_wq_enqueue_hashed(ctx->io_wq, &req->work, > file_inode(req->file)); > } > + > + if (link) > + 
io_queue_linked_timeout(link); > } > > static void io_kill_timeout(struct io_kiocb *req) > @@ -868,30 +879,26 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr) > * safe side. > */ > nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list); > - while (nxt) { > + if (nxt) { > list_del_init(&nxt->list); > if (!list_empty(&req->link_list)) { > INIT_LIST_HEAD(&nxt->link_list); > list_splice(&req->link_list, &nxt->link_list); > nxt->flags |= REQ_F_LINK; > + } else if (req->flags & REQ_F_LINK_TIMEOUT) { > + wake_ev = io_link_cancel_timeout(nxt); > + nxt = NULL; > } > > /* > * If we're in async work, we can continue processing the chain > * in this context instead of having to queue up new async work. > */ > - if (req->flags & REQ_F_LINK_TIMEOUT) { > - wake_ev = io_link_cancel_timeout(nxt); > - > - /* we dropped this link, get next */ > - nxt = list_first_entry_or_null(&req->link_list, > - struct io_kiocb, list); > - } else if (nxtptr && io_wq_current_is_worker()) { > - *nxtptr = nxt; > - break; > - } else { > - io_queue_async_work(nxt); > - break; > + if (nxt) { > + if (nxtptr && io_wq_current_is_worker()) > + *nxtptr = nxt; > + else > + io_queue_async_work(nxt); > } > } > > @@ -916,6 +923,9 @@ static void io_fail_links(struct io_kiocb *req) > > trace_io_uring_fail_link(req, link); > > + if (req->flags & REQ_F_FREE_SQE) > + kfree(link->submit.sqe); > + > if ((req->flags & REQ_F_LINK_TIMEOUT) && > link->submit.sqe->opcode == IORING_OP_LINK_TIMEOUT) { > io_link_cancel_timeout(link); > @@ -2651,8 +2661,12 @@ static void io_wq_submit_work(struct io_wq_work **workptr) > > /* if a dependent link is ready, pass it back */ > if (!ret && nxt) { > - io_prep_async_work(nxt); > + struct io_kiocb *link; > + > + io_prep_async_work(nxt, &link); > *workptr = &nxt->work; > + if (link) > + io_queue_linked_timeout(link); > } > } > > @@ -2832,7 +2846,6 @@ static int io_validate_link_timeout(struct io_kiocb *req) > static struct io_kiocb 
*io_prep_linked_timeout(struct io_kiocb *req) > { > struct io_kiocb *nxt; > - int ret; > > if (!(req->flags & REQ_F_LINK)) > return NULL; > @@ -2841,14 +2854,6 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req) > if (!nxt || nxt->submit.sqe->opcode != IORING_OP_LINK_TIMEOUT) > return NULL; > > - ret = io_validate_link_timeout(nxt); > - if (ret) { > - list_del_init(&nxt->list); > - io_cqring_add_event(nxt, ret); > - io_double_put_req(nxt); > - return ERR_PTR(-ECANCELED); > - } > - > if (nxt->submit.sqe->timeout_flags & IORING_TIMEOUT_ABS) > nxt->timeout.data->mode = HRTIMER_MODE_ABS; > else > @@ -2862,16 +2867,9 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req) > > static void __io_queue_sqe(struct io_kiocb *req) > { > - struct io_kiocb *nxt; > + struct io_kiocb *nxt = io_prep_linked_timeout(req); > int ret; > > - nxt = io_prep_linked_timeout(req); > - if (IS_ERR(nxt)) { > - ret = PTR_ERR(nxt); > - nxt = NULL; > - goto err; > - } > - > ret = __io_submit_sqe(req, NULL, true); > > /* > @@ -2899,10 +2897,6 @@ static void __io_queue_sqe(struct io_kiocb *req) > * submit reference when the iocb is actually submitted. 
> */ > io_queue_async_work(req); > - > - if (nxt) > - io_queue_linked_timeout(nxt); > - > return; > } > } > @@ -2947,6 +2941,10 @@ static void io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow) > int need_submit = false; > struct io_ring_ctx *ctx = req->ctx; > > + if (unlikely(req->flags & REQ_F_FAIL_LINK)) { > + ret = -ECANCELED; > + goto err; > + } > if (!shadow) { > io_queue_sqe(req); > return; > @@ -2961,9 +2959,11 @@ static void io_queue_link_head(struct io_kiocb *req, struct io_kiocb *shadow) > ret = io_req_defer(req); > if (ret) { > if (ret != -EIOCBQUEUED) { > +err: > io_cqring_add_event(req, ret); > io_double_put_req(req); > - __io_free_req(shadow); > + if (shadow) > + __io_free_req(shadow); > return; > } > } else { > @@ -3020,6 +3020,14 @@ static void io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state, > if (*link) { > struct io_kiocb *prev = *link; > > + if (READ_ONCE(s->sqe->opcode) == IORING_OP_LINK_TIMEOUT) { > + ret = io_validate_link_timeout(req); > + if (ret) { > + prev->flags |= REQ_F_FAIL_LINK; > + goto err_req; > + } > + } > + > sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL); > if (!sqe_copy) { > ret = -EAGAIN; > -- Pavel Begunkov
Attachment:
signature.asc
Description: OpenPGP digital signature