On 3/4/22 6:39 AM, Xiaoguang Wang wrote:
> hi,
>
>> On 3/3/22 2:19 PM, Jens Axboe wrote:
>>> On 3/3/22 1:41 PM, Jens Axboe wrote:
>>>> On 3/3/22 10:18 AM, Jens Axboe wrote:
>>>>> On 3/3/22 9:31 AM, Jens Axboe wrote:
>>>>>> On 3/3/22 7:40 AM, Jens Axboe wrote:
>>>>>>> On 3/3/22 7:36 AM, Jens Axboe wrote:
>>>>>>>> The only potential oddity here is that the fd passed back is not a
>>>>>>>> legitimate fd. io_uring does support poll(2) on its file descriptor, so
>>>>>>>> that could cause some confusion even if I don't think anyone actually
>>>>>>>> does poll(2) on io_uring.
>>>>>>>
>>>>>>> Side note - the only implication here is that we then likely can't make
>>>>>>> the optimized behavior the default, it has to be an IORING_SETUP_REG
>>>>>>> flag which tells us that the application is aware of this limitation.
>>>>>>> Though I guess close(2) might mess with that too... Hmm.
>>>>>>
>>>>>> Not sure I can find a good approach for that. Tried out your patch and
>>>>>> made some fixes:
>>>>>>
>>>>>> - Missing free on final tctx free
>>>>>> - Rename registered_files to registered_rings
>>>>>> - Fix off-by-ones in checking max registration count
>>>>>> - Use kcalloc
>>>>>> - Rename ENTER_FIXED_FILE -> ENTER_REGISTERED_RING
>>>>>> - Don't pass in tctx to io_uring_unreg_ringfd()
>>>>>> - Get rid of forward declaration for adding tctx node
>>>>>> - Get rid of extra file pointer in io_uring_enter()
>>>>>> - Fix deadlock in io_ringfd_register()
>>>>>> - Use io_uring_rsrc_update rather than add a new struct type
>>>>>
>>>>> - Allow multiple register/unregister instead of enforcing just 1 at a
>>>>>   time
>>>>> - Check for it really being a ring fd when registering
>>>>>
>>>>> For different batch counts, nice improvements are seen.
>>>>> Roughly:
>>>>>
>>>>> Batch==1	15% faster
>>>>> Batch==2	13% faster
>>>>> Batch==4	11% faster
>>>>>
>>>>> This is just in microbenchmarks where the fdget/fdput play a bigger
>>>>> factor, but it will certainly help with real world situations where
>>>>> batching is more limited than in benchmarks.
>>>>
>>>> In trying this out in applications, I think the better registration API
>>>> is to allow io_uring to pick the offset. The application doesn't care,
>>>> it's just a magic integer there. And if we allow io_uring to pick it,
>>>> then that makes things a lot easier to deal with.
>>>>
>>>> For registration, pass in an array of io_uring_rsrc_update structs, just
>>>> filling in the ring_fd in the data field. Return value is number of ring
>>>> fds registered, and up->offset now contains the chosen offset for each
>>>> of them.
>>>>
>>>> Unregister is the same struct, but just with offset filled in.
>>>>
>>>> For applications using liburing, which is all of them as far as I'm
>>>> aware, we can also easily hide this. This means we can get the optimized
>>>> behavior by default.
>>>
>>> Only complication here is if the ring is shared across threads or
>>> processes. The thread one can be common, one thread doing submit and one
>>> doing completions. That one is a bit harder to handle. Without
>>> inheriting registered ring fds, not sure how it can be handled by
>>> default. Should probably just introduce a new queue init helper, but
>>> then that requires application changes to adopt...
>>>
>>> Ideas welcome!
>>
>> Don't see a way to do it by default, so I think it'll have to be opt-in.
>> We could make it the default if you never shared a ring, which most
>> people don't do, but we can't easily do so if it's ever shared between
>> tasks or processes.
> Before sending this patch, I also have thought about whether we can
> make this enter_ring_fd be shared between threads and be default
> feature, and found that it's hard :)
>
> 1) first we need to ensure this ring fd registration always can be
> unregistered, so this cache is tied to io_uring_task, then when
> thread exits, we can ensure fput called correspondingly.
>
> 2) we may also implement a file cache shared between threads in a
> process, but this may introduce extra overhead, at least need locks
> to protect modifications to this cache. If this cache is per thread,
> it doesn't need any synchronizations.
>
> So I suggest to make it be just simple and opt-in, and the
> registration interface seems not complicated, a thread trying to
> submit sqes can adopt it easily.

