Re: Sending CQE to a different ring

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 3/9/22 6:55 PM, Jens Axboe wrote:
> On 3/9/22 6:36 PM, Jens Axboe wrote:
>> On 3/9/22 4:49 PM, Artyom Pavlov wrote:
>>> Greetings!
>>>
>>> A common approach for multi-threaded servers is to have a number of
>>> threads equal to a number of cores and launch a separate ring in each
>>> one. AFAIK currently if we want to send an event to a different ring,
>>> we have to write-lock this ring, create SQE, and update the index
>>> ring. Alternatively, we could use some kind of user-space message
>>> passing.
>>>
>>> Such approaches are somewhat inefficient and I think it can be solved
>>> elegantly by updating the io_uring_sqe type to allow accepting fd of a
>>> ring to which CQE must be sent by kernel. It can be done by
>>> introducing an IOSQE_ flag and using one of currently unused padding
>>> u64s.
>>>
>>> Such feature could be useful for load balancing and message passing
>>> between threads which would ride on top of io-uring, i.e. you could
>>> send NOP with user_data pointing to a message payload.
>>
>> So what you want is a NOP with 'fd' set to the fd of another ring, and
>> that nop posts a CQE on that other ring? I don't think we'd need IOSQE
>> flags for that, we just need a NOP that supports that. I see a few ways
>> of going about that:
>>
>> 1) Add a new 'NOP' that takes an fd, and validates that that fd is an
>>    io_uring instance. It can then grab the completion lock on that ring
>>    and post an empty CQE.
>>
>> 2) We add a FEAT flag saying NOP supports taking an 'fd' argument, where
>>    'fd' is another ring. Posting CQE same as above.
>>
>> 3) We add a specific opcode for this. Basically the same as #2, but
>>    maybe with a more descriptive name than NOP.
>>
>> Might make sense to pair that with a CQE flag or something like that, as
>> there's no specific user_data that could be used as it doesn't match an
>> existing SQE that has been issued. IORING_CQE_F_WAKEUP for example.
>> Would be applicable to all the above cases.
>>
>> I kind of like #3 the best. Add a IORING_OP_RING_WAKEUP command, require
>> that sqe->fd point to a ring (could even be the ring itself, doesn't
>> matter). And add IORING_CQE_F_WAKEUP as a specific flag for that.
> 
> Something like the below, totally untested. The request will complete on
> the original ring with either 0, for success, or -EOVERFLOW if the
> target ring was already in an overflow state. If the fd specified isn't
> an io_uring context, then the request will complete with -EBADFD.
> 
> If you have any way of testing this, please do. I'll write a basic
> functionality test for it as well, but not until tomorrow.
> 
> Maybe we want to include in cqe->res who the waker was? We can stuff the
> pid/tid in there, for example.

Made the pid change, and also wrote a test case for it. Only change
otherwise is adding a completion trace event as well. Patch below
against for-5.18/io_uring, and attached the test case for liburing.

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 2e04f718319d..b21f85a48224 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -1105,6 +1105,9 @@ static const struct io_op_def io_op_defs[] = {
 	[IORING_OP_MKDIRAT] = {},
 	[IORING_OP_SYMLINKAT] = {},
 	[IORING_OP_LINKAT] = {},
+	[IORING_OP_WAKEUP_RING] = {
+		.needs_file		= 1,
+	},
 };
 
 /* requests with any of those set should undergo io_disarm_next() */
@@ -4235,6 +4238,44 @@ static int io_nop(struct io_kiocb *req, unsigned int issue_flags)
 	return 0;
 }
 
+static int io_wakeup_ring_prep(struct io_kiocb *req,
+			       const struct io_uring_sqe *sqe)
+{
+	if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index || sqe->off ||
+		     sqe->len || sqe->rw_flags || sqe->splice_fd_in ||
+		     sqe->buf_index || sqe->personality))
+		return -EINVAL;
+
+	if (req->file->f_op != &io_uring_fops)
+		return -EBADFD;
+
+	return 0;
+}
+
+static int io_wakeup_ring(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_uring_cqe *cqe;
+	struct io_ring_ctx *ctx;
+	int ret = 0;
+
+	ctx = req->file->private_data;
+	spin_lock(&ctx->completion_lock);
+	cqe = io_get_cqe(ctx);
+	if (cqe) {
+		WRITE_ONCE(cqe->user_data, 0);
+		WRITE_ONCE(cqe->res, 0);
+		WRITE_ONCE(cqe->flags, IORING_CQE_F_WAKEUP);
+	} else {
+		ret = -EOVERFLOW;
+	}
+	io_commit_cqring(ctx);
+	spin_unlock(&ctx->completion_lock);
+	io_cqring_ev_posted(ctx);
+
+	__io_req_complete(req, issue_flags, ret, 0);
+	return 0;
+}
+
 static int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -6568,6 +6609,8 @@ static int io_req_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 		return io_symlinkat_prep(req, sqe);
 	case IORING_OP_LINKAT:
 		return io_linkat_prep(req, sqe);
