[PATCH liburing 5/5] overflow: add tests

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Add tests that verify that overflow conditions behave appropriately.
Specifically:
 * if overflow is continually flushed, then CQEs should arrive mostly in
 order, so that no completions are starved
 * if CQEs are dropped due to GFP_ATOMIC allocation failures, it is still
 possible to terminate cleanly. This is not tested by default as it
 requires a debug kernel config and also has system-wide effects

Signed-off-by: Dylan Yudaken <dylany@xxxxxx>
---
 test/cq-overflow.c | 240 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 233 insertions(+), 7 deletions(-)

diff --git a/test/cq-overflow.c b/test/cq-overflow.c
index 057570e..067308a 100644
--- a/test/cq-overflow.c
+++ b/test/cq-overflow.c
@@ -9,6 +9,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <fcntl.h>
+#include <assert.h>
 
 #include "helpers.h"
 #include "liburing.h"
@@ -21,6 +22,32 @@ static struct iovec *vecs;
 
 #define ENTRIES	8
 
+/*
+ * io_uring has rare cases where CQEs are lost.
+ * This happens when there is no space in the CQ ring, and also there is no
+ * GFP_ATOMIC memory available. In reality this probably means that the process
+ * is about to be killed as many other things might start failing, but we still
+ * want to test that liburing and the kernel deal with this properly. The fault
+ * injection framework allows us to test this scenario. Unfortunately this
+ * requires some system wide changes and so we do not enable this by default.
+ * The tests in this file should work in both cases (where overflows are queued
+ * and where they are dropped) on recent kernels.
+ *
+ * In order to test dropped CQEs you should enable fault injection in the kernel
+ * config:
+ *
+ * CONFIG_FAULT_INJECTION=y
+ * CONFIG_FAILSLAB=y
+ * CONFIG_FAULT_INJECTION_DEBUG_FS=y
+ *
+ * and then run the test as follows:
+ * echo Y > /sys/kernel/debug/failslab/task-filter
+ * echo 100 > /sys/kernel/debug/failslab/probability
+ * echo 0 > /sys/kernel/debug/failslab/verbose
+ * echo 100000 > /sys/kernel/debug/failslab/times
+ * bash -c "echo 1 > /proc/self/make-it-fail && exec ./cq-overflow.t"
+ */
+
 static int test_io(const char *file, unsigned long usecs, unsigned *drops, int fault)
 {
 	struct io_uring_sqe *sqe;
@@ -29,6 +56,7 @@ static int test_io(const char *file, unsigned long usecs, unsigned *drops, int f
 	unsigned reaped, total;
 	struct io_uring ring;
 	int nodrop, i, fd, ret;
+	bool cqe_dropped = false;
 
 	fd = open(file, O_RDONLY | O_DIRECT);
 	if (fd < 0) {
@@ -103,8 +131,8 @@ static int test_io(const char *file, unsigned long usecs, unsigned *drops, int f
 reap_it:
 	reaped = 0;
 	do {
-		if (nodrop) {
-			/* nodrop should never lose events */
+		if (nodrop && !cqe_dropped) {
+			/* nodrop should never lose events unless cqe_dropped */
 			if (reaped == total)
 				break;
 		} else {
@@ -112,7 +140,10 @@ reap_it:
 				break;
 		}
 		ret = io_uring_wait_cqe(&ring, &cqe);
-		if (ret) {
+		if (nodrop && ret == -EBADR) {
+			cqe_dropped = true;
+			continue;
+		} else if (ret) {
 			fprintf(stderr, "wait_cqe=%d\n", ret);
 			goto err;
 		}
@@ -132,7 +163,7 @@ reap_it:
 		goto err;
 	}
 
-	if (!nodrop) {
+	if (!nodrop || cqe_dropped) {
 		*drops = *ring.cq.koverflow;
 	} else if (*ring.cq.koverflow) {
 		fprintf(stderr, "Found %u overflows\n", *ring.cq.koverflow);
@@ -153,18 +184,31 @@ static int reap_events(struct io_uring *ring, unsigned nr_events, int do_wait)
 {
 	struct io_uring_cqe *cqe;
 	int i, ret = 0, seq = 0;
+	unsigned int start_overflow = *ring->cq.koverflow;
+	unsigned int drop_count = 0;
+	bool dropped = false;
 
 	for (i = 0; i < nr_events; i++) {
 		if (do_wait)
 			ret = io_uring_wait_cqe(ring, &cqe);
 		else
 			ret = io_uring_peek_cqe(ring, &cqe);
-		if (ret) {
+		if (do_wait && ret == -EBADR) {
+			unsigned int this_drop = *ring->cq.koverflow -
+				start_overflow;
+
+			dropped = true;
+			drop_count += this_drop;
+			start_overflow = *ring->cq.koverflow;
+			assert(this_drop > 0);
+			i += (this_drop - 1);
+			continue;
+		} else if (ret) {
 			if (ret != -EAGAIN)
 				fprintf(stderr, "cqe peek failed: %d\n", ret);
 			break;
 		}
-		if (cqe->user_data != seq) {
+		if (!dropped && cqe->user_data != seq) {
 			fprintf(stderr, "cqe sequence out-of-order\n");
 			fprintf(stderr, "got %d, wanted %d\n", (int) cqe->user_data,
 					seq);
@@ -241,19 +285,201 @@ err:
 	return 1;
 }
 
+
+/* Queue one NOP carrying @ud as its user_data and submit it right away. */
+static void submit_one_nop(struct io_uring *ring, int ud)
+{
+	struct io_uring_sqe *nop = io_uring_get_sqe(ring);
+	int submitted;
+
+	assert(nop);
+	io_uring_prep_nop(nop);
+	nop->user_data = ud;
+	submitted = io_uring_submit(ring);
+	assert(submitted == 1);
+}
+
+/*
+ * Create an overflow condition and ensure that SQEs are still processed.
+ * Requests 2 * @cqe_multiple CQ entries, reaps completions with
+ * io_uring_peek_batch_cqe() when @batch is set, and sets
+ * IORING_SETUP_IOPOLL when @poll is set. Returns 0 on success, 1 on failure.
+ */
+static int test_overflow_handling(bool batch, int cqe_multiple, bool poll)
+{
+	struct io_uring ring;
+	struct io_uring_params p;
+	int ret, i, j, ud, cqe_count;
+	unsigned int count;
+	int const N = 8;
+	int const LOOPS = 128;
+	int const QUEUE_LENGTH = 1024;
+	int completions[N];
+	int queue[QUEUE_LENGTH];
+	int queued = 0;
+	int outstanding = 0;
+	bool cqe_dropped = false;
+
+	memset(&completions, 0, sizeof(int) * N);
+	memset(&p, 0, sizeof(p));
+	p.cq_entries = 2 * cqe_multiple;
+	p.flags |= IORING_SETUP_CQSIZE;
+
+	if (poll)
+		p.flags |= IORING_SETUP_IOPOLL;
+
+	ret = io_uring_queue_init_params(2, &ring, &p);
+	if (ret) {
+		fprintf(stderr, "io_uring_queue_init_params failed %d\n", ret);
+		return 1;
+	}
+
+	assert(p.cq_entries < N);
+	/* submit N SQEs, some should overflow */
+	for (i = 0; i < N; i++) {
+		submit_one_nop(&ring, i);
+		outstanding++;
+	}
+
+	for (i = 0; i < LOOPS; i++) {
+		struct io_uring_cqe *cqes[N];
+
+		if (io_uring_cq_has_overflow(&ring)) {
+			/*
+			 * Flush any overflowed CQEs and process those. Actively
+			 * flush these to make sure CQEs arrive in vague order
+			 * of being sent.
+			 */
+			ret = io_uring_flush_overflow(&ring);
+			if (ret != 0) {
+				fprintf(stderr,
+					"io_uring_flush_overflow returned %d\n",
+					ret);
+				goto err;
+			}
+		} else if (!cqe_dropped) {
+			for (j = 0; j < queued; j++) {
+				submit_one_nop(&ring, queue[j]);
+				outstanding++;
+			}
+			queued = 0;
+		}
+
+		/* We have lost some random cqes, stop if no remaining. */
+		if (cqe_dropped && outstanding == *ring.cq.koverflow)
+			break;
+
+		ret = io_uring_wait_cqe(&ring, &cqes[0]);
+		if (ret == -EBADR) {
+			cqe_dropped = true;
+			fprintf(stderr, "CQE dropped\n");
+			continue;
+		} else if (ret != 0) {
+			fprintf(stderr, "io_uring_wait_cqe failed %d\n", ret);
+			goto err;
+		}
+		cqe_count = 1;
+		if (batch) {
+			ret = io_uring_peek_batch_cqe(&ring, &cqes[0], 2);
+			if (ret < 0) {
+				fprintf(stderr,
+					"io_uring_peek_batch_cqe failed %d\n",
+					ret);
+				goto err;
+			}
+			cqe_count = ret;
+		}
+		for (j = 0; j < cqe_count; j++) {
+			assert(cqes[j]->user_data < N);
+			ud = cqes[j]->user_data;
+			completions[ud]++;
+			assert(queued < QUEUE_LENGTH);
+			queue[queued++] = (int)ud;
+		}
+		io_uring_cq_advance(&ring, cqe_count);
+		outstanding -= cqe_count;
+	}
+
+	/* See if there were any drops by flushing the CQ ring *and* overflow */
+	do {
+		struct io_uring_cqe *cqe;
+
+		ret = io_uring_flush_overflow(&ring);
+		if (ret < 0) {
+			if (ret == -EBADR) {
+				fprintf(stderr, "CQE dropped\n");
+				cqe_dropped = true;
+				break;
+			}
+			fprintf(stderr,
+				"io_uring_flush_overflow returned %d\n", ret);
+			goto err;
+		}
+		if (outstanding && !io_uring_cq_ready(&ring))
+			ret = io_uring_wait_cqe_timeout(&ring, &cqe, NULL);
+
+		if (ret && ret != -ETIME) {
+			if (ret == -EBADR) {
+				fprintf(stderr, "CQE dropped\n");
+				cqe_dropped = true;
+				break;
+			}
+			fprintf(stderr, "wait_cqe_timeout = %d\n", ret);
+			goto err;
+		}
+		count = io_uring_cq_ready(&ring);
+		io_uring_cq_advance(&ring, count);
+		outstanding -= count;
+	} while (count);
+
+	io_uring_queue_exit(&ring);
+
+	/* Make sure that completions come back in the same order they were
+	 * sent. If they come back unfairly then this will concentrate on a
+	 * couple of indices. The ring was already torn down above, so bail
+	 * out with a plain return: jumping to err would exit the ring twice.
+	 */
+	for (i = 1; !cqe_dropped && i < N; i++) {
+		if (abs(completions[i] - completions[i - 1]) > 1) {
+			fprintf(stderr, "bad completion size %d %d\n",
+				completions[i], completions[i - 1]);
+			return 1;
+		}
+	}
+	return 0;
+err:
+	io_uring_queue_exit(&ring);
+	return 1;
+}
+
 int main(int argc, char *argv[])
 {
 	const char *fname = ".cq-overflow";
 	unsigned iters, drops;
 	unsigned long usecs;
 	int ret;
+	int i;
 
 	if (argc > 1)
 		return 0;
 
+	for (i = 0; i < 8; i++) {
+		bool batch = i & 1;
+		int mult = (i & 2) ? 1 : 2;
+		bool poll = i & 4;
+
+		ret = test_overflow_handling(batch, mult, poll);
+		if (ret) {
+			fprintf(stderr, "test_overflow_handling("
+				"batch=%d, mult=%d, poll=%d) failed\n",
+				batch, mult, poll);
+			goto err;
+		}
+	}
+
 	ret = test_overflow();
 	if (ret) {
-		printf("test_overflow failed\n");
+		fprintf(stderr, "test_overflow failed\n");
 		return ret;
 	}
 
-- 
2.30.2





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux