Completions may be done in batches, with io_commit_cqring() called only
once at the end. That means timeout sequence checks are also done only
once and don't account for the events completed in between, so they can
fail to trigger some timeouts.

Do separate CQ sequence accounting in a u64. When checking timeout
sequences, look up to UINT_MAX sequences behind the current one, which
the batched check could have missed. This is safe because sqe->off is a
u32 and so can't wrap around into the [seq - UINT_MAX, seq] window being
checked.

Decoupling CQ timeout sequences from ctx->cached_cq_tail is also needed
for implementing the "single CQE per link" feature and others.

Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx>
---
 fs/io_uring.c | 33 ++++++++++++++++++++++++++-------
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index fb8ec4b00375..f09c1d3a7e63 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -298,6 +298,7 @@ struct io_ring_ctx {
 		unsigned		cq_entries;
 		unsigned		cq_mask;
 		atomic_t		cq_timeouts;
+		u64			cq_seq;
 		unsigned long		cq_check_overflow;
 		struct wait_queue_head	cq_wait;
 		struct fasync_struct	*cq_fasync;
@@ -385,7 +386,7 @@ struct io_timeout {
 	u64				addr;
 	int				flags;
 	u32				off;
-	u32				target_seq;
+	u64				target_seq;
 };
 
 struct io_rw {
@@ -1081,6 +1082,7 @@ static void io_kill_timeout(struct io_kiocb *req)
 	ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
 	if (ret != -1) {
 		atomic_inc(&req->ctx->cq_timeouts);
+		req->ctx->cq_seq--;
 		list_del_init(&req->list);
 		req->flags |= REQ_F_COMP_LOCKED;
 		io_cqring_fill_event(req, 0);
@@ -1098,16 +1100,31 @@ static void io_kill_timeouts(struct io_ring_ctx *ctx)
 	spin_unlock_irq(&ctx->completion_lock);
 }
 
+static inline bool io_check_in_range(u64 pos, u64 start, u64 end)
+{
+	/* if @end < @start, the range wraps: check [start, U64_MAX] + [0, end] */
+	return (pos - start) <= (end - start);
+}
+
 static void __io_flush_timeouts(struct io_ring_ctx *ctx)
 {
+	u64 start_seq = ctx->cq_seq;
+
+
+	/*
+	 * Batched CQ commit may have left some pending timeout sequences
+	 * behind @cq_seq. Look back to find them. Note that sqe->off is u32,
+	 * so using u64 here can't falsely trigger timeouts with a large off.
+	 */
+	start_seq -= UINT_MAX;
 	do {
 		struct io_kiocb *req = list_first_entry(&ctx->timeout_list,
 							struct io_kiocb, list);
 
 		if (req->flags & REQ_F_TIMEOUT_NOSEQ)
 			break;
-		if (req->timeout.target_seq != ctx->cached_cq_tail
-					- atomic_read(&ctx->cq_timeouts))
+		if (!io_check_in_range(req->timeout.target_seq, start_seq,
+				       ctx->cq_seq))
 			break;
 
 		list_del_init(&req->list);
@@ -1143,6 +1160,7 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
 		return NULL;
 
 	ctx->cached_cq_tail++;
+	ctx->cq_seq++;
 	return &rings->cqes[tail & ctx->cq_mask];
 }
 
@@ -4537,6 +4555,8 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
 	atomic_inc(&ctx->cq_timeouts);
 
 	spin_lock_irqsave(&ctx->completion_lock, flags);
+	ctx->cq_seq--;
+
 	/*
 	 * We could be racing with timeout deletion. If the list is empty,
 	 * then timeout lookup already found it and will be handling it.
@@ -4660,7 +4680,7 @@ static int io_timeout(struct io_kiocb *req)
 	struct io_ring_ctx *ctx = req->ctx;
 	struct io_timeout_data *data = &req->io->timeout;
 	struct list_head *entry;
-	u32 tail, off = req->timeout.off;
+	u32 off = req->timeout.off;
 
 	spin_lock_irq(&ctx->completion_lock);
 
@@ -4675,8 +4695,7 @@ static int io_timeout(struct io_kiocb *req)
 		goto add;
 	}
 
-	tail = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts);
-	req->timeout.target_seq = tail + off;
+	req->timeout.target_seq = ctx->cq_seq + off;
 
 	/*
 	 * Insertion sort, ensuring the first entry in the list is always
@@ -4684,7 +4703,7 @@ static int io_timeout(struct io_kiocb *req)
 	 */
 	list_for_each_prev(entry, &ctx->timeout_list) {
 		struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
-		u32 nxt_off = nxt->timeout.target_seq - tail;
+		u32 nxt_off = (u32)(nxt->timeout.target_seq - ctx->cq_seq);
 
 		if (!(nxt->flags & REQ_F_TIMEOUT_NOSEQ) && (off >= nxt_off))
 			break;
-- 
2.24.0
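
[Editor's note] For readers less familiar with unsigned wraparound, below is a
minimal standalone userspace sketch, not part of the patch, that mirrors the
wrap-safe window check described in the commit message: a target sequence up to
UINT_MAX behind the current one is caught, while a future target is not
triggered early. The helper name check_in_range() and the sample sequence
values are purely illustrative.

/*
 * Illustrative only: userspace model of the wrap-safe range check.
 * Build with any C compiler, e.g. gcc -o range_check range_check.c
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static bool check_in_range(uint64_t pos, uint64_t start, uint64_t end)
{
	/* unsigned wraparound keeps this valid even when end < start numerically */
	return (pos - start) <= (end - start);
}

int main(void)
{
	uint64_t cq_seq = 100;			/* current CQ sequence */
	uint64_t start = cq_seq - UINT32_MAX;	/* look-back window start, wraps below zero */

	/* a target the batched commit skipped over is still inside the window */
	printf("%d\n", check_in_range(cq_seq - 5, start, cq_seq));	/* prints 1 */
	/* a future target (its sqe->off not yet reached) stays pending */
	printf("%d\n", check_in_range(cq_seq + 1, start, cq_seq));	/* prints 0 */
	return 0;
}

Because sqe->off is a u32, a future target can be at most UINT_MAX ahead of the
current sequence, so it can never land inside the [cq_seq - UINT_MAX, cq_seq]
look-back window in the 64-bit sequence space.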