On 3/4/22 6:39 AM, Xiaoguang Wang wrote:
hi,
On 3/3/22 2:19 PM, Jens Axboe wrote:
On 3/3/22 1:41 PM, Jens Axboe wrote:
On 3/3/22 10:18 AM, Jens Axboe wrote:
On 3/3/22 9:31 AM, Jens Axboe wrote:
On 3/3/22 7:40 AM, Jens Axboe wrote:
On 3/3/22 7:36 AM, Jens Axboe wrote:
The only potential oddity here is that the fd passed back is not a
legitimate fd. io_uring does support poll(2) on its file descriptor, so
that could cause some confusion even if I don't think anyone actually
does poll(2) on io_uring.
Side note - the only implication here is that we then likely can't make
the optimized behavior the default, it has to be an IORING_SETUP_REG
flag which tells us that the application is aware of this limitation.
Though I guess close(2) might mess with that too... Hmm.
Not sure I can find a good approach for that. Tried out your patch and
made some fixes:
- Missing free on final tctx free
- Rename registered_files to registered_rings
- Fix off-by-ones in checking max registration count
- Use kcalloc
- Rename ENTER_FIXED_FILE -> ENTER_REGISTERED_RING
- Don't pass in tctx to io_uring_unreg_ringfd()
- Get rid of forward declaration for adding tctx node
- Get rid of extra file pointer in io_uring_enter()
- Fix deadlock in io_ringfd_register()
- Use io_uring_rsrc_update rather than add a new struct type
- Allow multiple register/unregister instead of enforcing just 1 at the
time
- Check for it really being a ring fd when registering
For different batch counts, nice improvements are seen. Roughly:
Batch==1 15% faster
Batch==2 13% faster
Batch==4 11% faster
This is just in microbenchmarks where the fdget/fdput play a bigger
factor, but it will certainly help with real world situations where
batching is more limited than in benchmarks.
In trying this out in applications, I think the better registration API
is to allow io_uring to pick the offset. The application doesn't care,
it's just a magic integer there. And if we allow io_uring to pick it,
then that makes things a lot easier to deal with.
For registration, pass in an array of io_uring_rsrc_update structs, just
filling in the ring_fd in the data field. Return value is number of ring
fds registered, and up->offset now contains the chosen offset for each
of them.
Unregister is the same struct, but just with offset filled in.
For applications using io_uring, which is all of them as far as I'm
aware, we can also easily hide this. This means we can get the optimized
behavior by default.
Only complication here is if the ring is shared across threads or
processes. The thread one can be common, one thread doing submit and one
doing completions. That one is a bit harder to handle. Without
inheriting registered ring fds, not sure how it can be handled by
default. Should probably just introduce a new queue init helper, but
then that requires application changes to adopt...
Ideas welcome!
Don't see a way to do it by default, so I think it'll have to be opt-in.
We could make it the default if you never shared a ring, which most
people don't do, but we can't easily do so if it's ever shared between
tasks or processes.
Before sending this patch, I also have thought about whether we can
make this enter_ring_fd be shared between threads and be default
feature, and found that it's hard :)
1) first we need to ensure this ring fd registration always can be
unregistered, so this cache is tied to io_uring_task, then when
thread exits, we can ensure fput called correspondingly.
2) we may also implement a file cache shared between threads in a
process, but this may introduce extra overhead, at least need locks
to protect modifications to this cache. If this cache is per thread,
it doesn't need any synchronizations.
So I suggest to make it be just simple and opt-in, and the
registration interface seems not complicated, a thread trying to
submit sqes can adopt it easily.
Yes, I pretty much have come to that same conclusion myself...
With liburing, if you share, you share the io_uring struct as well. So
it's hard to maintain a new ->enter_ring_fd in there. It'd be doable if
we could reserve real fd == . Which is of course possible
using xarray or similar, but that'll add extra overhead.
For liburing, we may need to add fixed file version for helpers which
submit sqes or reap cqes, for example, io_uring_submit_fixed(), which
passes a enter_ring_fd.
I'll take a look at liburing and see what we need to do there. I think
the sanest thing to do here is say that using a registered ring fd means
you cannot share the ring, ever. And then just have a
ring->enter_ring_fd which is normally just set to ring_fd when the ring
is setup, and if you register the ring fd, then we set it to whatever
the registered value is. Everything calling io_uring_enter() then just
needs to be modified to use ->enter_ring_fd instead of ->ring_fd.
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8f26c4602384..9180f122ecd7 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -466,6 +466,11 @@ struct io_ring_ctx {
};
};
+/*
+ * Arbitrary limit, can be raised if need be
+ */
+#define IO_RINGFD_REG_MAX 16
+
struct io_uring_task {
/* submission side */
int cached_refs;
@@ -481,6 +486,7 @@ struct io_uring_task {
struct io_wq_work_list task_list;
struct io_wq_work_list prior_task_list;
struct callback_head task_work;
+ struct file **registered_rings;
bool task_running;
};
@@ -8779,8 +8785,16 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
if (unlikely(!tctx))
return -ENOMEM;
+ tctx->registered_rings = kcalloc(IO_RINGFD_REG_MAX,
+ sizeof(struct file *), GFP_KERNEL);
+ if (unlikely(!tctx->registered_rings)) {
+ kfree(tctx);
+ return -ENOMEM;
+ }
+
ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL);
if (unlikely(ret)) {
+ kfree(tctx->registered_rings);
kfree(tctx);
return ret;
}
@@ -8789,6 +8803,7 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
if (IS_ERR(tctx->io_wq)) {
ret = PTR_ERR(tctx->io_wq);
percpu_counter_destroy(&tctx->inflight);
+ kfree(tctx->registered_rings);
kfree(tctx);
return ret;
}
@@ -8813,6 +8828,7 @@ void __io_uring_free(struct task_struct *tsk)
WARN_ON_ONCE(tctx->io_wq);
WARN_ON_ONCE(tctx->cached_refs);
+ kfree(tctx->registered_rings);
percpu_counter_destroy(&tctx->inflight);
kfree(tctx);
tsk->io_uring = NULL;
@@ -10035,6 +10051,139 @@ void __io_uring_cancel(bool cancel_all)
io_uring_cancel_generic(cancel_all, NULL);
}
+void io_uring_unreg_ringfd(void)
+{
+ struct io_uring_task *tctx = current->io_uring;
+ int i;
+
+ for (i = 0; i < IO_RINGFD_REG_MAX; i++) {
+ if (tctx->registered_rings[i]) {
+ fput(tctx->registered_rings[i]);
+ tctx->registered_rings[i] = NULL;
+ }
+ }
+}
+
+static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd,
+ int start, int end)
+{
+ struct file *file;
+ int offset;
+
+ for (offset = start; offset < end; offset++) {
+ offset = array_index_nospec(offset, IO_RINGFD_REG_MAX);
+ if (tctx->registered_rings[offset])
+ continue;
+
+ file = fget(fd);
+ if (!file) {
+ return -EBADF;
+ } else if (file->f_op != &io_uring_fops) {
+ fput(file);
+ return -EOPNOTSUPP;
+ }
+ tctx->registered_rings[offset] = file;
+ return offset;
+ }
+
+ return -EBUSY;
+}
+
+/*
+ * Register a ring fd to avoid fdget/fdput for each io_uring_enter()
+ * invocation. User passes in an array of struct io_uring_rsrc_update
+ * with ->data set to the ring_fd, and ->offset given for the desired
+ * index. If no index is desired, application may set ->offset == -1U
+ * and we'll find an available index. Returns number of entries
+ * successfully processed, or < 0 on error if none were processed.
+ */
+static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
+ unsigned nr_args)
+{
+ struct io_uring_rsrc_update __user *arg = __arg;
+ struct io_uring_rsrc_update reg;
+ struct io_uring_task *tctx;
+ int ret, i;
+
+ if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+ return -EINVAL;
+
+ mutex_unlock(&ctx->uring_lock);
+ ret = io_uring_add_tctx_node(ctx);
+ mutex_lock(&ctx->uring_lock);
+ if (ret)
+ return ret;
+
+ tctx = current->io_uring;
+ for (i = 0; i < nr_args; i++) {
+ int start, end;
+
+ if (copy_from_user(®, &arg[i], sizeof(reg))) {
+ ret = -EFAULT;
+ break;
+ }
+
+ if (reg.offset == -1U) {
+ start = 0;
+ end = IO_RINGFD_REG_MAX;
+ } else {
+ if (reg.offset >= IO_RINGFD_REG_MAX) {
+ ret = -EINVAL;
+ break;
+ }
+ start = reg.offset;
+ end = start + 1;
+ }
+
+ ret = io_ring_add_registered_fd(tctx, reg.data, start, end);
+ if (ret < 0)
+ break;
+
+ reg.offset = ret;
+ if (copy_to_user(&arg[i], ®, sizeof(reg))) {
+ fput(tctx->registered_rings[reg.offset]);
+ tctx->registered_rings[reg.offset] = NULL;
+ ret = -EFAULT;
+ break;
+ }
+ }
+
+ return i ? i : ret;
+}
+
+static int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg,
+ unsigned nr_args)
+{
+ struct io_uring_rsrc_update __user *arg = __arg;
+ struct io_uring_task *tctx = current->io_uring;
+ struct io_uring_rsrc_update reg;
+ int ret = 0, i;
+
+ if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+ return -EINVAL;
+ if (!tctx)
+ return 0;
+
+ for (i = 0; i < nr_args; i++) {
+ if (copy_from_user(®, &arg[i], sizeof(reg))) {
+ ret = -EFAULT;
+ break;
+ }
+ if (reg.offset >= IO_RINGFD_REG_MAX) {
+ ret = -EINVAL;
+ break;
+ }
+
+ reg.offset = array_index_nospec(reg.offset, IO_RINGFD_REG_MAX);
+ if (tctx->registered_rings[reg.offset]) {
+ fput(tctx->registered_rings[reg.offset]);
+ tctx->registered_rings[reg.offset] = NULL;
+ }
+ }
+
+ return i ? i : ret;
+}
+
static void *io_uring_validate_mmap_request(struct file *file,
loff_t pgoff, size_t sz)
{
@@ -10165,12 +10314,28 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
io_run_task_work();
if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
- IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG)))
+ IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
+ IORING_ENTER_REGISTERED_RING)))
return -EINVAL;
- f = fdget(fd);
- if (unlikely(!f.file))
- return -EBADF;
+ /*
+ * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
+ * need only dereference our task private array to find it.
+ */
+ if (flags & IORING_ENTER_REGISTERED_RING) {
+ struct io_uring_task *tctx = current->io_uring;
+
+ if (fd < 0 || fd >= IO_RINGFD_REG_MAX || !tctx)
+ return -EINVAL;
+ fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
+ f.file = tctx->registered_rings[fd];
+ if (unlikely(!f.file))
+ return -EBADF;
+ } else {
+ f = fdget(fd);
+ if (unlikely(!f.file))
+ return -EBADF;
+ }
ret = -EOPNOTSUPP;
if (unlikely(f.file->f_op != &io_uring_fops))
@@ -10244,7 +10409,8 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
out:
percpu_ref_put(&ctx->refs);
out_fput:
- fdput(f);
+ if (!(flags & IORING_ENTER_REGISTERED_RING))
+ fdput(f);
return submitted ? submitted : ret;
}
@@ -11134,6 +11300,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
break;
ret = io_register_iowq_max_workers(ctx, arg);
break;
+ case IORING_REGISTER_RING_FDS:
+ ret = io_ringfd_register(ctx, arg, nr_args);
+ break;
+ case IORING_UNREGISTER_RING_FDS:
+ ret = io_ringfd_unregister(ctx, arg, nr_args);
+ break;
default:
ret = -EINVAL;
break;
diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h
index 649a4d7c241b..1814e698d861 100644
--- a/include/linux/io_uring.h
+++ b/include/linux/io_uring.h
@@ -9,11 +9,14 @@
struct sock *io_uring_get_socket(struct file *file);
void __io_uring_cancel(bool cancel_all);
void __io_uring_free(struct task_struct *tsk);
+void io_uring_unreg_ringfd(void);
static inline void io_uring_files_cancel(void)
{
- if (current->io_uring)
+ if (current->io_uring) {
+ io_uring_unreg_ringfd();
__io_uring_cancel(false);
+ }
}
static inline void io_uring_task_cancel(void)
{
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 787f491f0d2a..42b2fe84dbcd 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -257,10 +257,11 @@ struct io_cqring_offsets {
/*
* io_uring_enter(2) flags
*/
-#define IORING_ENTER_GETEVENTS (1U << 0)
-#define IORING_ENTER_SQ_WAKEUP (1U << 1)
-#define IORING_ENTER_SQ_WAIT (1U << 2)
-#define IORING_ENTER_EXT_ARG (1U << 3)
+#define IORING_ENTER_GETEVENTS (1U << 0)
+#define IORING_ENTER_SQ_WAKEUP (1U << 1)
+#define IORING_ENTER_SQ_WAIT (1U << 2)
+#define IORING_ENTER_EXT_ARG (1U << 3)
+#define IORING_ENTER_REGISTERED_RING (1U << 4)
/*
* Passed in for io_uring_setup(2). Copied back with updated info on success
@@ -325,6 +326,10 @@ enum {
/* set/get max number of io-wq workers */
IORING_REGISTER_IOWQ_MAX_WORKERS = 19,
+ /* register/unregister io_uring fd with the ring */
+ IORING_REGISTER_RING_FDS = 20,
+ IORING_UNREGISTER_RING_FDS = 21,
+
/* this goes last */
IORING_REGISTER_LAST
};