sequence mode timeouts wait not for sqe->off CQEs, but rather for sqe->off plus the number of prior inflight requests, with the quirk that completions of other timeouts are ignored. Instead, wait for exactly sqe->off CQEs, using the completion count (CQ tail) for accounting. Reported-by: Jens Axboe <axboe@xxxxxxxxx> Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- fs/io_uring.c | 120 +++++++++++++++++++------------------------------- 1 file changed, 46 insertions(+), 74 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 65458eda2127..148f61734572 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -384,7 +384,8 @@ struct io_timeout { struct file *file; u64 addr; int flags; - u32 count; + u32 off; + u32 target_seq; }; struct io_rw { @@ -986,23 +987,6 @@ static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx) return NULL; } -static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx) -{ - struct io_kiocb *req; - - req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list); - if (req) { - if (req->flags & REQ_F_TIMEOUT_NOSEQ) - return NULL; - if (!__req_need_defer(req)) { - list_del_init(&req->list); - return req; - } - } - - return NULL; -} - static void __io_commit_cqring(struct io_ring_ctx *ctx) { struct io_rings *rings = ctx->rings; @@ -1118,12 +1102,42 @@ static void io_kill_timeouts(struct io_ring_ctx *ctx) spin_unlock_irq(&ctx->completion_lock); } +static inline bool io_check_in_range(u32 pos, u32 start, u32 end) +{ + /* if @end < @start, check for [end, MAX_UINT] + [MAX_UINT, start] */ + return (pos - start) <= (end - start); +} + +static void __io_flush_timeouts(struct io_ring_ctx *ctx) +{ + u32 end, start; + + start = end = ctx->cached_cq_tail; + do { + struct io_kiocb *req = list_first_entry(&ctx->timeout_list, + struct io_kiocb, list); + + if (req->flags & REQ_F_TIMEOUT_NOSEQ) + break; + /* + * multiple timeouts may have the same target, + * check that @req is in [first_tail, cur_tail] + */ + if (!io_check_in_range(req->timeout.target_seq, start, 
end)) + break; + + list_del_init(&req->list); + io_kill_timeout(req); + end = ctx->cached_cq_tail; + } while (!list_empty(&ctx->timeout_list)); +} + static void io_commit_cqring(struct io_ring_ctx *ctx) { struct io_kiocb *req; - while ((req = io_get_timeout_req(ctx)) != NULL) - io_kill_timeout(req); + if (!list_empty(&ctx->timeout_list)) + __io_flush_timeouts(ctx); __io_commit_cqring(ctx); @@ -4582,20 +4596,8 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer) * We could be racing with timeout deletion. If the list is empty, * then timeout lookup already found it and will be handling it. */ - if (!list_empty(&req->list)) { - struct io_kiocb *prev; - - /* - * Adjust the reqs sequence before the current one because it - * will consume a slot in the cq_ring and the cq_tail - * pointer will be increased, otherwise other timeout reqs may - * return in advance without waiting for enough wait_nr. - */ - prev = req; - list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list) - prev->sequence++; + if (!list_empty(&req->list)) list_del_init(&req->list); - } io_cqring_fill_event(req, -ETIME); io_commit_cqring(ctx); @@ -4675,18 +4677,19 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, { struct io_timeout_data *data; unsigned flags; + u32 off = READ_ONCE(sqe->off); if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) return -EINVAL; if (sqe->ioprio || sqe->buf_index || sqe->len != 1) return -EINVAL; - if (sqe->off && is_timeout_link) + if (off && is_timeout_link) return -EINVAL; flags = READ_ONCE(sqe->timeout_flags); if (flags & ~IORING_TIMEOUT_ABS) return -EINVAL; - req->timeout.count = READ_ONCE(sqe->off); + req->timeout.off = off; if (!req->io && io_alloc_async_ctx(req)) return -ENOMEM; @@ -4710,68 +4713,37 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, static int io_timeout(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; - struct io_timeout_data *data; + struct 
io_timeout_data *data = &req->io->timeout; struct list_head *entry; - unsigned span = 0; - u32 count = req->timeout.count; - u32 seq = req->sequence; + u32 tail, off = req->timeout.off; - data = &req->io->timeout; + spin_lock_irq(&ctx->completion_lock); /* * sqe->off holds how many events that need to occur for this * timeout event to be satisfied. If it isn't set, then this is * a pure timeout request, sequence isn't used. */ - if (!count) { + if (!off) { req->flags |= REQ_F_TIMEOUT_NOSEQ; - spin_lock_irq(&ctx->completion_lock); entry = ctx->timeout_list.prev; goto add; } - req->sequence = seq + count; + tail = ctx->cached_cq_tail; + req->timeout.target_seq = tail + off; /* * Insertion sort, ensuring the first entry in the list is always * the one we need first. */ - spin_lock_irq(&ctx->completion_lock); list_for_each_prev(entry, &ctx->timeout_list) { struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list); - unsigned nxt_seq; - long long tmp, tmp_nxt; - u32 nxt_offset = nxt->timeout.count; - - if (nxt->flags & REQ_F_TIMEOUT_NOSEQ) - continue; - - /* - * Since seq + count can overflow, use type long - * long to store it. - */ - tmp = (long long)seq + count; - nxt_seq = nxt->sequence - nxt_offset; - tmp_nxt = (long long)nxt_seq + nxt_offset; + u32 nxt_off = nxt->timeout.target_seq - tail; - /* - * cached_sq_head may overflow, and it will never overflow twice - * once there is some timeout req still be valid. - */ - if (seq < nxt_seq) - tmp += UINT_MAX; - - if (tmp > tmp_nxt) + if (!(nxt->flags & REQ_F_TIMEOUT_NOSEQ) && (off >= nxt_off)) break; - - /* - * Sequence of reqs after the insert one and itself should - * be adjusted because each timeout req consumes a slot. - */ - span++; - nxt->sequence++; } - req->sequence -= span; add: list_add(&req->list, entry); data->timer.function = io_timeout_fn; -- 2.24.0