Yes, I pretty much have come to that same conclusion myself...

>> With liburing, if you share, you share the io_uring struct as well. So
>> it's hard to maintain a new ->enter_ring_fd in there. It'd be doable if
>> we could reserve real fd == . Which is of course possible
>> using xarray or similar, but that'll add extra overhead.
>
> For liburing, we may need to add fixed file version for helpers which
> submit sqes or reap cqes, for example, io_uring_submit_fixed(), which
> passes a enter_ring_fd.

I'll take a look at liburing and see what we need to do there. I think
the sanest thing to do here is say that using a registered ring fd means
you cannot share the ring, ever. And then just have a ring->enter_ring_fd
which is normally just set to ring_fd when the ring is setup, and if you
register the ring fd, then we set it to whatever the registered value is.
Everything calling io_uring_enter() then just needs to be modified to use
->enter_ring_fd instead of ->ring_fd.

>> Anyway, current version below. Only real change here is allowing either
>> specific offset or generated offset, depending on what the
>> io_uring_rsrc_update->offset is set to.
>> If set to -1U, then io_uring
>> will find a free offset. If set to anything else, io_uring will use that
>> index (as long as it's >=0 && < MAX).
>
> Seems you forgot to attach the newest version, and also don't see a
> patch attachment. Finally, thanks for your quick response and many
> code improvements, really appreciate it.

Oops, below now. How do you want to handle this patch? It's now a bit of
a mix of both of our stuff...

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8f26c4602384..9180f122ecd7 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -466,6 +466,11 @@ struct io_ring_ctx {
 	};
 };
 
+/*
+ * Arbitrary limit, can be raised if need be
+ */
+#define IO_RINGFD_REG_MAX		16
+
 struct io_uring_task {
 	/* submission side */
 	int			cached_refs;
@@ -481,6 +486,7 @@ struct io_uring_task {
 	struct io_wq_work_list	task_list;
 	struct io_wq_work_list	prior_task_list;
 	struct callback_head	task_work;
+	struct file		**registered_rings;
 	bool			task_running;
 };
 
@@ -8779,8 +8785,16 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
 	if (unlikely(!tctx))
 		return -ENOMEM;
 
+	tctx->registered_rings = kcalloc(IO_RINGFD_REG_MAX,
+					 sizeof(struct file *), GFP_KERNEL);
+	if (unlikely(!tctx->registered_rings)) {
+		kfree(tctx);
+		return -ENOMEM;
+	}
+
 	ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL);
 	if (unlikely(ret)) {
+		kfree(tctx->registered_rings);
 		kfree(tctx);
 		return ret;
 	}
@@ -8789,6 +8803,7 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
 	if (IS_ERR(tctx->io_wq)) {
 		ret = PTR_ERR(tctx->io_wq);
 		percpu_counter_destroy(&tctx->inflight);
+		kfree(tctx->registered_rings);
 		kfree(tctx);
 		return ret;
 	}
@@ -8813,6 +8828,7 @@ void __io_uring_free(struct task_struct *tsk)
 
 	WARN_ON_ONCE(tctx->io_wq);
 	WARN_ON_ONCE(tctx->cached_refs);
+	kfree(tctx->registered_rings);
 	percpu_counter_destroy(&tctx->inflight);
 	kfree(tctx);
 	tsk->io_uring = NULL;