+	case IORING_OP_WAKEUP_RING:
+		return io_wakeup_ring_prep(req, sqe);
 	}
 
 	printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
@@ -6851,6 +6894,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
 	case IORING_OP_LINKAT:
 		ret = io_linkat(req, issue_flags);
 		break;
+	case IORING_OP_WAKEUP_RING:
+		ret = io_wakeup_ring(req, issue_flags);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 787f491f0d2a..088232133594 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -143,6 +143,7 @@ enum {
 	IORING_OP_MKDIRAT,
 	IORING_OP_SYMLINKAT,
 	IORING_OP_LINKAT,
+	IORING_OP_WAKEUP_RING,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
@@ -199,9 +200,11 @@ struct io_uring_cqe {
  *
  * IORING_CQE_F_BUFFER	If set, the upper 16 bits are the buffer ID
  * IORING_CQE_F_MORE	If set, parent SQE will generate more CQE entries
+ * IORING_CQE_F_WAKEUP	Wakeup request CQE, no link to an SQE
  */
 #define IORING_CQE_F_BUFFER		(1U << 0)
 #define IORING_CQE_F_MORE		(1U << 1)
+#define IORING_CQE_F_WAKEUP		(1U << 2)
 
 enum {
 	IORING_CQE_BUFFER_SHIFT		= 16,

-- 
Jens Axboe
commit d07d17adab5b918bd0543ce78f75a4747a057379
Author: Jens Axboe <axboe@xxxxxxxxx>
Date:   Wed Mar 9 19:31:57 2022 -0700

    test/wakeup-ring: add test cases for IORING_OP_WAKEUP_RING
    
    Signed-off-by: Jens Axboe <axboe@xxxxxxxxx>

diff --git a/.gitignore b/.gitignore
index c9dc77fbe162..961fd9e96dc7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -128,6 +128,7 @@
 /test/timeout-overflow
 /test/unlink
 /test/wakeup-hang
+/test/wakeup-ring
 /test/multicqes_drain
 /test/poll-mshot-update
 /test/rsrc_tags
diff --git a/src/include/liburing/io_uring.h b/src/include/liburing/io_uring.h
index a7d193d0df38..8f919b42a8ea 100644
--- a/src/include/liburing/io_uring.h
+++ b/src/include/liburing/io_uring.h
@@ -147,6 +147,7 @@ enum {
 	IORING_OP_MKDIRAT,
 	IORING_OP_SYMLINKAT,
 	IORING_OP_LINKAT,
+	IORING_OP_WAKEUP_RING,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
diff --git a/test/Makefile b/test/Makefile
index f421f536df87..4aafbae826ca 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -151,6 +151,7 @@ test_srcs := \
 	timeout-overflow.c \
 	unlink.c \
 	wakeup-hang.c \
+	wakeup-ring.c \
 	skip-cqe.c \
 	# EOL
 
@@ -221,6 +222,7 @@ ring-leak2: override LDFLAGS += -lpthread
 poll-mshot-update: override LDFLAGS += -lpthread
 exit-no-cleanup: override LDFLAGS += -lpthread
 pollfree: override LDFLAGS += -lpthread
+wakeup-ring: override LDFLAGS += -lpthread
 
 install: $(test_targets) runtests.sh runtests-loop.sh
 	$(INSTALL) -D -d -m 755 $(datadir)/liburing-test/
diff --git a/test/wakeup-ring.c b/test/wakeup-ring.c
new file mode 100644
index 000000000000..0ccae92d4c93
--- /dev/null
+++ b/test/wakeup-ring.c
@@ -0,0 +1,172 @@
+/* SPDX-License-Identifier: MIT */
+/*
+ * Description: test ring wakeup command
+ *
+ */
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <string.h>
+#include <fcntl.h>
+#include <pthread.h>
+
+#include "liburing.h"
+
+static int no_wakeup;
+
+static int test_own(struct io_uring *ring)
+{
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	int ret, i;
+
+	sqe = io_uring_get_sqe(ring);
+	if (!sqe) {
+		fprintf(stderr, "get sqe failed\n");
+		goto err;
+	}
+
+	sqe->opcode = IORING_OP_WAKEUP_RING;
+	sqe->fd = ring->ring_fd;
+	sqe->user_data = 1;
+
+	ret = io_uring_submit(ring);
+	if (ret <= 0) {
+		fprintf(stderr, "sqe submit failed: %d\n", ret);
+		goto err;
+	}
+
+	for (i = 0; i < 2; i++) {
+		ret = io_uring_peek_cqe(ring, &cqe);
+		if (ret < 0) {
+			fprintf(stderr, "wait completion %d\n", ret);
+			goto err;
+		}
+		switch (cqe->user_data) {
+		case 1:
+			if (cqe->res == -EINVAL || cqe->res == -EOPNOTSUPP) {
+				no_wakeup = 1;
+				return 0;
+			}
+			if (cqe->res != 0) {
+				fprintf(stderr, "wakeup res %d\n", cqe->res);
+				return -1;
+			}
+			break;
+		case 0:
+			if (!(cqe->flags & (1U << 2))) {
+				fprintf(stderr, "invalid flags %x\n", cqe->flags);
+				return -1;
+			}
+			break;
+		}
+		io_uring_cqe_seen(ring, cqe);
+	}
+
+	return 0;
+err:
+	return 1;
+}
+
+static void *wait_cqe_fn(void *data)
+{
+	struct io_uring *ring = data;
+	struct io_uring_cqe *cqe;
+	int ret;
+
+	ret = io_uring_wait_cqe(ring, &cqe);
+	if (ret) {
+		fprintf(stderr, "wait cqe %d\n", ret);
+		goto err;
+	}
+
+	if (!(cqe->flags & (1U << 2))) {
+		fprintf(stderr, "invalid flags %x\n", cqe->flags);
+		goto err;
+	}
+
+	return NULL;
+err:
+	return (void *) (unsigned long) 1;
+}
+
+static int test_remote(struct io_uring *ring, struct io_uring *target)
+{
+	struct io_uring_cqe *cqe;
+	struct io_uring_sqe *sqe;
+	int ret;
+
+	sqe = io_uring_get_sqe(ring);
+	if (!sqe) {
+		fprintf(stderr, "get sqe failed\n");
+		goto err;
+	}
+
+	sqe->opcode = IORING_OP_WAKEUP_RING;
+	sqe->fd = target->ring_fd;
+	sqe->user_data = 1;
+
+	ret = io_uring_submit(ring);
+	if (ret <= 0) {
+		fprintf(stderr, "sqe submit failed: %d\n", ret);
+		goto err;
+	}
+
+	ret = io_uring_peek_cqe(ring, &cqe);
+	if (ret < 0) {
+		fprintf(stderr, "wait completion %d\n", ret);
+		goto err;
+	}
+	if (cqe->res != 0) {
+		fprintf(stderr, "wakeup res %d\n", cqe->res);
+		return -1;
+	}
+
+	io_uring_cqe_seen(ring, cqe);
+	return 0;
+err:
+	return 1;
+}
+
+int main(int argc, char *argv[])
+{
+	struct io_uring ring, ring2;
+	pthread_t thread;
+	void *tret;
+	int ret;
+
+	if (argc > 1)
+		return 0;
+
+	ret = io_uring_queue_init(8, &ring, 0);
+	if (ret) {
+		fprintf(stderr, "ring setup failed: %d\n", ret);
+		return 1;
+	}
+	ret = io_uring_queue_init(8, &ring2, 0);
+	if (ret) {
+		fprintf(stderr, "ring setup failed: %d\n", ret);
+		return 1;
+	}
+
+	pthread_create(&thread, NULL, wait_cqe_fn, &ring2);
+
+	ret = test_own(&ring);
+	if (ret) {
+		fprintf(stderr, "test_own failed\n");
+		return ret;
+	}
+	if (no_wakeup)
+		return 0;
+
+	ret = test_remote(&ring, &ring2);
+	if (ret) {
+		fprintf(stderr, "test_remote failed\n");
+		return ret;
+	}
+
+	pthread_join(thread, &tret);
+
+	return 0;
+}

[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux