On 9/24/19 2:27 AM, Jens Axboe wrote:
> On 9/24/19 2:02 AM, Jens Axboe wrote:
>> On 9/24/19 1:06 AM, Pavel Begunkov wrote:
>>> On 24/09/2019 02:00, Jens Axboe wrote:
>>>>> I think we can do the same thing, just wrapping the waitqueue in a
>>>>> structure with a count in it, on the stack. Got some flight time
>>>>> coming up later today, let me try and cook up a patch.
>>>>
>>>> Totally untested, and sent out 5 min before departure... But something
>>>> like this.
>>> Hmm, reminds me of my first version. Basically that's the same thing but
>>> with the macros inlined. I wanted to make it reusable and self-contained,
>>> though.
>>>
>>> If you don't think it could be useful in other places, sure, we could do
>>> something like that. Is that so?
>>
>> I totally agree it could be useful in other places. Maybe formalized and
>> used with wake_up_nr() instead of adding a new primitive? Haven't looked
>> into that, I may be talking nonsense.
>>
>> In any case, I did get a chance to test it and it works for me. Here's
>> the "finished" version, slightly cleaned up and with a comment added
>> for good measure.
>
> Notes:
>
> This version gets the ordering right: you need exclusive waits to get
> FIFO ordering on the waitqueue.
>
> Both versions (yours and mine) suffer from the problem of potentially
> waking too many tasks. I don't think this is a real issue, as generally
> we don't do threaded access to the io_urings. But if you had the
> following tasks wait on the cqring:
>
> [min_events = 32], [min_events = 8], [min_events = 8]
>
> and we reach io_cqring_events() == threshold, we'll wake all three.
> I don't see a good solution to this, so I suspect we just live with it
> until proven an issue. Both versions are much better than what we have
> now.

Forgot an issue around signal handling; the version below adds the right
check for that too. Curious what your test case was for this?

diff --git a/fs/io_uring.c b/fs/io_uring.c
index ca7570aca430..3fbab5692f14 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -2768,6 +2768,42 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
 	return submit;
 }

+struct io_wait_queue {
+	struct wait_queue_entry wq;
+	struct io_ring_ctx *ctx;
+	struct task_struct *task;
+	unsigned to_wait;
+	unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
+{
+	struct io_ring_ctx *ctx = iowq->ctx;
+
+	/*
+	 * Wake up if we have enough events, or if a timeout occurred since we
+	 * started waiting. For timeouts, we always want to return to userspace,
+	 * regardless of event count.
+	 */
+	return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+			    int wake_flags, void *key)
+{
+	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+							wq);
+
+	if (io_should_wake(iowq)) {
+		list_del_init(&curr->entry);
+		wake_up_process(iowq->task);
+		return 1;
+	}
+
+	return -1;
+}
+
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -2775,8 +2811,16 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
			  const sigset_t __user *sig, size_t sigsz)
 {
+	struct io_wait_queue iowq = {
+		.wq = {
+			.func = io_wake_function,
+			.entry = LIST_HEAD_INIT(iowq.wq.entry),
+		},
+		.task = current,
+		.ctx = ctx,
+		.to_wait = min_events,
+	};
 	struct io_rings *rings = ctx->rings;
-	unsigned nr_timeouts;
 	int ret;

 	if (io_cqring_events(rings) >= min_events)
@@ -2795,15 +2839,18 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			return ret;
 	}

-	nr_timeouts = atomic_read(&ctx->cq_timeouts);
-	/*
-	 * Return if we have enough events, or if a timeout occured since
-	 * we started waiting. For timeouts, we always want to return to
-	 * userspace.
-	 */
-	ret = wait_event_interruptible(ctx->wait,
-				io_cqring_events(rings) >= min_events ||
-				atomic_read(&ctx->cq_timeouts) != nr_timeouts);
+	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+	prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE);
+	do {
+		if (io_should_wake(&iowq))
+			break;
+		schedule();
+		if (signal_pending(current))
+			break;
+		set_current_state(TASK_INTERRUPTIBLE);
+	} while (1);
+	finish_wait(&ctx->wait, &iowq.wq);
+
 	restore_saved_sigmask_unless(ret == -ERESTARTSYS);

 	if (ret == -ERESTARTSYS)
 		ret = -EINTR;

--
Jens Axboe
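
For reference, here is a minimal, self-contained sketch of the pattern the
patch uses (an on-stack waiter struct wrapping a wait_queue_entry, plus a
custom wake function on an exclusive wait), decoupled from io_uring. The
names threshold_waiter, threshold_wake_function and wait_for_threshold are
illustrative only; the wait-queue helpers themselves
(prepare_to_wait_exclusive(), finish_wait(), wake_up_process(),
container_of()) are existing kernel APIs.

#include <linux/wait.h>
#include <linux/sched.h>
#include <linux/sched/signal.h>
#include <linux/atomic.h>

/* Per-waiter state lives on the waiter's stack; the wait_queue_entry is
 * embedded so the wake function can recover it with container_of(). */
struct threshold_waiter {
	struct wait_queue_entry wq;
	struct task_struct *task;
	atomic_t *counter;		/* shared event counter */
	unsigned threshold;		/* wake once *counter >= threshold */
};

static int threshold_wake_function(struct wait_queue_entry *curr,
				   unsigned int mode, int wake_flags, void *key)
{
	struct threshold_waiter *tw = container_of(curr, struct threshold_waiter,
							wq);

	/* Not enough events for this waiter yet: return a negative value so
	 * the wake-up stops walking the (FIFO) queue. */
	if (atomic_read(tw->counter) < tw->threshold)
		return -1;

	list_del_init(&curr->entry);
	wake_up_process(tw->task);
	return 1;	/* consumed one exclusive wakeup */
}

/* Sleep until *counter reaches threshold, or a signal arrives. */
static int wait_for_threshold(struct wait_queue_head *wq_head,
			      atomic_t *counter, unsigned threshold)
{
	struct threshold_waiter tw = {
		.wq = {
			.func = threshold_wake_function,
			.entry = LIST_HEAD_INIT(tw.wq.entry),
		},
		.task = current,
		.counter = counter,
		.threshold = threshold,
	};
	int ret = 0;

	prepare_to_wait_exclusive(wq_head, &tw.wq, TASK_INTERRUPTIBLE);
	do {
		if (atomic_read(counter) >= threshold)
			break;
		schedule();
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		set_current_state(TASK_INTERRUPTIBLE);
	} while (1);
	finish_wait(wq_head, &tw.wq);

	return ret;
}

As in the patch, the waiter is registered exclusively, so the queue stays
FIFO and at most one waiter is woken per wake_up() call.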
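
On the producer side, the sketch assumes events are published by bumping the
counter and then kicking the wait queue; publish_event is again an
illustrative name, while wq_has_sleeper() and wake_up() are existing helpers.

/* Illustrative producer: post one event, then kick the wait queue. The
 * walk over exclusive waiters is FIFO and stops as soon as the custom
 * wake function returns 1 (one waiter woken) or -1 (head waiter's
 * threshold not yet reached). */
static void publish_event(struct wait_queue_head *wq_head, atomic_t *counter)
{
	atomic_inc(counter);
	if (wq_has_sleeper(wq_head))	/* implies the needed memory barrier */
		wake_up(wq_head);
}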