@@ -10035,6 +10051,139 @@ void __io_uring_cancel(bool cancel_all)
 	io_uring_cancel_generic(cancel_all, NULL);
 }
 
+void io_uring_unreg_ringfd(void)
+{
+	struct io_uring_task *tctx = current->io_uring;
+	int i;
+
+	for (i = 0; i < IO_RINGFD_REG_MAX; i++) {
+		if (tctx->registered_rings[i]) {
+			fput(tctx->registered_rings[i]);
+			tctx->registered_rings[i] = NULL;
+		}
+	}
+}
+
+static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd,
+				     int start, int end)
+{
+	struct file *file;
+	int offset;
+
+	for (offset = start; offset < end; offset++) {
+		offset = array_index_nospec(offset, IO_RINGFD_REG_MAX);
+		if (tctx->registered_rings[offset])
+			continue;
+
+		file = fget(fd);
+		if (!file) {
+			return -EBADF;
+		} else if (file->f_op != &io_uring_fops) {
+			fput(file);
+			return -EOPNOTSUPP;
+		}
+		tctx->registered_rings[offset] = file;
+		return offset;
+	}
+
+	return -EBUSY;
+}
+
+/*
+ * Register a ring fd to avoid fdget/fdput for each io_uring_enter()
+ * invocation. User passes in an array of struct io_uring_rsrc_update
+ * with ->data set to the ring_fd, and ->offset given for the desired
+ * index. If no index is desired, application may set ->offset == -1U
+ * and we'll find an available index. Returns number of entries
+ * successfully processed, or < 0 on error if none were processed.
+ */
+static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
+			      unsigned nr_args)
+{
+	struct io_uring_rsrc_update __user *arg = __arg;
+	struct io_uring_rsrc_update reg;
+	struct io_uring_task *tctx;
+	int ret, i;
+
+	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+		return -EINVAL;
+
+	mutex_unlock(&ctx->uring_lock);
+	ret = io_uring_add_tctx_node(ctx);
+	mutex_lock(&ctx->uring_lock);
+	if (ret)
+		return ret;
+
+	tctx = current->io_uring;
+	for (i = 0; i < nr_args; i++) {
+		int start, end;
+
+		if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
+			ret = -EFAULT;
+			break;
+		}
+
+		if (reg.offset == -1U) {
+			start = 0;
+			end = IO_RINGFD_REG_MAX;
+		} else {
+			if (reg.offset >= IO_RINGFD_REG_MAX) {
+				ret = -EINVAL;
+				break;
+			}
+			start = reg.offset;
+			end = start + 1;
+		}
+
+		ret = io_ring_add_registered_fd(tctx, reg.data, start, end);
+		if (ret < 0)
+			break;
+
+		reg.offset = ret;
+		if (copy_to_user(&arg[i], &reg, sizeof(reg))) {
+			fput(tctx->registered_rings[reg.offset]);
+			tctx->registered_rings[reg.offset] = NULL;
+			ret = -EFAULT;
+			break;
+		}
+	}
+
+	return i ? i : ret;
+}
+
+static int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg,
+				unsigned nr_args)
+{
+	struct io_uring_rsrc_update __user *arg = __arg;
+	struct io_uring_task *tctx = current->io_uring;
+	struct io_uring_rsrc_update reg;
+	int ret = 0, i;
+
+	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+		return -EINVAL;
+	if (!tctx)
+		return 0;
+
+	for (i = 0; i < nr_args; i++) {
+		if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
+			ret = -EFAULT;
+			break;
+		}
+		if (reg.offset >= IO_RINGFD_REG_MAX) {
+			ret = -EINVAL;
+			break;
+		}
+
+		reg.offset = array_index_nospec(reg.offset, IO_RINGFD_REG_MAX);
+		if (tctx->registered_rings[reg.offset]) {
+			fput(tctx->registered_rings[reg.offset]);
+			tctx->registered_rings[reg.offset] = NULL;
+		}
+	}
+
+	return i ? i : ret;
+}
+
 static void *io_uring_validate_mmap_request(struct file *file,
 					    loff_t pgoff, size_t sz)
 {
@@ -10165,12 +10314,28 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	io_run_task_work();
 
 	if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
-			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG)))
+			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
+			       IORING_ENTER_REGISTERED_RING)))
 		return -EINVAL;
 
