Jens Axboe <axboe@xxxxxxxxx> writes:

> Buffers can get registered with io_uring, which allows skipping the
> repeated pin_pages, unpin/unref pages for each O_DIRECT operation. This
> reduces the overhead of O_DIRECT IO.
>
> However, registering buffers can take some time. Normally this isn't an
> issue as it's done at initialization time (and hence less critical), but
> for cases where rings can be created and destroyed as part of an IO
> thread pool, registering the same buffers for multiple rings becomes a
> more time sensitive proposition. As an example, let's say an application
> has an IO memory pool of 500G. Initial registration takes:
>
> Got 500 huge pages (each 1024MB)
> Registered 500 pages in 409 msec
>
> or about 0.4 seconds. If we go higher to 900 1GB huge pages being
> registered:
>
> Registered 900 pages in 738 msec
>
> which is, as expected, fully linear scaling.
>
> Rather than have each ring pin/map/register the same buffer pool,
> provide an io_uring_register(2) opcode to simply duplicate the buffers
> that are registered with another ring. Adding the same 900GB of
> registered buffers to the target ring can then be accomplished in:
>
> Copied 900 pages in 17 usec
>
> While timing differs a bit, this provides around a 25,000-40,000x
> speedup for this use case.

Looks good, but I couldn't get it to apply on top of your branches.
I have only one comment, if you are doing a v4:

> Signed-off-by: Jens Axboe <axboe@xxxxxxxxx>
> ---
>  include/uapi/linux/io_uring.h | 13 +++++
>  io_uring/register.c           |  6 +++
>  io_uring/rsrc.c               | 91 +++++++++++++++++++++++++++++++++++
>  io_uring/rsrc.h               |  1 +
>  4 files changed, 111 insertions(+)
>
> diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
> --- a/io_uring/rsrc.c
> +++ b/io_uring/rsrc.c
> @@ -17,6 +17,7 @@
>  #include "openclose.h"
>  #include "rsrc.h"
>  #include "memmap.h"
> +#include "register.h"
>
>  struct io_rsrc_update {
>  	struct file *file;
> @@ -1137,3 +1138,93 @@ int io_import_fixed(int ddir, struct iov_iter *iter,
>
>  	return 0;
>  }
> +
> +static int io_copy_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx)

The error handling code in this function is a bit hairy, IMO. I think if
you check nbufs unlocked and validate it later, it could be much simpler:

static int io_copy_buffers(struct io_ring_ctx *ctx, struct io_ring_ctx *src_ctx)
{
	struct io_mapped_ubuf **user_bufs;
	struct io_rsrc_data *data;
	int i, ret, nbufs;

	/* Read nr_user_bufs unlocked. Must be validated later */
	nbufs = READ_ONCE(src_ctx->nr_user_bufs);
	if (!nbufs)
		return -ENXIO;

	ret = io_rsrc_data_alloc(ctx, IORING_RSRC_BUFFER, NULL, nbufs, &data);
	if (ret)
		return ret;

	user_bufs = kcalloc(nbufs, sizeof(*ctx->user_bufs), GFP_KERNEL);
	if (!user_bufs) {
		ret = -ENOMEM;
		goto out_free_data;
	}

	mutex_unlock(&ctx->uring_lock);
	mutex_lock(&src_ctx->uring_lock);

	ret = -EBUSY;
	if (nbufs != src_ctx->nr_user_bufs) {
		mutex_unlock(&src_ctx->uring_lock);
		mutex_lock(&ctx->uring_lock);
		goto out;
	}

	for (i = 0; i < nbufs; i++) {
		struct io_mapped_ubuf *src = src_ctx->user_bufs[i];

		refcount_inc(&src->refs);
		user_bufs[i] = src;
	}

	/* Have a ref on the bufs now, drop src lock and re-grab our own lock */
	mutex_unlock(&src_ctx->uring_lock);
	mutex_lock(&ctx->uring_lock);
	if (ctx->user_bufs)
		goto out_unmap;

	ctx->user_bufs = user_bufs;
	ctx->buf_data = data;
	ctx->nr_user_bufs = nbufs;
	return 0;
out_unmap:
	/* someone raced setting up buffers, dump ours */
	for (i = 0; i < nbufs; i++)
		io_buffer_unmap(ctx, &user_bufs[i]);
out:
	kfree(user_bufs);
out_free_data:
	io_rsrc_data_free(data);
	return ret;
}

Thanks,

-- 
Gabriel Krisman Bertazi