On 07/12/2020 22:15, Bijan Mottahedeh wrote: > Implement buffer sharing among multiple rings. > > A ring shares its (future) buffer registrations at setup time with > IORING_SETUP_SHARE_BUF. A ring attaches to another ring's buffer > registration at setup time with IORING_SETUP_ATTACH_BUF, after > authenticating with the buffer registration owner's fd. Any updates to > the owner's buffer registrations become immediately available to the > attached rings. > > Signed-off-by: Bijan Mottahedeh <bijan.mottahedeh@xxxxxxxxxx> > --- > fs/io_uring.c | 85 +++++++++++++++++++++++++++++++++++++++++-- > include/uapi/linux/io_uring.h | 2 + > 2 files changed, 83 insertions(+), 4 deletions(-) > > diff --git a/fs/io_uring.c b/fs/io_uring.c > index 479a6b9..b75cbd7 100644 > --- a/fs/io_uring.c > +++ b/fs/io_uring.c > @@ -8408,6 +8408,12 @@ static void io_buffers_map_free(struct io_ring_ctx *ctx) > ctx->nr_user_bufs = 0; > } > > +static void io_detach_buf_data(struct io_ring_ctx *ctx) > +{ > + percpu_ref_put(&ctx->buf_data->refs); > + ctx->buf_data = NULL; > +} > + > static int io_sqe_buffers_unregister(struct io_ring_ctx *ctx) > { > struct fixed_rsrc_data *data = ctx->buf_data; > @@ -8415,6 +8421,12 @@ static int io_sqe_buffers_unregister(struct io_ring_ctx *ctx) > if (!data) > return -ENXIO; > > + if (ctx->flags & IORING_SETUP_ATTACH_BUF) { > + io_detach_buf_data(ctx); > + ctx->nr_user_bufs = 0; nr_user_bufs is part of the invariant and should stay together with the rest of the state cleared in io_detach_buf_data(). 
> + return 0; > + } > + > io_rsrc_ref_quiesce(data, ctx); > io_buffers_unmap(ctx); > io_buffers_map_free(ctx); > @@ -8660,9 +8672,13 @@ static struct fixed_rsrc_data *io_buffers_map_alloc(struct io_ring_ctx *ctx, > if (!nr_args || nr_args > IORING_MAX_FIXED_BUFS) > return ERR_PTR(-EINVAL); > > - buf_data = alloc_fixed_rsrc_data(ctx); > - if (IS_ERR(buf_data)) > - return buf_data; > + if (ctx->buf_data) { > + buf_data = ctx->buf_data; > + } else { > + buf_data = alloc_fixed_rsrc_data(ctx); > + if (IS_ERR(buf_data)) > + return buf_data; > + } > > nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_BUFS_TABLE); > buf_data->table = kcalloc(nr_tables, sizeof(*buf_data->table), > @@ -8724,9 +8740,17 @@ static int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg, > struct fixed_rsrc_ref_node *ref_node; > struct fixed_rsrc_data *buf_data; > > + if (ctx->flags & IORING_SETUP_ATTACH_BUF) { > + if (!ctx->buf_data) > + return -EFAULT; > + ctx->nr_user_bufs = ctx->buf_data->ctx->nr_user_bufs; Why? Once a table is initialised it shouldn't change its size; it would be racy otherwise. > + return 0; > + } > + > buf_data = io_buffers_map_alloc(ctx, nr_args); > if (IS_ERR(buf_data)) > return PTR_ERR(buf_data); > + ctx->buf_data = buf_data; I wanted to write that an `if (ctx->user_bufs) return -EBUSY` check is missing, but apparently it was moved into io_buffers_map_alloc(). I'd really prefer to have it here. 
> > for (i = 0; i < nr_args; i++, ctx->nr_user_bufs++) { > struct fixed_rsrc_table *table; > @@ -8754,7 +8778,6 @@ static int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg, > break; > } > > - ctx->buf_data = buf_data; > if (ret) { > io_sqe_buffers_unregister(ctx); > return ret; > @@ -9783,6 +9806,55 @@ static int io_uring_get_fd(struct io_ring_ctx *ctx) > return ret; > } > > +static int io_attach_buf_data(struct io_ring_ctx *ctx, > + struct io_uring_params *p) > +{ > + struct io_ring_ctx *ctx_attach; > + struct fd f; > + > + f = fdget(p->wq_fd); > + if (!f.file) > + return -EBADF; > + if (f.file->f_op != &io_uring_fops) { > + fdput(f); > + return -EINVAL; > + } > + > + ctx_attach = f.file->private_data; > + if (!ctx_attach->buf_data) { It looks racy. What prevents it from being deleted while we're working on it, e.g. by io_sqe_buffers_unregister? > + fdput(f); > + return -EINVAL; > + } > + ctx->buf_data = ctx_attach->buf_data; Before, updates etc. (e.g. __io_sqe_buffers_update()) were synchronised by uring_lock; now it's modified concurrently, which looks really racy. > + > + percpu_ref_get(&ctx->buf_data->refs); Ok, now the original io_uring instance will wait until the attached ones get rid of their references. That's fertile ground for in-kernel deadlocks. task1: uring1 = create() task2: uring2 = create() task1: uring3 = create(share=uring2); task2: uring4 = create(share=uring1); task1: io_sqe_buffers_unregister(uring1) task2: io_sqe_buffers_unregister(uring2) If I skimmed through the code right, that should hang unkillably. 
> + fdput(f); > + return 0; > +} > + > +static int io_init_buf_data(struct io_ring_ctx *ctx, struct io_uring_params *p) > +{ > + if ((p->flags & (IORING_SETUP_SHARE_BUF | IORING_SETUP_ATTACH_BUF)) == > + (IORING_SETUP_SHARE_BUF | IORING_SETUP_ATTACH_BUF)) > + return -EINVAL; > + > + if (p->flags & IORING_SETUP_SHARE_BUF) { > + struct fixed_rsrc_data *buf_data; > + > + buf_data = alloc_fixed_rsrc_data(ctx); > + if (IS_ERR(buf_data)) > + return PTR_ERR(buf_data); > + > + ctx->buf_data = buf_data; > + return 0; > + } > + > + if (p->flags & IORING_SETUP_ATTACH_BUF) > + return io_attach_buf_data(ctx, p); > + > + return 0; > +} > + > static int io_uring_create(unsigned entries, struct io_uring_params *p, > struct io_uring_params __user *params) > { > @@ -9897,6 +9969,10 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, > if (ret) > goto err; > > + ret = io_init_buf_data(ctx, p); > + if (ret) > + goto err; > + > ret = io_sq_offload_create(ctx, p); > if (ret) > goto err; > @@ -9968,6 +10044,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params) > if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | > IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE | > IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ | > + IORING_SETUP_SHARE_BUF | IORING_SETUP_ATTACH_BUF | > IORING_SETUP_R_DISABLED)) > return -EINVAL; > > diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h > index 0d9ac12..2366126 100644 > --- a/include/uapi/linux/io_uring.h > +++ b/include/uapi/linux/io_uring.h > @@ -98,6 +98,8 @@ enum { > #define IORING_SETUP_CLAMP (1U << 4) /* clamp SQ/CQ ring sizes */ > #define IORING_SETUP_ATTACH_WQ (1U << 5) /* attach to existing wq */ > #define IORING_SETUP_R_DISABLED (1U << 6) /* start with ring disabled */ > +#define IORING_SETUP_SHARE_BUF (1U << 7) /* share buffer registration */ > +#define IORING_SETUP_ATTACH_BUF (1U << 8) /* attach buffer registration */ > > enum { > IORING_OP_NOP, > -- Pavel Begunkov