On 24/09/2019 11:36, Jens Axboe wrote:
> On 9/24/19 2:27 AM, Jens Axboe wrote:
>> On 9/24/19 2:02 AM, Jens Axboe wrote:
>>> On 9/24/19 1:06 AM, Pavel Begunkov wrote:
>>>> On 24/09/2019 02:00, Jens Axboe wrote:
>>>>>> I think we can do the same thing, just wrapping the waitqueue in a
>>>>>> structure with a count in it, on the stack. Got some flight time
>>>>>> coming up later today, let me try and cook up a patch.
>>>>>
>>>>> Totally untested, and sent out 5 min before departure... But something
>>>>> like this.
>>>>
>>>> Hmm, this reminds me of my first version. Basically that's the same
>>>> thing, but with the macros inlined. I wanted to make it reusable and
>>>> self-contained, though.
>>>>
>>>> If you don't think it could be useful in other places, sure, we could do
>>>> something like that. Is that so?
>>>
>>> I totally agree it could be useful in other places. Maybe formalized and
>>> used with wake_up_nr() instead of adding a new primitive? Haven't looked
>>> into that, I may be talking nonsense.
>>>
>>> In any case, I did get a chance to test it and it works for me. Here's
>>> the "finished" version, slightly cleaned up and with a comment added
>>> for good measure.
>>
>> Notes:
>>
>> This version gets the ordering right: you need exclusive waits to get
>> FIFO ordering on the waitqueue.
>>
>> Both versions (yours and mine) suffer from the problem of potentially
>> waking too many waiters. I don't think this is a real issue, as we
>> generally don't do threaded access to the io_urings. But if you had the
>> following tasks waiting on the cqring:
>>
>> [min_events = 32], [min_events = 8], [min_events = 8]
>>
>> then once io_cqring_events() reaches the wakeup threshold, we'll wake
>> all three. I don't see a good solution to this, so I suspect we just
>> live with it until proven an issue. Both versions are much better than
>> what we have now.
>
> Forgot an issue around signal handling; the version below adds the
> right check for that too.

That seems like a good reason not to keep reimplementing the
"prepare_to_wait*() + wait loop" pattern every time, but to keep it in
sched :)

>
> Curious what your test case was for this?

You mean a performance test case? It's briefly described in a comment to
the second patch. It's just a rewritten io_uring-bench, with
1. a thread generating 1 request per call in a loop, and
2. a second thread waiting for ~128 events.
Both threads are pinned to the same core.

>
>
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index ca7570aca430..3fbab5692f14 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -2768,6 +2768,42 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
>          return submit;
>  }
>
> +struct io_wait_queue {
> +        struct wait_queue_entry wq;
> +        struct io_ring_ctx *ctx;
> +        struct task_struct *task;
> +        unsigned to_wait;
> +        unsigned nr_timeouts;
> +};
> +
> +static inline bool io_should_wake(struct io_wait_queue *iowq)
> +{
> +        struct io_ring_ctx *ctx = iowq->ctx;
> +
> +        /*
> +         * Wake up if we have enough events, or if a timeout occurred since we
> +         * started waiting. For timeouts, we always want to return to userspace,
> +         * regardless of event count.
> +         */
> +        return io_cqring_events(ctx->rings) >= iowq->to_wait ||
> +                        atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
> +}
> +
> +static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
> +                            int wake_flags, void *key)
> +{
> +        struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
> +                                                        wq);
> +
> +        if (io_should_wake(iowq)) {
> +                list_del_init(&curr->entry);
> +                wake_up_process(iowq->task);
> +                return 1;
> +        }
> +
> +        return -1;
> +}
> +
>  /*
>   * Wait until events become available, if we don't already have some. The
>   * application must reap them itself, as they reside on the shared cq ring.
> @@ -2775,8 +2811,16 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
>  static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
>                            const sigset_t __user *sig, size_t sigsz)
>  {
> +        struct io_wait_queue iowq = {
> +                .wq = {
> +                        .func   = io_wake_function,
> +                        .entry  = LIST_HEAD_INIT(iowq.wq.entry),
> +                },
> +                .task           = current,
> +                .ctx            = ctx,
> +                .to_wait        = min_events,
> +        };
>          struct io_rings *rings = ctx->rings;
> -        unsigned nr_timeouts;
>          int ret;
>
>          if (io_cqring_events(rings) >= min_events)
> @@ -2795,15 +2839,21 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
>                  return ret;
>          }
>
> -        nr_timeouts = atomic_read(&ctx->cq_timeouts);
> -        /*
> -         * Return if we have enough events, or if a timeout occured since
> -         * we started waiting. For timeouts, we always want to return to
> -         * userspace.
> -         */
> -        ret = wait_event_interruptible(ctx->wait,
> -                        io_cqring_events(rings) >= min_events ||
> -                        atomic_read(&ctx->cq_timeouts) != nr_timeouts);
> +        ret = 0;
> +        iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
> +        prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE);
> +        do {
> +                if (io_should_wake(&iowq))
> +                        break;
> +                schedule();
> +                if (signal_pending(current)) {
> +                        ret = -ERESTARTSYS;
> +                        break;
> +                }
> +                set_current_state(TASK_INTERRUPTIBLE);
> +        } while (1);
> +        finish_wait(&ctx->wait, &iowq.wq);
> +
>          restore_saved_sigmask_unless(ret == -ERESTARTSYS);
>          if (ret == -ERESTARTSYS)
>                  ret = -EINTR;
>
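To make the test case above more concrete, here is a minimal userspace
sketch of such a setup. It's not the actual rewritten io_uring-bench,
just an illustration assuming liburing; the ring size, the NOP workload
and the batch size passed to io_uring_wait_cqe_nr() are arbitrary, and
error handling is trimmed.

#define _GNU_SOURCE
#include <liburing.h>
#include <pthread.h>
#include <sched.h>

static struct io_uring ring;

static void pin_to_cpu(int cpu)
{
        cpu_set_t set;

        CPU_ZERO(&set);
        CPU_SET(cpu, &set);
        pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
}

/* thread 1: submit one NOP request per io_uring_submit() call */
static void *submitter(void *arg)
{
        pin_to_cpu(0);
        for (;;) {
                struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);

                if (!sqe)
                        continue;       /* SQ full, let the reaper catch up */
                io_uring_prep_nop(sqe);
                io_uring_submit(&ring);
        }
        return NULL;
}

/* thread 2 (main): wait for completions in batches of ~128 */
int main(void)
{
        struct io_uring_cqe *cqe;
        pthread_t thread;

        io_uring_queue_init(256, &ring, 0);
        pthread_create(&thread, NULL, submitter, NULL);

        pin_to_cpu(0);                  /* same core as the submitter */
        for (;;) {
                io_uring_wait_cqe_nr(&ring, &cqe, 128);
                while (io_uring_peek_cqe(&ring, &cqe) == 0)
                        io_uring_cqe_seen(&ring, cqe);
        }
        return 0;
}

With both threads pinned to one core, any wakeup of the waiter before
~128 events are queued steals CPU time from the submitter, so the cost
of waking on every single completion shows up directly in the request
rate.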
-- 
Yours sincerely,
Pavel Begunkov