Re: [PATCH v5 13/13] io_uring: support buffer registration sharing

On 1/13/2021 6:01 PM, Bijan Mottahedeh wrote:
On 1/12/2021 1:33 PM, Bijan Mottahedeh wrote:
Implement buffer sharing among multiple rings.

A ring shares its (future) buffer registrations at setup time with
IORING_SETUP_SHARE_BUF. A ring attaches to another ring's buffer
registration at setup time with IORING_SETUP_ATTACH_BUF, after
authenticating with the buffer registration owner's fd. Any updates to
the owner's buffer registrations become immediately available to the
attached rings.

Signed-off-by: Bijan Mottahedeh <bijan.mottahedeh@xxxxxxxxxx>

Conflicts:
    fs/io_uring.c
---
  fs/io_uring.c                 | 85 +++++++++++++++++++++++++++++++++++++++++--
  include/uapi/linux/io_uring.h |  2 +
  2 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 37639b9..856a570b 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -8439,6 +8439,13 @@ static void io_buffers_map_free(struct io_ring_ctx *ctx)
      ctx->nr_user_bufs = 0;
  }
+static void io_detach_buf_data(struct io_ring_ctx *ctx)
+{
+    percpu_ref_put(&ctx->buf_data->refs);
+    ctx->buf_data = NULL;
+    ctx->nr_user_bufs = 0;
+}
+
  static int io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
  {
      struct fixed_rsrc_data *data = ctx->buf_data;
@@ -8447,6 +8454,11 @@ static int io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
      if (!data)
          return -ENXIO;
+    if (ctx->flags & IORING_SETUP_ATTACH_BUF) {
+        io_detach_buf_data(ctx);
+        return 0;
+    }
+
      ret = io_rsrc_ref_quiesce(data, ctx, alloc_fixed_buf_ref_node);
      if (ret)
          return ret;
@@ -8690,9 +8702,13 @@ static struct fixed_rsrc_data *io_buffers_map_alloc(struct io_ring_ctx *ctx,
      if (!nr_args || nr_args > IORING_MAX_FIXED_BUFS)
          return ERR_PTR(-EINVAL);
-    buf_data = alloc_fixed_rsrc_data(ctx);
-    if (IS_ERR(buf_data))
-        return buf_data;
+    if (ctx->buf_data) {
+        buf_data = ctx->buf_data;
+    } else {
+        buf_data = alloc_fixed_rsrc_data(ctx);
+        if (IS_ERR(buf_data))
+            return buf_data;
+    }
      nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_BUFS_TABLE);
      buf_data->table = kcalloc(nr_tables, sizeof(*buf_data->table),
@@ -8757,9 +8773,17 @@ static int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
      if (ctx->nr_user_bufs)
          return -EBUSY;
+    if (ctx->flags & IORING_SETUP_ATTACH_BUF) {
+        if (!ctx->buf_data)
+            return -EFAULT;
+        ctx->nr_user_bufs = ctx->buf_data->ctx->nr_user_bufs;
+        return 0;
+    }
+
      buf_data = io_buffers_map_alloc(ctx, nr_args);
      if (IS_ERR(buf_data))
          return PTR_ERR(buf_data);
+    ctx->buf_data = buf_data;
      for (i = 0; i < nr_args; i++, ctx->nr_user_bufs++) {
          struct io_mapped_ubuf *imu;
@@ -8783,7 +8807,6 @@ static int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
              break;
      }
-    ctx->buf_data = buf_data;
      if (ret) {
          io_sqe_buffers_unregister(ctx);
          return ret;
@@ -9831,6 +9854,55 @@ static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
      return file;
  }
+static int io_attach_buf_data(struct io_ring_ctx *ctx,
+                  struct io_uring_params *p)
+{
+    struct io_ring_ctx *ctx_attach;
+    struct fd f;
+
+    f = fdget(p->wq_fd);
+    if (!f.file)
+        return -EBADF;
+    if (f.file->f_op != &io_uring_fops) {
+        fdput(f);
+        return -EINVAL;
+    }
+
+    ctx_attach = f.file->private_data;
+    if (!ctx_attach->buf_data) {
+        fdput(f);
+        return -EINVAL;
+    }
+    ctx->buf_data = ctx_attach->buf_data;
+
+    percpu_ref_get(&ctx->buf_data->refs);
+    fdput(f);
+    return 0;
+}
+
+static int io_init_buf_data(struct io_ring_ctx *ctx, struct io_uring_params *p)
+{
+    if ((p->flags & (IORING_SETUP_SHARE_BUF | IORING_SETUP_ATTACH_BUF)) ==
+        (IORING_SETUP_SHARE_BUF | IORING_SETUP_ATTACH_BUF))
+        return -EINVAL;
+
+    if (p->flags & IORING_SETUP_SHARE_BUF) {
+        struct fixed_rsrc_data *buf_data;
+
+        buf_data = alloc_fixed_rsrc_data(ctx);
+        if (IS_ERR(buf_data))
+            return PTR_ERR(buf_data);
+
+        ctx->buf_data = buf_data;
+        return 0;
+    }
+
+    if (p->flags & IORING_SETUP_ATTACH_BUF)
+        return io_attach_buf_data(ctx, p);
+
+    return 0;
+}
+
  static int io_uring_create(unsigned entries, struct io_uring_params *p,
                 struct io_uring_params __user *params)
  {
@@ -9948,6 +10020,10 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
      if (ret)
          goto err;
+    ret = io_init_buf_data(ctx, p);
+    if (ret)
+        goto err;
+
      ret = io_sq_offload_create(ctx, p);
      if (ret)
          goto err;
@@ -10028,6 +10104,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
      if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
              IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
              IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
+            IORING_SETUP_SHARE_BUF | IORING_SETUP_ATTACH_BUF |
              IORING_SETUP_R_DISABLED))
          return -EINVAL;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index b289ef8..3ad786a 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -98,6 +98,8 @@ enum {
  #define IORING_SETUP_CLAMP    (1U << 4)    /* clamp SQ/CQ ring sizes */
  #define IORING_SETUP_ATTACH_WQ    (1U << 5)    /* attach to existing wq */
  #define IORING_SETUP_R_DISABLED    (1U << 6)    /* start with ring disabled */
+#define IORING_SETUP_SHARE_BUF    (1U << 7)    /* share buffer registration */
+#define IORING_SETUP_ATTACH_BUF    (1U << 8)    /* attach buffer registration */
  enum {
      IORING_OP_NOP,

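For illustration, the setup sequence these two flags enable looks roughly like this with liburing (a sketch, not code from the patch; error handling elided, ring sizes arbitrary):

	#include <liburing.h>

	struct io_uring owner, attached;
	struct io_uring_params p = { .flags = IORING_SETUP_SHARE_BUF };

	/* Owner ring: created with SHARE_BUF so its (future) buffer
	 * registrations can be shared with other rings. */
	io_uring_queue_init_params(8, &owner, &p);

	/* Attached ring: passes the owner's fd in p.wq_fd and sees the
	 * owner's registrations, including later updates. */
	p = (struct io_uring_params){ .flags = IORING_SETUP_ATTACH_BUF,
				      .wq_fd  = owner.ring_fd };
	io_uring_queue_init_params(8, &attached, &p);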

I recreated the deadlock scenario you had raised:

 > Ok, now the original io_uring instance will wait until the attached
 > once get rid of their references. That's a versatile ground to have
 > in kernel deadlocks.
 >
 > task1: uring1 = create()
 > task2: uring2 = create()
 > task1: uring3 = create(share=uring2);
 > task2: uring4 = create(share=uring1);
 >
 > task1: io_sqe_buffers_unregister(uring1)
 > task2: io_sqe_buffers_unregister(uring2)
 >
 > If I skimmed through the code right, that should hang unkillably.

with the following test:

+static int test_deadlock(void)
+{
+       int i, pid, ret;
+       struct io_uring rings[4];
+       struct io_uring_params p = {};
+
+       p.flags = IORING_SETUP_SHARE_BUF;
+
+       for (i = 0; i < 2; i++) {
+               ret = io_uring_queue_init_params(1, &rings[i], &p);
+               if (ret) {
+                       verror("queue_init share");
+                       return ret;
+               }
+       }
+
+       p.flags = IORING_SETUP_ATTACH_BUF;
+
+       pid = fork();
+       if (pid) {
+               p.wq_fd = rings[1].ring_fd;
+               ret = io_uring_queue_init_params(1, &rings[3], &p);
                                                            ^^^
                                                            2
+       } else {
+               p.wq_fd = rings[0].ring_fd;
+               ret = io_uring_queue_init_params(1, &rings[4], &p);
                                                            ^^^
                                                            3
+       }
+
+       if (ret) {
+               verror("queue_init attach");
+               return ret;
+       }
+
+
+       vinfo(V1, "unregister\n");
+
+       if (pid) {
+               close(rings[1].ring_fd);
+               ret = io_uring_unregister_buffers(&rings[0]);
+       } else {
+               close(rings[0].ring_fd);
+               ret = io_uring_unregister_buffers(&rings[1]);
+       }
+
+       vinfo(V1, "unregister done\n");
+
+       if (ret)
+               verror("unregister");
+
+       return ret;
+}


The two processes in the test hang but can be interrupted.

I checked that

ret = wait_for_completion_interruptible(&data->done);

in io_rsrc_ref_quiesce() returns -ERESTARTSYS (-512) after hitting ^C and that

ret = io_run_task_work_sig();

returns -EINTR (-4).

Finally, in

io_ring_ctx_free()
-> io_sqe_buffers_unregister()
    -> io_rsrc_ref_quiesce()

ret = wait_for_completion_interruptible(&data->done);

returns 0.

So it looks like the unkillable hang is not there.
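
That behavior matches the shape of the wait loop implied by the call sites above; a rough sketch (assumed structure, not the exact code from the series):

	/* io_rsrc_ref_quiesce() wait loop -- sketch, not verbatim */
	do {
		ret = wait_for_completion_interruptible(&data->done);
		if (!ret)
			break;		/* all refs dropped, quiesce done */
		/* -ERESTARTSYS: a signal is pending; process task work,
		 * then bail out with -EINTR instead of hanging unkillably */
		ret = io_run_task_work_sig();
		if (ret < 0)
			break;
	} while (1);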

However, when I take out the io_uring_unregister_buffers() calls from the test, one of the processes gets a segfault with the following trace, and I'm not sure what the cause is.

Bug in the test itself: wrong indices passed to io_uring_queue_init_params() above. rings[] has only four elements (indices 0..3), so the rings[4] call writes past the end of the array.
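
Per the caret annotations, the corrected attach calls would be:

	if (pid) {
		p.wq_fd = rings[1].ring_fd;
		ret = io_uring_queue_init_params(1, &rings[2], &p);
	} else {
		p.wq_fd = rings[0].ring_fd;
		ret = io_uring_queue_init_params(1, &rings[3], &p);
	}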



