On Tue, Dec 18, 2018 at 08:42:16AM -0700, Jens Axboe wrote:
> We have to add each submitted polled request to the io_context
> poll_submitted list, which means we have to grab the poll_lock. We
> already use the block plug to batch submissions if we're doing a batch
> of IO submissions, extend that to cover the poll requests internally as
> well.

The way we make this conditional looks rather odd.  What about something
like the untested patch below to clean this up a bit?  I also think that
we don't need the explicit aio_flush_state_reqs call in
aio_submit_state_end, given that blk_finish_plug already calls the
supplied callback.

index d22913b7de74..ebf5d787d03b 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -235,18 +235,18 @@ struct aio_kiocb {
 };

 struct aio_submit_state {
+#ifdef CONFIG_BLOCK
 	struct kioctx *ctx;

 	struct blk_plug plug;
-#ifdef CONFIG_BLOCK
 	struct blk_plug_cb plug_cb;
-#endif

 	/*
 	 * Polled iocbs that have been submitted, but not added to the ctx yet
 	 */
 	struct list_head req_list;
 	unsigned int req_count;
+#endif
 };

 /*------ sysctl variables----*/
@@ -263,15 +263,6 @@ static struct vfsmount *aio_mnt;
 static const struct file_operations aio_ring_fops;
 static const struct address_space_operations aio_ctx_aops;

-/*
- * We rely on block level unplugs to flush pending requests, if we schedule
- */
-#ifdef CONFIG_BLOCK
-static const bool aio_use_state_req_list = true;
-#else
-static const bool aio_use_state_req_list = false;
-#endif
-
 static void aio_iopoll_reap_events(struct kioctx *);

 static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
@@ -1849,52 +1840,55 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
 	}
 }

+#ifdef CONFIG_BLOCK
 /*
- * Called either at the end of IO submission, or through a plug callback
- * because we're going to schedule. Moves out local batch of requests to
- * the ctx poll list, so they can be found for polling + reaping.
+ * Called either at the end of IO submission, or through a plug callback because
+ * we're going to schedule. Moves out local batch of requests to the ctx poll
+ * list, so they can be found for polling + reaping.
  */
-static void aio_flush_state_reqs(struct kioctx *ctx,
-				 struct aio_submit_state *state)
+static void aio_flush_state_reqs(struct aio_submit_state *state)
 {
-	spin_lock(&ctx->poll_lock);
-	list_splice_tail_init(&state->req_list, &ctx->poll_submitted);
-	spin_unlock(&ctx->poll_lock);
+	spin_lock(&state->ctx->poll_lock);
+	list_splice_tail_init(&state->req_list, &state->ctx->poll_submitted);
+	spin_unlock(&state->ctx->poll_lock);
 	state->req_count = 0;
 }

+static void aio_iopoll_add_iocb_to_state(struct aio_submit_state *state,
+		struct aio_kiocb *kiocb)
+{
+	/*
+	 * For fast devices, IO may have already completed. If it has, add it to
+	 * the front so we find it first. We can't add to the poll_done list as
+	 * that is used without locking from the completion side.
+	 */
+	if (test_bit(KIOCB_F_POLL_COMPLETED, &kiocb->ki_flags))
+		list_add(&kiocb->ki_list, &state->req_list);
+	else
+		list_add_tail(&kiocb->ki_list, &state->req_list);
+	if (++state->req_count >= AIO_IOPOLL_BATCH)
+		aio_flush_state_reqs(state);
+}
+#else
+#define aio_iopoll_add_iocb_to_state(state, kiocb) do { } while (0)
+#endif
+
 /*
  * After the iocb has been issued, it's safe to be found on the poll list.
  * Adding the kiocb to the list AFTER submission ensures that we don't
  * find it from a io_getevents() thread before the issuer is done accessing
  * the kiocb cookie.
  */
-static void aio_iopoll_iocb_issued(struct aio_submit_state *state,
-				   struct aio_kiocb *kiocb)
+static void aio_iopoll_add_iocb_to_list(struct aio_kiocb *kiocb)
 {
-	/*
-	 * For fast devices, IO may have already completed. If it has, add
-	 * it to the front so we find it first. We can't add to the poll_done
-	 * list as that's unlocked from the completion side.
-	 */
-	const int front = test_bit(KIOCB_F_POLL_COMPLETED, &kiocb->ki_flags);
 	struct kioctx *ctx = kiocb->ki_ctx;

-	if (!state || !aio_use_state_req_list) {
-		spin_lock(&ctx->poll_lock);
-		if (front)
-			list_add(&kiocb->ki_list, &ctx->poll_submitted);
-		else
-			list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
-		spin_unlock(&ctx->poll_lock);
-	} else {
-		if (front)
-			list_add(&kiocb->ki_list, &state->req_list);
-		else
-			list_add_tail(&kiocb->ki_list, &state->req_list);
-		if (++state->req_count >= AIO_IOPOLL_BATCH)
-			aio_flush_state_reqs(ctx, state);
-	}
+	spin_lock(&ctx->poll_lock);
+	if (test_bit(KIOCB_F_POLL_COMPLETED, &kiocb->ki_flags))
+		list_add(&kiocb->ki_list, &ctx->poll_submitted);
+	else
+		list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
+	spin_unlock(&ctx->poll_lock);
 }

 static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
@@ -2294,7 +2288,11 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 			ret = -EAGAIN;
 			goto out_put_req;
 		}
-		aio_iopoll_iocb_issued(state, req);
+
+		if (state)
+			aio_iopoll_add_iocb_to_state(state, req);
+		else
+			aio_iopoll_add_iocb_to_list(req);
 	}
 	return 0;
 out_put_req:
@@ -2325,9 +2323,8 @@ static void aio_state_unplug(struct blk_plug_cb *cb, bool from_schedule)

 	state = container_of(cb, struct aio_submit_state, plug_cb);
 	if (!list_empty(&state->req_list))
-		aio_flush_state_reqs(state->ctx, state);
+		aio_flush_state_reqs(state);
 }
-#endif

 /*
  * Batched submission is done, ensure local IO is flushed out.
@@ -2336,24 +2333,27 @@ static void aio_submit_state_end(struct aio_submit_state *state)
 {
 	blk_finish_plug(&state->plug);
 	if (!list_empty(&state->req_list))
-		aio_flush_state_reqs(state->ctx, state);
+		aio_flush_state_reqs(state);
 }

 /*
  * Start submission side cache.
  */
-static void aio_submit_state_start(struct aio_submit_state *state,
-				   struct kioctx *ctx)
+static struct aio_submit_state *
+aio_submit_state_start(struct aio_submit_state *state, struct kioctx *ctx)
 {
 	state->ctx = ctx;
 	INIT_LIST_HEAD(&state->req_list);
 	state->req_count = 0;
-#ifdef CONFIG_BLOCK
 	state->plug_cb.callback = aio_state_unplug;
 	blk_start_plug(&state->plug);
 	list_add(&state->plug_cb.list, &state->plug.cb_list);
-#endif
+	return state;
 }
+#else
+#define aio_submit_state_end(state) do { } while (0)
+#define aio_submit_state_start(state, ctx) (state)
+#endif /* CONFIG_BLOCK */

 /* sys_io_submit:
  *	Queue the nr iocbs pointed to by iocbpp for processing.  Returns
@@ -2387,10 +2387,9 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
 	if (nr > ctx->nr_events)
 		nr = ctx->nr_events;

-	if (nr > AIO_PLUG_THRESHOLD) {
-		aio_submit_state_start(&state, ctx);
-		statep = &state;
-	}
+	if (IS_ENABLED(CONFIG_BLOCK) && nr > AIO_PLUG_THRESHOLD)
+		statep = aio_submit_state_start(&state, ctx);
+
 	for (i = 0; i < nr; i++) {
 		struct iocb __user *user_iocb;
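
To illustrate the point about aio_submit_state_end: blk_finish_plug flushes
the plug callbacks unconditionally before clearing current->plug, so
aio_state_unplug runs on the non-schedule path as well.  Roughly this,
paraphrased from memory from block/blk-core.c on this kernel, so treat it
as a sketch rather than the exact code:

void blk_finish_plug(struct blk_plug *plug)
{
	if (plug != current->plug)
		return;
	/* flushes plug->cb_list callbacks, then the plugged requests */
	blk_flush_plug_list(plug, false);
	current->plug = NULL;
}

static void flush_plug_callbacks(struct blk_plug *plug, bool from_schedule)
{
	LIST_HEAD(callbacks);

	while (!list_empty(&plug->cb_list)) {
		list_splice_init(&plug->cb_list, &callbacks);

		while (!list_empty(&callbacks)) {
			struct blk_plug_cb *cb = list_first_entry(&callbacks,
							  struct blk_plug_cb,
							  list);
			list_del(&cb->list);
			/* for aio this ends up in aio_state_unplug() */
			cb->callback(cb, from_schedule);
		}
	}
}

So by the time blk_finish_plug returns, state->req_list should already have
been spliced onto ctx->poll_submitted, and the list_empty check after it in
aio_submit_state_end is only a no-op safety net.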