Add tests that verify that overflow conditions behave appropriately. Specifically: * if overflow is continually flushed, then CQEs should arrive mostly in order, so that no completions are starved * if CQEs are dropped due to GFP_ATOMIC allocation failures it is possible to terminate cleanly. This is not tested by default as it requires debug kernel config, and also has system-wide effects. Signed-off-by: Dylan Yudaken <dylany@xxxxxx> --- test/cq-overflow.c | 240 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 233 insertions(+), 7 deletions(-) diff --git a/test/cq-overflow.c b/test/cq-overflow.c index 057570e..067308a 100644 --- a/test/cq-overflow.c +++ b/test/cq-overflow.c @@ -9,6 +9,7 @@ #include <stdlib.h> #include <string.h> #include <fcntl.h> +#include <assert.h> #include "helpers.h" #include "liburing.h" @@ -21,6 +22,32 @@ static struct iovec *vecs; #define ENTRIES 8 +/* + * io_uring has rare cases where CQEs are lost. + * This happens when there is no space in the CQ ring, and also there is no + * GFP_ATOMIC memory available. In reality this probably means that the process + * is about to be killed as many other things might start failing, but we still + * want to test that liburing and the kernel deal with this properly. The fault + * injection framework allows us to test this scenario. Unfortunately this + * requires some system wide changes and so we do not enable this by default. + * The tests in this file should work in both cases (where overflows are queued + * and where they are dropped) on recent kernels. 
+ * + * In order to test dropped CQEs you should enable fault injection in the kernel + * config: + * + * CONFIG_FAULT_INJECTION=y + * CONFIG_FAILSLAB=y + * CONFIG_FAULT_INJECTION_DEBUG_FS=y + * + * and then run the test as follows: + * echo Y > /sys/kernel/debug/failslab/task-filter + * echo 100 > /sys/kernel/debug/failslab/probability + * echo 0 > /sys/kernel/debug/failslab/verbose + * echo 100000 > /sys/kernel/debug/failslab/times + * bash -c "echo 1 > /proc/self/make-it-fail && exec ./cq-overflow.t" + */ + static int test_io(const char *file, unsigned long usecs, unsigned *drops, int fault) { struct io_uring_sqe *sqe; @@ -29,6 +56,7 @@ static int test_io(const char *file, unsigned long usecs, unsigned *drops, int f unsigned reaped, total; struct io_uring ring; int nodrop, i, fd, ret; + bool cqe_dropped = false; fd = open(file, O_RDONLY | O_DIRECT); if (fd < 0) { @@ -103,8 +131,8 @@ static int test_io(const char *file, unsigned long usecs, unsigned *drops, int f reap_it: reaped = 0; do { - if (nodrop) { - /* nodrop should never lose events */ + if (nodrop && !cqe_dropped) { + /* nodrop should never lose events unless cqe_dropped */ if (reaped == total) break; } else { @@ -112,7 +140,10 @@ reap_it: break; } ret = io_uring_wait_cqe(&ring, &cqe); - if (ret) { + if (nodrop && ret == -EBADR) { + cqe_dropped = true; + continue; + } else if (ret) { fprintf(stderr, "wait_cqe=%d\n", ret); goto err; } @@ -132,7 +163,7 @@ reap_it: goto err; } - if (!nodrop) { + if (!nodrop || cqe_dropped) { *drops = *ring.cq.koverflow; } else if (*ring.cq.koverflow) { fprintf(stderr, "Found %u overflows\n", *ring.cq.koverflow); @@ -153,18 +184,31 @@ static int reap_events(struct io_uring *ring, unsigned nr_events, int do_wait) { struct io_uring_cqe *cqe; int i, ret = 0, seq = 0; + unsigned int start_overflow = *ring->cq.koverflow; + unsigned int drop_count = 0; + bool dropped = false; for (i = 0; i < nr_events; i++) { if (do_wait) ret = io_uring_wait_cqe(ring, &cqe); else ret = 
io_uring_peek_cqe(ring, &cqe); - if (ret) { + if (do_wait && ret == -EBADR) { + unsigned int this_drop = *ring->cq.koverflow - + start_overflow; + + dropped = true; + drop_count += this_drop; + start_overflow = *ring->cq.koverflow; + assert(this_drop > 0); + i += (this_drop - 1); + continue; + } else if (ret) { if (ret != -EAGAIN) fprintf(stderr, "cqe peek failed: %d\n", ret); break; } - if (cqe->user_data != seq) { + if (!dropped && cqe->user_data != seq) { fprintf(stderr, "cqe sequence out-of-order\n"); fprintf(stderr, "got %d, wanted %d\n", (int) cqe->user_data, seq); @@ -241,19 +285,201 @@ err: return 1; } + +static void submit_one_nop(struct io_uring *ring, int ud) +{ + struct io_uring_sqe *sqe; + int ret; + + sqe = io_uring_get_sqe(ring); + assert(sqe); + io_uring_prep_nop(sqe); + sqe->user_data = ud; + ret = io_uring_submit(ring); + assert(ret == 1); +} + +/* + * Create an overflow condition and ensure that SQEs are still processed + */ +static int test_overflow_handling( + bool batch, + int cqe_multiple, + bool poll) +{ + struct io_uring ring; + struct io_uring_params p; + int ret, i, j, ud, cqe_count; + unsigned int count; + int const N = 8; + int const LOOPS = 128; + int const QUEUE_LENGTH = 1024; + int completions[N]; + int queue[QUEUE_LENGTH]; + int queued = 0; + int outstanding = 0; + bool cqe_dropped = false; + + memset(&completions, 0, sizeof(int) * N); + memset(&p, 0, sizeof(p)); + p.cq_entries = 2 * cqe_multiple; + p.flags |= IORING_SETUP_CQSIZE; + + if (poll) + p.flags |= IORING_SETUP_IOPOLL; + + ret = io_uring_queue_init_params(2, &ring, &p); + if (ret) { + fprintf(stderr, "io_uring_queue_init failed %d\n", ret); + return 1; + } + + assert(p.cq_entries < N); + /* submit N SQEs, some should overflow */ + for (i = 0; i < N; i++) { + submit_one_nop(&ring, i); + outstanding++; + } + + for (i = 0; i < LOOPS; i++) { + struct io_uring_cqe *cqes[N]; + + if (io_uring_cq_has_overflow(&ring)) { + /* + * Flush any overflowed CQEs and process those. 
Actively + * flush these to make sure CQEs arrive in vague order + * of being sent. + */ + ret = io_uring_flush_overflow(&ring); + if (ret != 0) { + fprintf(stderr, + "io_uring_flush_overflow returned %d\n", + ret); + goto err; + } + } else if (!cqe_dropped) { + for (j = 0; j < queued; j++) { + submit_one_nop(&ring, queue[j]); + outstanding++; + } + queued = 0; + } + + /* We have lost some random cqes, stop if no remaining. */ + if (cqe_dropped && outstanding == *ring.cq.koverflow) + break; + + ret = io_uring_wait_cqe(&ring, &cqes[0]); + if (ret == -EBADR) { + cqe_dropped = true; + fprintf(stderr, "CQE dropped\n"); + continue; + } else if (ret != 0) { + fprintf(stderr, "io_uring_wait_cqes failed %d\n", ret); + goto err; + } + cqe_count = 1; + if (batch) { + ret = io_uring_peek_batch_cqe(&ring, &cqes[0], 2); + if (ret < 0) { + fprintf(stderr, + "io_uring_peek_batch_cqe failed %d\n", + ret); + goto err; + } + cqe_count = ret; + } + for (j = 0; j < cqe_count; j++) { + assert(cqes[j]->user_data < N); + ud = cqes[j]->user_data; + completions[ud]++; + assert(queued < QUEUE_LENGTH); + queue[queued++] = (int)ud; + } + io_uring_cq_advance(&ring, cqe_count); + outstanding -= cqe_count; + } + + /* See if there were any drops by flushing the CQ ring *and* overflow */ + do { + struct io_uring_cqe *cqe; + + ret = io_uring_flush_overflow(&ring); + if (ret < 0) { + if (ret == -EBADR) { + fprintf(stderr, "CQE dropped\n"); + cqe_dropped = true; + break; + } + goto err; + } + if (outstanding && !io_uring_cq_ready(&ring)) + ret = io_uring_wait_cqe_timeout(&ring, &cqe, NULL); + + if (ret && ret != -ETIME) { + if (ret == -EBADR) { + fprintf(stderr, "CQE dropped\n"); + cqe_dropped = true; + break; + } + fprintf(stderr, "wait_cqe_timeout = %d\n", ret); + goto err; + } + count = io_uring_cq_ready(&ring); + io_uring_cq_advance(&ring, count); + outstanding -= count; + } while (count); + + io_uring_queue_exit(&ring); + + /* Make sure that completions come back in the same order they were + * 
sent. If they come back unfairly then this will concentrate on a + * couple of indices. + */ + for (i = 1; !cqe_dropped && i < N; i++) { + if (abs(completions[i] - completions[i - 1]) > 1) { + fprintf( + stderr, + "bad completion size %d %d\n", + completions[i], + completions[i - 1]); + goto err; + } + } + return 0; +err: + io_uring_queue_exit(&ring); + return 1; +} + int main(int argc, char *argv[]) { const char *fname = ".cq-overflow"; unsigned iters, drops; unsigned long usecs; int ret; + int i; if (argc > 1) return 0; + for (i = 0; i < 8; i++) { + bool batch = i & 1; + int mult = (i & 2) ? 1 : 2; + bool poll = i & 4; + + ret = test_overflow_handling(batch, mult, poll); + if (ret) { + fprintf(stderr, "test_overflow_handling(" + "batch=%d, mult=%d, poll=%d) failed\n", + batch, mult, poll); + goto err; + } + } + ret = test_overflow(); if (ret) { - printf("test_overflow failed\n"); + fprintf(stderr, "test_overflow failed\n"); return ret; } -- 2.30.2