-	f = fdget(fd);
-	if (unlikely(!f.file))
-		return -EBADF;
+	/*
+	 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
+	 * need only dereference our task private array to find it.
+	 */
+	if (flags & IORING_ENTER_REGISTERED_RING) {
+		struct io_uring_task *tctx = current->io_uring;
+
+		if (fd < 0 || fd >= IO_RINGFD_REG_MAX || !tctx)
+			return -EINVAL;
+		fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
+		f.file = tctx->registered_rings[fd];
+		if (unlikely(!f.file))
+			return -EBADF;
+	} else {
+		f = fdget(fd);
+		if (unlikely(!f.file))
+			return -EBADF;
+	}
 
 	ret = -EOPNOTSUPP;
 	if (unlikely(f.file->f_op != &io_uring_fops))
@@ -10244,7 +10409,8 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 out:
 	percpu_ref_put(&ctx->refs);
 out_fput:
-	fdput(f);
+	if (!(flags & IORING_ENTER_REGISTERED_RING))
+		fdput(f);
 	return submitted ? submitted : ret;
 }
 
@@ -11134,6 +11300,12 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			break;
 		ret = io_register_iowq_max_workers(ctx, arg);
 		break;
+	case IORING_REGISTER_RING_FDS:
+		ret = io_ringfd_register(ctx, arg, nr_args);
+		break;
+	case IORING_UNREGISTER_RING_FDS:
+		ret = io_ringfd_unregister(ctx, arg, nr_args);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h
index 649a4d7c241b..1814e698d861 100644
--- a/include/linux/io_uring.h
+++ b/include/linux/io_uring.h
@@ -9,11 +9,14 @@
 struct sock *io_uring_get_socket(struct file *file);
 void __io_uring_cancel(bool cancel_all);
 void __io_uring_free(struct task_struct *tsk);
+void io_uring_unreg_ringfd(void);
 
 static inline void io_uring_files_cancel(void)
 {
-	if (current->io_uring)
+	if (current->io_uring) {
+		io_uring_unreg_ringfd();
 		__io_uring_cancel(false);
+	}
 }
 static inline void io_uring_task_cancel(void)
 {
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 787f491f0d2a..42b2fe84dbcd 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -257,10 +257,11 @@ struct io_cqring_offsets {
 /*
  * io_uring_enter(2) flags
  */
-#define IORING_ENTER_GETEVENTS	(1U << 0)
-#define IORING_ENTER_SQ_WAKEUP	(1U << 1)
-#define IORING_ENTER_SQ_WAIT	(1U << 2)
-#define IORING_ENTER_EXT_ARG	(1U << 3)
+#define IORING_ENTER_GETEVENTS		(1U << 0)
+#define IORING_ENTER_SQ_WAKEUP		(1U << 1)
+#define IORING_ENTER_SQ_WAIT		(1U << 2)
+#define IORING_ENTER_EXT_ARG		(1U << 3)
+#define IORING_ENTER_REGISTERED_RING	(1U << 4)
 
 /*
  * Passed in for io_uring_setup(2). Copied back with updated info on success
@@ -325,6 +326,10 @@ enum {
 	/* set/get max number of io-wq workers */
 	IORING_REGISTER_IOWQ_MAX_WORKERS	= 19,
 
+	/* register/unregister io_uring fd with the ring */
+	IORING_REGISTER_RING_FDS		= 20,
+	IORING_UNREGISTER_RING_FDS		= 21,
+
 	/* this goes last */
 	IORING_REGISTER_LAST
 };

-- 
Jens Axboe