Add a new ubuf_info callback io_uring_tx_zerocopy_callback(), which should post an CQE when it completes. Also, implement some infrastructuire for allocating and managing struct ubuf_info. Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- fs/io_uring.c | 114 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 108 insertions(+), 6 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index a01f91e70fa5..6ca02e60fa48 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -329,6 +329,11 @@ struct io_submit_state { }; struct io_tx_notifier { + struct ubuf_info uarg; + struct work_struct commit_work; + struct percpu_ref *fixed_rsrc_refs; + u64 tag; + u32 seq; }; struct io_tx_ctx { @@ -1275,15 +1280,20 @@ static void io_rsrc_refs_refill(struct io_ring_ctx *ctx) percpu_ref_get_many(&ctx->rsrc_node->refs, IO_RSRC_REF_BATCH); } +static inline void io_set_rsrc_node(struct percpu_ref **rsrc_refs, + struct io_ring_ctx *ctx) +{ + *rsrc_refs = &ctx->rsrc_node->refs; + ctx->rsrc_cached_refs--; + if (unlikely(ctx->rsrc_cached_refs < 0)) + io_rsrc_refs_refill(ctx); +} + static inline void io_req_set_rsrc_node(struct io_kiocb *req, struct io_ring_ctx *ctx) { - if (!req->fixed_rsrc_refs) { - req->fixed_rsrc_refs = &ctx->rsrc_node->refs; - ctx->rsrc_cached_refs--; - if (unlikely(ctx->rsrc_cached_refs < 0)) - io_rsrc_refs_refill(ctx); - } + if (!req->fixed_rsrc_refs) + io_set_rsrc_node(&req->fixed_rsrc_refs, ctx); } static void io_refs_resurrect(struct percpu_ref *ref, struct completion *compl) @@ -1930,6 +1940,76 @@ static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, return __io_fill_cqe(ctx, user_data, res, cflags); } +static void io_zc_tx_work_callback(struct work_struct *work) +{ + struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier, + commit_work); + struct io_ring_ctx *ctx = notifier->uarg.ctx; + + spin_lock(&ctx->completion_lock); + io_fill_cqe_aux(ctx, notifier->tag, notifier->seq, 0); + io_commit_cqring(ctx); + spin_unlock(&ctx->completion_lock); + io_cqring_ev_posted(ctx); + + percpu_ref_put(notifier->fixed_rsrc_refs); + percpu_ref_put(&ctx->refs); + kfree(notifier); +} + +static void io_uring_tx_zerocopy_callback(struct sk_buff *skb, + struct ubuf_info *uarg, + bool success) +{ + struct io_tx_notifier *notifier; + + notifier = container_of(uarg, struct io_tx_notifier, uarg); + if (!refcount_dec_and_test(&uarg->refcnt)) + return; + + if (in_interrupt()) { + INIT_WORK(¬ifier->commit_work, io_zc_tx_work_callback); + queue_work(system_unbound_wq, ¬ifier->commit_work); + } else { + io_zc_tx_work_callback(¬ifier->commit_work); + } +} + +static struct io_tx_notifier *io_alloc_tx_notifier(struct io_ring_ctx *ctx, + struct io_tx_ctx *tx_ctx) +{ + struct io_tx_notifier *notifier; + struct ubuf_info *uarg; + + notifier = kmalloc(sizeof(*notifier), GFP_ATOMIC); + if (!notifier) + return NULL; + + WARN_ON_ONCE(!current->io_uring); + notifier->seq = tx_ctx->seq++; + notifier->tag = tx_ctx->tag; + io_set_rsrc_node(¬ifier->fixed_rsrc_refs, ctx); + + uarg = ¬ifier->uarg; + uarg->ctx = ctx; + uarg->flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN; + uarg->callback = io_uring_tx_zerocopy_callback; + refcount_set(&uarg->refcnt, 1); + percpu_ref_get(&ctx->refs); + return notifier; +} + +__attribute__((unused)) +static inline struct io_tx_notifier *io_get_tx_notifier(struct io_ring_ctx *ctx, + struct io_tx_ctx *tx_ctx) +{ + if (tx_ctx->notifier) + return tx_ctx->notifier; + + tx_ctx->notifier = io_alloc_tx_notifier(ctx, tx_ctx); + return tx_ctx->notifier; +} + static void io_req_complete_post(struct io_kiocb *req, s32 res, u32 cflags) { @@ -9212,11 +9292,27 @@ static int io_buffer_validate(struct iovec *iov) return 0; } +static void io_sqe_tx_ctx_kill_ubufs(struct io_ring_ctx *ctx) +{ + struct io_tx_ctx *tx_ctx; + int i; + + for (i = 0; i < ctx->nr_tx_ctxs; i++) { + tx_ctx = &ctx->tx_ctxs[i]; + if (!tx_ctx->notifier) + continue; + io_uring_tx_zerocopy_callback(NULL, &tx_ctx->notifier->uarg, + true); + tx_ctx->notifier = NULL; + } +} + static int io_sqe_tx_ctx_unregister(struct io_ring_ctx *ctx) { if (!ctx->nr_tx_ctxs) return -ENXIO; + io_sqe_tx_ctx_kill_ubufs(ctx); kvfree(ctx->tx_ctxs); ctx->tx_ctxs = NULL; ctx->nr_tx_ctxs = 0; @@ -9608,6 +9704,12 @@ static __cold void io_ring_exit_work(struct work_struct *work) io_sq_thread_unpark(sqd); } + if (READ_ONCE(ctx->nr_tx_ctxs)) { + mutex_lock(&ctx->uring_lock); + io_sqe_tx_ctx_kill_ubufs(ctx); + mutex_unlock(&ctx->uring_lock); + } + io_req_caches_free(ctx); if (WARN_ON_ONCE(time_after(jiffies, timeout))) { -- 2.34.0