In testing high frequency workloads with provided buffers, we spend a lot of time allocating and freeing the buffer units themselves. Rather than repeatedly free and alloc them, add a recycling cache instead. There are two caches: - ctx->io_buffers_cache. This is the one we grab from in the submission path, and it's protected by ctx->uring_lock. For inline completions, we can recycle straight back to this cache and not need any extra locking. - ctx->io_buffers_comp. If we're not under uring_lock, then we use this list to recycle buffers. It's protected by the completion_lock. On adding a new buffer, check io_buffers_cache. If it's empty, check if we can splice entries from ctx->io_buffers_comp. This removes about 5-10% of the overhead of provided buffers, bringing it pretty close to the non-provided path. Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> --- diff --git a/fs/io_uring.c b/fs/io_uring.c index 23e7f93d3956..e8f5f380530c 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -384,6 +384,7 @@ struct io_ring_ctx { struct list_head ltimeout_list; struct list_head cq_overflow_list; struct xarray io_buffers; + struct list_head io_buffers_cache; struct xarray personalities; u32 pers_next; unsigned sq_thread_idle; @@ -426,6 +427,8 @@ struct io_ring_ctx { struct hlist_head *cancel_hash; unsigned cancel_hash_bits; bool poll_multi_queue; + + struct list_head io_buffers_comp; } ____cacheline_aligned_in_smp; struct io_restriction restrictions; @@ -441,6 +444,8 @@ struct io_ring_ctx { struct llist_head rsrc_put_llist; struct list_head rsrc_ref_list; spinlock_t rsrc_ref_lock; + + struct list_head io_buffers_pages; }; /* Keep this last, we don't need it for the fast path */ @@ -1279,24 +1284,56 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req, } } -static unsigned int __io_put_kbuf(struct io_kiocb *req) +static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list) { struct io_buffer *kbuf = req->kbuf; unsigned int cflags; - cflags = 
kbuf->bid << IORING_CQE_BUFFER_SHIFT; - cflags |= IORING_CQE_F_BUFFER; + cflags = IORING_CQE_F_BUFFER | (kbuf->bid << IORING_CQE_BUFFER_SHIFT); req->flags &= ~REQ_F_BUFFER_SELECTED; - kfree(kbuf); + list_add(&kbuf->list, list); req->kbuf = NULL; return cflags; } -static inline unsigned int io_put_kbuf(struct io_kiocb *req) +static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req) { if (likely(!(req->flags & REQ_F_BUFFER_SELECTED))) return 0; - return __io_put_kbuf(req); + return __io_put_kbuf(req, &req->ctx->io_buffers_comp); +} + +static inline unsigned int io_put_kbuf(struct io_kiocb *req, + unsigned issue_flags) +{ + unsigned int cflags; + + if (likely(!(req->flags & REQ_F_BUFFER_SELECTED))) + return 0; + + /* + * We can add this buffer back to two lists: + * + * 1) The io_buffers_cache list. This one is protected by the + * ctx->uring_lock. If we already hold this lock, add back to this + * list as we can grab it from issue as well. + * 2) The io_buffers_comp list. This one is protected by the + * ctx->completion_lock. + * + * We migrate buffers from the comp_list to the issue cache list + * when we need one. 
+ */ + if (issue_flags & IO_URING_F_UNLOCKED) { + struct io_ring_ctx *ctx = req->ctx; + + spin_lock(&ctx->completion_lock); + cflags = __io_put_kbuf(req, &ctx->io_buffers_comp); + spin_unlock(&ctx->completion_lock); + } else { + cflags = __io_put_kbuf(req, &req->ctx->io_buffers_cache); + } + + return cflags; } static bool io_match_task(struct io_kiocb *head, struct task_struct *task, @@ -1444,6 +1481,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) init_waitqueue_head(&ctx->sqo_sq_wait); INIT_LIST_HEAD(&ctx->sqd_list); INIT_LIST_HEAD(&ctx->cq_overflow_list); + INIT_LIST_HEAD(&ctx->io_buffers_cache); init_completion(&ctx->ref_comp); xa_init_flags(&ctx->io_buffers, XA_FLAGS_ALLOC1); xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1); @@ -1452,6 +1490,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) spin_lock_init(&ctx->completion_lock); spin_lock_init(&ctx->timeout_lock); INIT_WQ_LIST(&ctx->iopoll_list); + INIT_LIST_HEAD(&ctx->io_buffers_pages); + INIT_LIST_HEAD(&ctx->io_buffers_comp); INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->ltimeout_list); @@ -2330,7 +2370,8 @@ static void handle_prev_tw_list(struct io_wq_work_node *node, if (likely(*uring_locked)) req->io_task_work.func(req, uring_locked); else - __io_req_complete_post(req, req->result, io_put_kbuf(req)); + __io_req_complete_post(req, req->result, + io_put_kbuf_comp(req)); node = next; } while (node); @@ -2680,7 +2721,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin) if (unlikely(req->flags & REQ_F_CQE_SKIP)) continue; - __io_fill_cqe(req, req->result, io_put_kbuf(req)); + __io_fill_cqe(req, req->result, io_put_kbuf(req, 0)); nr_events++; } @@ -2856,14 +2897,14 @@ static bool __io_complete_rw_common(struct io_kiocb *req, long res) static inline void io_req_task_complete(struct io_kiocb *req, bool *locked) { - unsigned int cflags = io_put_kbuf(req); int res = req->result; 
if (*locked) { - io_req_complete_state(req, res, cflags); + io_req_complete_state(req, res, io_put_kbuf(req, 0)); io_req_add_compl_list(req); } else { - io_req_complete_post(req, res, cflags); + io_req_complete_post(req, res, + io_put_kbuf(req, IO_URING_F_UNLOCKED)); } } @@ -2872,7 +2913,8 @@ static void __io_complete_rw(struct io_kiocb *req, long res, { if (__io_complete_rw_common(req, res)) return; - __io_req_complete(req, issue_flags, req->result, io_put_kbuf(req)); + __io_req_complete(req, issue_flags, req->result, + io_put_kbuf(req, issue_flags)); } static void io_complete_rw(struct kiocb *kiocb, long res) @@ -4519,13 +4561,11 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf, nxt = list_first_entry(&buf->list, struct io_buffer, list); list_del(&nxt->list); - kfree(nxt); if (++i == nbufs) return i; cond_resched(); } i++; - kfree(buf); xa_erase(&ctx->io_buffers, bgid); return i; @@ -4591,17 +4631,63 @@ static int io_provide_buffers_prep(struct io_kiocb *req, return 0; } -static int io_add_buffers(struct io_provide_buf *pbuf, struct io_buffer **head) +static int io_refill_buffer_cache(struct io_ring_ctx *ctx) +{ + struct io_buffer *buf; + struct page *page; + int bufs_in_page; + + /* + * Completions that don't happen inline (eg not under uring_lock) will + * add to ->io_buffers_comp. If we don't have any free buffers, check + * the completion list and splice those entries first. + */ + if (!list_empty_careful(&ctx->io_buffers_comp)) { + spin_lock(&ctx->completion_lock); + if (!list_empty(&ctx->io_buffers_comp)) { + list_splice_init(&ctx->io_buffers_comp, + &ctx->io_buffers_cache); + spin_unlock(&ctx->completion_lock); + return 0; + } + spin_unlock(&ctx->completion_lock); + } + + /* + * No free buffers and no completion entries either. Allocate a new + * page worth of buffer entries and add those to our freelist. 
+ */ + page = alloc_page(GFP_KERNEL_ACCOUNT); + if (!page) + return -ENOMEM; + + list_add(&page->lru, &ctx->io_buffers_pages); + + buf = page_address(page); + bufs_in_page = PAGE_SIZE / sizeof(*buf); + while (bufs_in_page) { + list_add_tail(&buf->list, &ctx->io_buffers_cache); + buf++; + bufs_in_page--; + } + + return 0; +} + +static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf, + struct io_buffer **head) { struct io_buffer *buf; u64 addr = pbuf->addr; int i, bid = pbuf->bid; for (i = 0; i < pbuf->nbufs; i++) { - buf = kmalloc(sizeof(*buf), GFP_KERNEL_ACCOUNT); - if (!buf) + if (list_empty(&ctx->io_buffers_cache) && + io_refill_buffer_cache(ctx)) break; - + buf = list_first_entry(&ctx->io_buffers_cache, struct io_buffer, + list); + list_del(&buf->list); buf->addr = addr; buf->len = min_t(__u32, pbuf->len, MAX_RW_COUNT); buf->bid = bid; @@ -4633,7 +4719,7 @@ static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags) list = head = xa_load(&ctx->io_buffers, p->bgid); - ret = io_add_buffers(p, &head); + ret = io_add_buffers(ctx, p, &head); if (ret >= 0 && !list) { ret = xa_insert(&ctx->io_buffers, p->bgid, head, GFP_KERNEL); if (ret < 0) @@ -5244,7 +5330,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags) if (kmsg->free_iov) kfree(kmsg->free_iov); req->flags &= ~REQ_F_NEED_CLEANUP; - __io_req_complete(req, issue_flags, ret, io_put_kbuf(req)); + __io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags)); return 0; } @@ -5299,7 +5385,7 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags) out_free: req_set_fail(req); } - __io_req_complete(req, issue_flags, ret, io_put_kbuf(req)); + __io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags)); return 0; } @@ -6719,7 +6805,7 @@ static __cold void io_drain_req(struct io_kiocb *req) static void io_clean_op(struct io_kiocb *req) { if (req->flags & REQ_F_BUFFER_SELECTED) - io_put_kbuf(req); + io_put_kbuf_comp(req); if 
(req->flags & REQ_F_NEED_CLEANUP) { switch (req->opcode) { @@ -9494,6 +9580,14 @@ static void io_destroy_buffers(struct io_ring_ctx *ctx) xa_for_each(&ctx->io_buffers, index, buf) __io_remove_buffers(ctx, buf, index, -1U); + + while (!list_empty(&ctx->io_buffers_pages)) { + struct page *page; + + page = list_first_entry(&ctx->io_buffers_pages, struct page, lru); + list_del_init(&page->lru); + __free_page(page); + } } static void io_req_caches_free(struct io_ring_ctx *ctx) -- Jens Axboe