On 3/9/22 6:55 PM, Jens Axboe wrote: > On 3/9/22 6:36 PM, Jens Axboe wrote: >> On 3/9/22 4:49 PM, Artyom Pavlov wrote: >>> Greetings! >>> >>> A common approach for multi-threaded servers is to have a number of >>> threads equal to the number of cores and launch a separate ring in each >>> one. AFAIK currently if we want to send an event to a different ring, >>> we have to write-lock this ring, create an SQE, and update the index >>> ring. Alternatively, we could use some kind of user-space message >>> passing. >>> >>> Such approaches are somewhat inefficient and I think this can be solved >>> elegantly by updating the io_uring_sqe type to allow accepting the fd of a >>> ring to which the CQE must be sent by the kernel. It can be done by >>> introducing an IOSQE_ flag and using one of the currently unused padding >>> u64s. >>> >>> Such a feature could be useful for load balancing and message passing >>> between threads which would ride on top of io-uring, e.g. you could >>> send a NOP with user_data pointing to a message payload. >> >> So what you want is a NOP with 'fd' set to the fd of another ring, and >> that nop posts a CQE on that other ring? I don't think we'd need IOSQE >> flags for that, we just need a NOP that supports that. I see a few ways >> of going about that: >> >> 1) Add a new 'NOP' that takes an fd, and validates that that fd is an >> io_uring instance. It can then grab the completion lock on that ring >> and post an empty CQE. >> >> 2) We add a FEAT flag saying NOP supports taking an 'fd' argument, where >> 'fd' is another ring. Posting CQE same as above. >> >> 3) We add a specific opcode for this. Basically the same as #2, but >> maybe with a more descriptive name than NOP. >> >> Might make sense to pair that with a CQE flag or something like that, as >> there's no specific user_data that could be used as it doesn't match an >> existing SQE that has been issued. IORING_CQE_F_WAKEUP for example. >> Would be applicable to all the above cases. 
>> >> I kind of like #3 the best. Add an IORING_OP_RING_WAKEUP command, require >> that sqe->fd point to a ring (could even be the ring itself, doesn't >> matter). And add IORING_CQE_F_WAKEUP as a specific flag for that. > > Something like the below, totally untested. The request will complete on > the original ring with either 0, for success, or -EOVERFLOW if the > target ring was already in an overflow state. If the fd specified isn't > an io_uring context, then the request will complete with -EBADFD. > > If you have any way of testing this, please do. I'll write a basic > functionality test for it as well, but not until tomorrow. > > Maybe we want to include in cqe->res who the waker was? We can stuff the > pid/tid in there, for example. Made the pid change, and also wrote a test case for it. The only other change is adding a completion trace event as well. The patch below is against for-5.18/io_uring, and the test case for liburing is attached. diff --git a/fs/io_uring.c b/fs/io_uring.c index 2e04f718319d..b21f85a48224 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -1105,6 +1105,9 @@ static const struct io_op_def io_op_defs[] = { [IORING_OP_MKDIRAT] = {}, [IORING_OP_SYMLINKAT] = {}, [IORING_OP_LINKAT] = {}, + [IORING_OP_WAKEUP_RING] = { + .needs_file = 1, + }, }; /* requests with any of those set should undergo io_disarm_next() */ @@ -4235,6 +4238,44 @@ static int io_nop(struct io_kiocb *req, unsigned int issue_flags) return 0; } +static int io_wakeup_ring_prep(struct io_kiocb *req, + const struct io_uring_sqe *sqe) +{ + if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index || sqe->off || + sqe->len || sqe->rw_flags || sqe->splice_fd_in || + sqe->buf_index || sqe->personality)) + return -EINVAL; + + if (req->file->f_op != &io_uring_fops) + return -EBADFD; + + return 0; +} + +static int io_wakeup_ring(struct io_kiocb *req, unsigned int issue_flags) +{ + struct io_uring_cqe *cqe; + struct io_ring_ctx *ctx; + int ret = 0; + + ctx = req->file->private_data; + 
spin_lock(&ctx->completion_lock); + cqe = io_get_cqe(ctx); + if (cqe) { + WRITE_ONCE(cqe->user_data, 0); + WRITE_ONCE(cqe->res, 0); + WRITE_ONCE(cqe->flags, IORING_CQE_F_WAKEUP); + } else { + ret = -EOVERFLOW; + } + io_commit_cqring(ctx); + spin_unlock(&ctx->completion_lock); + io_cqring_ev_posted(ctx); + + __io_req_complete(req, issue_flags, ret, 0); + return 0; +} + static int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) { struct io_ring_ctx *ctx = req->ctx; @@ -6568,6 +6609,8 @@ static int io_req_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) return io_symlinkat_prep(req, sqe); case IORING_OP_LINKAT: return io_linkat_prep(req, sqe); + case IORING_OP_WAKEUP_RING: + return io_wakeup_ring_prep(req, sqe); } printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n", @@ -6851,6 +6894,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) case IORING_OP_LINKAT: ret = io_linkat(req, issue_flags); break; + case IORING_OP_WAKEUP_RING: + ret = io_wakeup_ring(req, issue_flags); + break; default: ret = -EINVAL; break; diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h index 787f491f0d2a..088232133594 100644 --- a/include/uapi/linux/io_uring.h +++ b/include/uapi/linux/io_uring.h @@ -143,6 +143,7 @@ enum { IORING_OP_MKDIRAT, IORING_OP_SYMLINKAT, IORING_OP_LINKAT, + IORING_OP_WAKEUP_RING, /* this goes last, obviously */ IORING_OP_LAST, @@ -199,9 +200,11 @@ struct io_uring_cqe { * * IORING_CQE_F_BUFFER If set, the upper 16 bits are the buffer ID * IORING_CQE_F_MORE If set, parent SQE will generate more CQE entries + * IORING_CQE_F_WAKEUP Wakeup request CQE, no link to an SQE */ #define IORING_CQE_F_BUFFER (1U << 0) #define IORING_CQE_F_MORE (1U << 1) +#define IORING_CQE_F_WAKEUP (1U << 2) enum { IORING_CQE_BUFFER_SHIFT = 16, -- Jens Axboe
commit d07d17adab5b918bd0543ce78f75a4747a057379 Author: Jens Axboe <axboe@xxxxxxxxx> Date: Wed Mar 9 19:31:57 2022 -0700 test/wakeup-ring: add test cases for IORING_OP_WAKEUP_RING Signed-off-by: Jens Axboe <axboe@xxxxxxxxx> diff --git a/.gitignore b/.gitignore index c9dc77fbe162..961fd9e96dc7 100644 --- a/.gitignore +++ b/.gitignore @@ -128,6 +128,7 @@ /test/timeout-overflow /test/unlink /test/wakeup-hang +/test/wakeup-ring /test/multicqes_drain /test/poll-mshot-update /test/rsrc_tags diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h index a7d193d0df38..8f919b42a8ea 100644 --- a/src/include/liburing/io_uring.h +++ b/src/include/liburing/io_uring.h @@ -147,6 +147,7 @@ enum { IORING_OP_MKDIRAT, IORING_OP_SYMLINKAT, IORING_OP_LINKAT, + IORING_OP_WAKEUP_RING, /* this goes last, obviously */ IORING_OP_LAST, diff --git a/test/Makefile b/test/Makefile index f421f536df87..4aafbae826ca 100644 --- a/test/Makefile +++ b/test/Makefile @@ -151,6 +151,7 @@ test_srcs := \ timeout-overflow.c \ unlink.c \ wakeup-hang.c \ + wakeup-ring.c \ skip-cqe.c \ # EOL @@ -221,6 +222,7 @@ ring-leak2: override LDFLAGS += -lpthread poll-mshot-update: override LDFLAGS += -lpthread exit-no-cleanup: override LDFLAGS += -lpthread pollfree: override LDFLAGS += -lpthread +wakeup-ring: override LDFLAGS += -lpthread install: $(test_targets) runtests.sh runtests-loop.sh $(INSTALL) -D -d -m 755 $(datadir)/liburing-test/ diff --git a/test/wakeup-ring.c b/test/wakeup-ring.c new file mode 100644 index 000000000000..0ccae92d4c93 --- /dev/null +++ b/test/wakeup-ring.c @@ -0,0 +1,172 @@ +/* SPDX-License-Identifier: MIT */ +/* + * Description: test ring wakeup command + * + */ +#include <errno.h> +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <fcntl.h> +#include <pthread.h> + +#include "liburing.h" + +static int no_wakeup; + +static int test_own(struct io_uring *ring) +{ + struct io_uring_cqe *cqe; + struct io_uring_sqe *sqe; + int ret, 
i; + + sqe = io_uring_get_sqe(ring); + if (!sqe) { + fprintf(stderr, "get sqe failed\n"); + goto err; + } + + sqe->opcode = IORING_OP_WAKEUP_RING; + sqe->fd = ring->ring_fd; + sqe->user_data = 1; + + ret = io_uring_submit(ring); + if (ret <= 0) { + fprintf(stderr, "sqe submit failed: %d\n", ret); + goto err; + } + + for (i = 0; i < 2; i++) { + ret = io_uring_peek_cqe(ring, &cqe); + if (ret < 0) { + fprintf(stderr, "wait completion %d\n", ret); + goto err; + } + switch (cqe->user_data) { + case 1: + if (cqe->res == -EINVAL || cqe->res == -EOPNOTSUPP) { + no_wakeup = 1; + return 0; + } + if (cqe->res != 0) { + fprintf(stderr, "wakeup res %d\n", cqe->res); + return -1; + } + break; + case 0: + if (!(cqe->flags & (1U << 2))) { + fprintf(stderr, "invalid flags %x\n", cqe->flags); + return -1; + } + break; + } + io_uring_cqe_seen(ring, cqe); + } + + return 0; +err: + return 1; +} + +static void *wait_cqe_fn(void *data) +{ + struct io_uring *ring = data; + struct io_uring_cqe *cqe; + int ret; + + ret = io_uring_wait_cqe(ring, &cqe); + if (ret) { + fprintf(stderr, "wait cqe %d\n", ret); + goto err; + } + + if (!(cqe->flags & (1U << 2))) { + fprintf(stderr, "invalid flags %x\n", cqe->flags); + goto err; + } + + return NULL; +err: + return (void *) (unsigned long) 1; +} + +static int test_remote(struct io_uring *ring, struct io_uring *target) +{ + struct io_uring_cqe *cqe; + struct io_uring_sqe *sqe; + int ret; + + sqe = io_uring_get_sqe(ring); + if (!sqe) { + fprintf(stderr, "get sqe failed\n"); + goto err; + } + + sqe->opcode = IORING_OP_WAKEUP_RING; + sqe->fd = target->ring_fd; + sqe->user_data = 1; + + ret = io_uring_submit(ring); + if (ret <= 0) { + fprintf(stderr, "sqe submit failed: %d\n", ret); + goto err; + } + + ret = io_uring_peek_cqe(ring, &cqe); + if (ret < 0) { + fprintf(stderr, "wait completion %d\n", ret); + goto err; + } + if (cqe->res != 0) { + fprintf(stderr, "wakeup res %d\n", cqe->res); + return -1; + } + + io_uring_cqe_seen(ring, cqe); + return 0; +err: 
+ return 1; +} + +int main(int argc, char *argv[]) +{ + struct io_uring ring, ring2; + pthread_t thread; + void *tret; + int ret; + + if (argc > 1) + return 0; + + ret = io_uring_queue_init(8, &ring, 0); + if (ret) { + fprintf(stderr, "ring setup failed: %d\n", ret); + return 1; + } + ret = io_uring_queue_init(8, &ring2, 0); + if (ret) { + fprintf(stderr, "ring setup failed: %d\n", ret); + return 1; + } + + pthread_create(&thread, NULL, wait_cqe_fn, &ring2); + + ret = test_own(&ring); + if (ret) { + fprintf(stderr, "test_own failed\n"); + return ret; + } + if (no_wakeup) + return 0; + + ret = test_remote(&ring, &ring2); + if (ret) { + fprintf(stderr, "test_remote failed\n"); + return ret; + } + + pthread_join(thread, &tret); + + return 0; +}