Re: [PATCH 12/12] io_uring: add support for ring mapped supplied buffers

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Sat, 2022-04-30 at 14:50 -0600, Jens Axboe wrote:
> Provided buffers allow an application to supply io_uring with buffers
> that can then be grabbed for a read/receive request, when the data
> source is ready to deliver data. The existing scheme relies on using
> IORING_OP_PROVIDE_BUFFERS to do that, but it can be difficult to use
> in real world applications. It's pretty efficient if the application
> is able to supply back batches of provided buffers when they have
> been
> consumed and the application is ready to recycle them, but if
> fragmentation occurs in the buffer space, it can become difficult to
> supply enough buffers at the time. This hurts efficiency.
> 
> Add a register op, IORING_REGISTER_PBUF_RING, which allows an
> application
> to setup a shared queue for each buffer group of provided buffers.
> The
> application can then supply buffers simply by adding them to this
> ring,
> and the kernel can consume then just as easily. The ring shares the
> head
> with the application, the tail remains private in the kernel.
> 
> Provided buffers setup with IORING_REGISTER_PBUF_RING cannot use
> IORING_OP_{PROVIDE,REMOVE}_BUFFERS for adding or removing entries to
> the
> ring, they must use the mapped ring. Mapped provided buffer rings can
> co-exist with normal provided buffers, just not within the same group
> ID.
> 
> To gauge overhead of the existing scheme and evaluate the mapped ring
> approach, a simple NOP benchmark was written. It uses a ring of 128
> entries, and submits/completes 32 at the time. 'Replenish' is how
> many buffers are provided back at the time after they have been
> consumed:
> 
> Test                    Replenish                       NOPs/sec
> ================================================================
> No provided buffers     NA                              ~30M
> Provided buffers        32                              ~16M
> Provided buffers         1                              ~10M
> Ring buffers            32                              ~27M
> Ring buffers             1                              ~27M
> 
> The ring mapped buffers perform almost as well as not using provided
> buffers at all, and they don't care if you provided 1 or more back at
> the same time. This means application can just replenish as they go,
> rather than need to batch and compact, further reducing overhead in
> the
> application. The NOP benchmark above doesn't need to do any
> compaction,
> so that overhead isn't even reflected in the above test.
> 
> Signed-off-by: Jens Axboe <axboe@xxxxxxxxx>
> ---
>  fs/io_uring.c                 | 227 +++++++++++++++++++++++++++++++-
> --
>  include/uapi/linux/io_uring.h |  26 ++++
>  2 files changed, 238 insertions(+), 15 deletions(-)
> 
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index 3d5d02b40347..a9691727652c 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -285,7 +285,16 @@ struct io_rsrc_data {
>  struct io_buffer_list {
>         struct list_head list;
>         struct list_head buf_list;
> +       struct page **buf_pages;
>         __u16 bgid;
> +
> +       /* below is for ring provided buffers */
> +       __u16 buf_nr_pages;
> +       __u16 nr_entries;
> +       __u16 buf_per_page;
> +       struct io_uring_buf_ring *buf_ring;
> +       __u32 tail;
> +       __u32 mask;
>  };
>  
>  struct io_buffer {
> @@ -815,6 +824,7 @@ enum {
>         REQ_F_NEED_CLEANUP_BIT,
>         REQ_F_POLLED_BIT,
>         REQ_F_BUFFER_SELECTED_BIT,
> +       REQ_F_BUFFER_RING_BIT,
>         REQ_F_COMPLETE_INLINE_BIT,
>         REQ_F_REISSUE_BIT,
>         REQ_F_CREDS_BIT,
> @@ -865,6 +875,8 @@ enum {
>         REQ_F_POLLED            = BIT(REQ_F_POLLED_BIT),
>         /* buffer already selected */
>         REQ_F_BUFFER_SELECTED   = BIT(REQ_F_BUFFER_SELECTED_BIT),
> +       /* buffer selected from ring, needs commit */
> +       REQ_F_BUFFER_RING       = BIT(REQ_F_BUFFER_RING_BIT),
>         /* completion is deferred through io_comp_state */
>         REQ_F_COMPLETE_INLINE   = BIT(REQ_F_COMPLETE_INLINE_BIT),
>         /* caller should reissue async */
> @@ -984,6 +996,15 @@ struct io_kiocb {
>  
>                 /* stores selected buf, valid IFF
> REQ_F_BUFFER_SELECTED is set */
>                 struct io_buffer        *kbuf;
> +
> +               /*
> +                * stores buffer ID for ring provided buffers, valid
> IFF
> +                * REQ_F_BUFFER_RING is set.
> +                */
> +                struct {
> +                        struct io_buffer_list  *buf_list;
> +                        __u32                  bid;
> +                };
>         };
>  
>         union {
> @@ -1564,21 +1585,30 @@ static inline void
> io_req_set_rsrc_node(struct io_kiocb *req,
>  
>  static unsigned int __io_put_kbuf(struct io_kiocb *req, struct
> list_head *list)
>  {
> -       struct io_buffer *kbuf = req->kbuf;
>         unsigned int cflags;
>  
> -       cflags = IORING_CQE_F_BUFFER | (kbuf->bid <<
> IORING_CQE_BUFFER_SHIFT);
> -       req->flags &= ~REQ_F_BUFFER_SELECTED;
> -       list_add(&kbuf->list, list);
> -       req->kbuf = NULL;
> -       return cflags;
> +       if (req->flags & REQ_F_BUFFER_RING) {
> +               if (req->buf_list)
> +                       req->buf_list->tail++;

does this need locking? both on buf_list being available or atomic
increment on tail.

> +
> +               cflags = req->bid << IORING_CQE_BUFFER_SHIFT;
> +               req->flags &= ~REQ_F_BUFFER_RING;
> +       } else {
> +               struct io_buffer *kbuf = req->kbuf;
> +
> +               cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
> +               list_add(&kbuf->list, list);
> +               req->kbuf = NULL;

I wonder if this is not necessary? we don't do it above to buf_list and
it seems to work? For consistency though maybe better to pick one
approach?

> +               req->flags &= ~REQ_F_BUFFER_SELECTED;
> +       }
> +       return cflags | IORING_CQE_F_BUFFER;
>  }
>  
>  static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
>  {
>         lockdep_assert_held(&req->ctx->completion_lock);
>  
> -       if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
> +       if (!(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
>                 return 0;
>         return __io_put_kbuf(req, &req->ctx->io_buffers_comp);
>  }
> @@ -1588,7 +1618,7 @@ static inline unsigned int io_put_kbuf(struct
> io_kiocb *req,
>  {
>         unsigned int cflags;
>  
> -       if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
> +       if (!(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
>                 return 0;
>  
>         /*
> @@ -1603,7 +1633,10 @@ static inline unsigned int io_put_kbuf(struct
> io_kiocb *req,
>          * We migrate buffers from the comp_list to the issue cache
> list
>          * when we need one.
>          */
> -       if (issue_flags & IO_URING_F_UNLOCKED) {
> +       if (req->flags & REQ_F_BUFFER_RING) {
> +               /* no buffers to recycle for this case */
> +               cflags = __io_put_kbuf(req, NULL);
> +       } else if (issue_flags & IO_URING_F_UNLOCKED) {
>                 struct io_ring_ctx *ctx = req->ctx;
>  
>                 spin_lock(&ctx->completion_lock);
> @@ -1645,11 +1678,21 @@ static void io_kbuf_recycle(struct io_kiocb
> *req, unsigned issue_flags)
>         struct io_buffer_list *bl;
>         struct io_buffer *buf;
>  
> -       if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
> +       if (!(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
>                 return;
>         /* don't recycle if we already did IO to this buffer */
>         if (req->flags & REQ_F_PARTIAL_IO)
>                 return;
> +       /*
> +        * We don't need to recycle for REQ_F_BUFFER_RING, we can
> just clear
> +        * the flag and hence ensure that bl->tail doesn't get
> incremented.
> +        * If the tail has already been incremented, hang on to it.
> +        */
> +       if (req->flags & REQ_F_BUFFER_RING) {
> +               if (req->buf_list)
> +                       req->flags &= ~REQ_F_BUFFER_RING;
> +               return;
> +       }
>  
>         io_ring_submit_lock(ctx, issue_flags);
>  
> @@ -3603,6 +3646,53 @@ static void __user
> *io_provided_buffer_select(struct io_kiocb *req, size_t *len,
>         return u64_to_user_ptr(kbuf->addr);
>  }
>  
> +static void __user *io_ring_buffer_select(struct io_kiocb *req,
> size_t *len,
> +                                         struct io_buffer_list *bl,
> +                                         unsigned int issue_flags)
> +{
> +       struct io_uring_buf_ring *br = bl->buf_ring;
> +       struct io_uring_buf *buf = &br->bufs[0];
> +       __u32 tail = bl->tail;
> +
> +       if (unlikely(smp_load_acquire(&br->head) == tail))
> +               return ERR_PTR(-ENOBUFS);
> +
> +       tail &= bl->mask;
> +       if (tail < bl->buf_per_page) {
> +               buf = &br->bufs[tail];
> +       } else {
> +               int index = tail - bl->buf_per_page;
> +               int off = index & bl->buf_per_page;
> +
> +               index = (index >> (PAGE_SHIFT - 4)) + 1;
> +               buf = page_address(bl->buf_pages[index]);
> +               buf += off;
> +       }
> +       if (*len > buf->len)
> +               *len = buf->len;
> +       req->flags |= REQ_F_BUFFER_RING;
> +       req->buf_list = bl;
> +       req->bid = buf->bid;
> +
> +       if (!(issue_flags & IO_URING_F_UNLOCKED))
> +               return u64_to_user_ptr(buf->addr);
> +
> +       /*
> +        * If we came in unlocked, we have no choice but to
> +        * consume the buffer here. This does mean it'll be
> +        * pinned until the IO completes. But coming in
> +        * unlocked means we're in io-wq context, hence there

I do not know if this matters here, but task work can also run unlocked
operations.

> +        * should be no further retry. For the locked case, the
> +        * caller must ensure to call the commit when the
> +        * transfer completes (or if we get -EAGAIN and must
> +        * poll or retry).
> +        */
> +       req->buf_list = NULL;
> +       bl->tail++;
> +       io_ring_submit_unlock(req->ctx, issue_flags);
> +       return u64_to_user_ptr(buf->addr);
> +}
> +
>  static void __user *io_buffer_select(struct io_kiocb *req, size_t
> *len,
>                                      unsigned int issue_flags)
>  {
> @@ -3618,6 +3708,9 @@ static void __user *io_buffer_select(struct
> io_kiocb *req, size_t *len,
>         }
>  
>         /* selection helpers drop the submit lock again, if needed */
> +       if (bl->buf_pages)
> +               return io_ring_buffer_select(req, len, bl,
> issue_flags);
> +
>         return io_provided_buffer_select(req, len, bl, issue_flags);
>  }
>  
> @@ -3674,7 +3767,7 @@ static ssize_t __io_iov_buffer_select(struct
> io_kiocb *req, struct iovec *iov,
>  static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct
> iovec *iov,
>                                     unsigned int issue_flags)
>  {
> -       if (req->flags & REQ_F_BUFFER_SELECTED) {
> +       if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
>                 iov[0].iov_base = u64_to_user_ptr(req->rw.addr);
>                 iov[0].iov_len = req->rw.len;
>                 return 0;
> @@ -3694,7 +3787,7 @@ static inline bool io_do_buffer_select(struct
> io_kiocb *req)
>  {
>         if (!(req->flags & REQ_F_BUFFER_SELECT))
>                 return false;
> -       return !(req->flags & REQ_F_BUFFER_SELECTED);
> +       return !(req->flags &
> (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING));
>  }
>  
>  static struct iovec *__io_import_iovec(int rw, struct io_kiocb *req,
> @@ -5216,6 +5309,17 @@ static int __io_remove_buffers(struct
> io_ring_ctx *ctx,
>         if (!nbufs)
>                 return 0;
>  
> +       if (bl->buf_pages) {
> +               int j;
> +
> +               if (WARN_ON_ONCE(nbufs != -1U))
> +                       return -EINVAL;
> +               for (j = 0; j < bl->buf_nr_pages; j++)
> +                       unpin_user_page(bl->buf_pages[j]);
> +               kvfree(bl->buf_pages);
> +               bl->buf_pages = NULL;
> +       }
> +
>         /* the head kbuf is the list itself */
>         while (!list_empty(&bl->buf_list)) {
>                 struct io_buffer *nxt;
> @@ -5242,8 +5346,12 @@ static int io_remove_buffers(struct io_kiocb
> *req, unsigned int issue_flags)
>  
>         ret = -ENOENT;
>         bl = io_buffer_get_list(ctx, p->bgid);
> -       if (bl)
> -               ret = __io_remove_buffers(ctx, bl, p->nbufs);
> +       if (bl) {
> +               ret = -EINVAL;
> +               /* can't use provide/remove buffers command on mapped
> buffers */
> +               if (!bl->buf_pages)
> +                       ret = __io_remove_buffers(ctx, bl, p->nbufs);
> +       }
>         if (ret < 0)
>                 req_set_fail(req);
>  
> @@ -5368,13 +5476,18 @@ static int io_provide_buffers(struct io_kiocb
> *req, unsigned int issue_flags)
>  
>         bl = io_buffer_get_list(ctx, p->bgid);
>         if (unlikely(!bl)) {
> -               bl = kmalloc(sizeof(*bl), GFP_KERNEL);
> +               bl = kzalloc(sizeof(*bl), GFP_KERNEL);
>                 if (!bl) {
>                         ret = -ENOMEM;
>                         goto err;
>                 }
>                 io_buffer_add_list(ctx, bl, p->bgid);
>         }
> +       /* can't add buffers via this command for a mapped buffer
> ring */
> +       if (bl->buf_pages) {
> +               ret = -EINVAL;
> +               goto err;
> +       }
>  
>         ret = io_add_buffers(ctx, p, bl);
>  err:
> @@ -12318,6 +12431,77 @@ static __cold int
> io_register_iowq_max_workers(struct io_ring_ctx *ctx,
>         return ret;
>  }
>  
> +static int io_register_pbuf_ring(struct io_ring_ctx *ctx, void
> __user *arg)
> +{
> +       struct io_uring_buf_ring *br;
> +       struct io_uring_buf_reg reg;
> +       struct io_buffer_list *bl;
> +       struct page **pages;
> +       int nr_pages;
> +
> +       if (copy_from_user(&reg, arg, sizeof(reg)))
> +               return -EFAULT;
> +
> +       if (reg.resv[0] || reg.resv[1] || reg.resv[2])
> +               return -EINVAL;
> +       if (!reg.ring_addr)
> +               return -EFAULT;
> +       if (reg.ring_addr & ~PAGE_MASK)
> +               return -EINVAL;
> +       if (!is_power_of_2(reg.ring_entries))
> +               return -EINVAL;
> +
> +       bl = io_buffer_get_list(ctx, reg.bgid);
> +       if (bl)
> +               return -EEXIST;
> +       bl = kzalloc(sizeof(*bl), GFP_KERNEL);
> +       if (!bl)
> +               return -ENOMEM;
> +
> +       pages = io_pin_pages(reg.ring_addr,
> +                            struct_size(br, bufs, reg.ring_entries),
> +                            &nr_pages);
> +       if (IS_ERR(pages)) {
> +               kfree(bl);
> +               return PTR_ERR(pages);
> +       }
> +
> +       br = page_address(pages[0]);
> +       br->head = 0;
> +       bl->buf_pages = pages;
> +       bl->buf_nr_pages = nr_pages;
> +       bl->nr_entries = reg.ring_entries;
> +       BUILD_BUG_ON(sizeof(struct io_uring_buf) != 16);
> +       bl->buf_per_page = (PAGE_SIZE - sizeof(struct io_uring_buf))
> /
> +                               sizeof(struct io_uring_buf);
> +       bl->buf_ring = br;
> +       bl->mask = reg.ring_entries - 1;
> +       io_buffer_add_list(ctx, bl, reg.bgid);
> +       return 0;
> +}
> +
> +static int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void
> __user *arg)
> +{
> +       struct io_uring_buf_reg reg;
> +       struct io_buffer_list *bl;
> +
> +       if (copy_from_user(&reg, arg, sizeof(reg)))
> +               return -EFAULT;
> +       if (reg.resv[0] || reg.resv[1] || reg.resv[2])
> +               return -EINVAL;
> +
> +       bl = io_buffer_get_list(ctx, reg.bgid);
> +       if (!bl)
> +               return -ENOENT;
> +       if (!bl->buf_pages)
> +               return -EINVAL;
> +
> +       __io_remove_buffers(ctx, bl, -1U);
> +       list_del(&bl->list);
> +       kfree(bl);
> +       return 0;
> +}
> +
>  static int __io_uring_register(struct io_ring_ctx *ctx, unsigned
> opcode,
>                                void __user *arg, unsigned nr_args)
>         __releases(ctx->uring_lock)
> @@ -12446,6 +12630,18 @@ static int __io_uring_register(struct
> io_ring_ctx *ctx, unsigned opcode,
>         case IORING_UNREGISTER_RING_FDS:
>                 ret = io_ringfd_unregister(ctx, arg, nr_args);
>                 break;
> +       case IORING_REGISTER_PBUF_RING:
> +               ret = -EINVAL;
> +               if (!arg || nr_args != 1)
> +                       break;
> +               ret = io_register_pbuf_ring(ctx, arg);
> +               break;
> +       case IORING_UNREGISTER_PBUF_RING:
> +               ret = -EINVAL;
> +               if (!arg || nr_args != 1)
> +                       break;
> +               ret = io_unregister_pbuf_ring(ctx, arg);
> +               break;
>         default:
>                 ret = -EINVAL;
>                 break;
> @@ -12531,6 +12727,7 @@ static int __init io_uring_init(void)
>  
>         /* ->buf_index is u16 */
>         BUILD_BUG_ON(IORING_MAX_REG_BUFFERS >= (1u << 16));
> +       BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 16);
>  
>         /* should fit into one byte */
>         BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
> diff --git a/include/uapi/linux/io_uring.h
> b/include/uapi/linux/io_uring.h
> index 49d1f3994f8d..90b70071110a 100644
> --- a/include/uapi/linux/io_uring.h
> +++ b/include/uapi/linux/io_uring.h
> @@ -352,6 +352,10 @@ enum {
>         IORING_REGISTER_RING_FDS                = 20,
>         IORING_UNREGISTER_RING_FDS              = 21,
>  
> +       /* register ring based provide buffer group */
> +       IORING_REGISTER_PBUF_RING               = 22,
> +       IORING_UNREGISTER_PBUF_RING             = 23,
> +
>         /* this goes last */
>         IORING_REGISTER_LAST
>  };
> @@ -423,6 +427,28 @@ struct io_uring_restriction {
>         __u32 resv2[3];
>  };
>  
> +struct io_uring_buf {
> +       __u64   addr;
> +       __u32   len;
> +       __u32   bid;
> +};

I believe bid is 16 bits due to the way it comes back in CQE flags

> +
> +struct io_uring_buf_ring {
> +       union {
> +               __u32                   head;
> +               struct io_uring_buf     pad;
> +       };
> +       struct io_uring_buf             bufs[];
> +};
> +
> +/* argument for IORING_(UN)REGISTER_PBUF_RING */
> +struct io_uring_buf_reg {
> +       __u64   ring_addr;
> +       __u32   ring_entries;
> +       __u32   bgid;

ditto for bgid






[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux