On Fri, 1 Apr 2022 at 17:36, Jens Axboe <axboe@xxxxxxxxx> wrote:

> I take it you're continually reusing those slots?

Yes.

> If you have a test
> case that'd be ideal. Agree that it sounds like we just need an
> appropriate breather to allow fput/task_work to run. Or it could be the
> deferral free of the fixed slot.

Adding a breather could make the worst-case latency large. I think
doing the fput synchronously would be better in general.

I tested this on a VM with 8G of memory, running the following:

  ./forkbomb 14 &
  # wait till 16k processes are forked
  for i in `seq 1 100`; do ./procreads u; done

You can compare performance with plain reads (./procreads p); the other
tests don't work on public kernels.

Thanks,
Miklos
#define _GNU_SOURCE #include <stdio.h> #include <fcntl.h> #include <string.h> #include <stdlib.h> #include <dirent.h> #include <unistd.h> #include <err.h> #include "liburing.h" #define CHECK_NEGERR(_expr) \ ({ typeof(_expr) _ret = (_expr); if (_ret < 0) { errno = -_ret; err(1, #_expr); } _ret; }) #define CHECK_NULL(_expr) \ ({ typeof(_expr) _ret = (_expr); if (_ret == NULL) { errx(1, #_expr " returned NULL"); } _ret; }) #define CHECK_ERR(_expr) \ ({ typeof(_expr) _ret = (_expr); if (_ret == -1) { err(1, #_expr); } _ret; }) struct name_val { char *name; /* in */ struct iovec value_in; /* in */ struct iovec value_out; /* out */ uint32_t error; /* out */ uint32_t reserved; }; static bool debug; static const char *proc_list[] = { "stat", "status", "cmdline", "cgroup" }; #define proc_num (sizeof(proc_list)/sizeof(proc_list[0])) #define batch 10 int getvalues(int dfd, const char *path, struct name_val *vec, size_t num, unsigned int flags) { return syscall(451, dfd, path, vec, num, flags); } static void print_val(const char *name, struct name_val *nv) { const char *s = nv->value_out.iov_base; size_t len = nv->value_out.iov_len; const size_t prmax = 40; int prlen = len < prmax ? len : prmax; const char *cont = len < prmax ? 
"" : "..."; if (nv->error) printf("/proc/%s/%s = ERROR %s (%i)\n", name, nv->name, strerror(nv->error), nv->error); else if (debug) printf("/proc/%s/%s = \"%.*s\"%s (len=%zi)\n", name, nv->name, prlen, s, cont, len); } static void print_values(const char *name, struct name_val *vec, size_t num, ssize_t ret) { int i; if (ret < 0) { errno = -ret; warn("getvalues failed"); } else { if ((size_t) ret < num) warnx("%zi values read out of %zi", ret, num); for (i = 0; i < ret; i++) print_val(name, &vec[i]); } } static ssize_t readfile_plain(int dfd, const char *path, char *buf, size_t size) { int fd; ssize_t ret; fd = openat(dfd, path, O_RDONLY); if (fd == -1) return -errno; ret = read(fd, buf, size); if (ret == -1) ret = -errno; else if ((size_t) ret == size) ret = -EOVERFLOW; close(fd); return ret; } static int readfiles_plain(int dfd, const char *path, struct name_val *vec, size_t num, int mode) { struct name_val *nv; ssize_t ret; size_t i; if (path[0]) dfd = CHECK_ERR(openat(dfd, path, O_PATH)); for (i = 0; i < num; i++) { nv = &vec[i]; if (mode) { CHECK_ERR(getvalues(dfd, "", nv, 1, mode == 2)); } else { ret = readfile_plain(dfd, nv->name, nv->value_in.iov_base, nv->value_in.iov_len); if (ret < 0) { nv->error = -ret; } else { nv->error = 0; nv->value_out.iov_base = nv->value_in.iov_base; nv->value_out.iov_len = ret; } } } if (path[0]) close(dfd); return num; } static int readfiles_uring(struct io_uring *ring, int dfd, const char *path, struct name_val *vec, size_t num) { struct io_uring_sqe *sqe; struct io_uring_cqe *cqe; size_t slot; int ret, i; static int seq = 1; struct name_val *nv; if (path[0]) dfd = CHECK_ERR(openat(dfd, path, O_PATH)); for (slot = 0; slot < num; slot++) { nv = &vec[slot]; sqe = io_uring_get_sqe(ring); io_uring_prep_openat_direct(sqe, dfd, nv->name, O_RDONLY, 0, slot); sqe->flags = IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS; sqe->user_data = seq + slot * 2; sqe = io_uring_get_sqe(ring); io_uring_prep_read(sqe, slot, nv->value_in.iov_base, 
nv->value_in.iov_len, 0); sqe->flags = IOSQE_FIXED_FILE; sqe->user_data = seq + slot * 2 + 1; } ret = CHECK_NEGERR(io_uring_submit_and_wait(ring, num)); ret /= 2; for (i = 0; i < ret; i++) { CHECK_NEGERR(io_uring_wait_cqe(ring, &cqe)); slot = (cqe->user_data - seq) / 2; nv = &vec[slot]; if (cqe->res < 0) { nv->error = -cqe->res; } else if ((size_t) cqe->res < nv->value_in.iov_len) { nv->error = 0; nv->value_out.iov_base = nv->value_in.iov_base; nv->value_out.iov_len = cqe->res; } else { nv->error = EOVERFLOW; } io_uring_cqe_seen(ring, cqe); } seq += 2 * num; if (path[0]) close(dfd); return ret; } static const char *next_name(DIR *dp) { const char *name; struct dirent *de; while ((de = readdir(dp))) { name = de->d_name; if (name[0] > '0' && name[0] <= '9') return name; } return NULL; } static size_t next_batch(DIR *dp, struct name_val *vec, size_t num, const char **namep) { const char *name; size_t i; if (batch == 1) { name = next_name(dp); if (!name) return 0; *namep = name; return 1; } *namep = ""; for (i = 0; i < num; i++) { if (i % proc_num == 0 && (name = next_name(dp)) == NULL) break; free(vec[i].name); vec[i].name = CHECK_NULL(malloc(128)); sprintf(vec[i].name, "%s/%s", name, proc_list[i % proc_num]); } return i; } static void test_uring(DIR *dp, struct name_val *vec, size_t num) { int fds[proc_num * batch]; const size_t numslots = sizeof(fds)/sizeof(fds[0]); struct io_uring ring; const char *name; ssize_t ret; memset(fds, -1, sizeof(fds)); CHECK_NEGERR(io_uring_queue_init(num * 2, &ring, 0)); CHECK_NEGERR(io_uring_register_files(&ring, fds, numslots)); while ((num = next_batch(dp, vec, num, &name))) { ret = readfiles_uring(&ring, dirfd(dp), name, vec, num); print_values(name, vec, num, ret); } io_uring_queue_exit(&ring); } static void test_plain(DIR *dp, struct name_val *vec, size_t num, int mode) { const char *name; ssize_t ret; while ((num = next_batch(dp, vec, num, &name))) { ret = readfiles_plain(dirfd(dp), name, vec, num, mode); print_values(name, vec, 
num, ret); } } static void test_values(DIR *dp, struct name_val *vec, size_t num, bool rf) { const char *name; ssize_t ret; while ((num = next_batch(dp, vec, num, &name))) { ret = getvalues(dirfd(dp), name, vec, num, rf); print_values(name, vec, num, ret); } } int main(int argc, char *argv[]) { const size_t num = proc_num * batch; char buf[num][4096]; struct name_val vec[num]; DIR *dp; size_t i; char type = 'p'; if (argc > 1) type = argv[1][0]; if (argc > 2) debug = true; for (i = 0; i < num; i++) { vec[i].value_in.iov_base = (type != 'w' || !i) ? buf[i] : NULL; vec[i].value_in.iov_len = sizeof(buf[i]); } dp = CHECK_NULL(opendir("/proc")); switch (type) { case 'p': test_plain(dp, vec, num, 0); break; case 'r': test_plain(dp, vec, num, 1); break; case 's': test_plain(dp, vec, num, 2); break; case 'u': test_uring(dp, vec, num); break; case 'w': vec[0].value_in.iov_len = sizeof(buf[0]) * num; /* fallthrough */ case 'v': case 'z': test_values(dp, vec, num, type == 'z'); break; } closedir(dp); return 0; }
#include <unistd.h>
#include <stdio.h>
#include <err.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/types.h>

/*
 * forkbomb: populate the system with many idle processes.
 *
 * Usage: forkbomb <levels>
 *
 * Every process forks once per level and both parent and child continue
 * with the next level, so <levels> rounds leave 2^<levels> processes, each
 * of which then sleeps for 1000 seconds.
 */

/* Set to 1 to also start four sleeping threads in every forked child. */
#define FORKBOMB_THREADS 0

#if FORKBOMB_THREADS
/* Thread body: just sleep so the thread stays alive. */
static void *run(void *arg)
{
	(void) arg;
	sleep(1000);
	return NULL;
}
#endif

int main(int argc, char *argv[])
{
	pid_t pid;
	int level, maxlevel;
	char *end;
	long val;

	if (argc < 2)
		errx(1, "usage: %s <levels>", argc > 0 ? argv[0] : "forkbomb");

	/* strtol() instead of atoi() so that garbage input is rejected. */
	errno = 0;
	val = strtol(argv[1], &end, 10);
	if (end == argv[1] || *end != '\0' || errno == ERANGE ||
	    val < 0 || val > INT_MAX)
		errx(1, "invalid level count: %s", argv[1]);
	maxlevel = (int) val;

	for (level = 0; level < maxlevel; level++) {
		pid = fork();
		if (pid == -1)
			err(1, "fork");

		/* Progress marker, printed by both parent and child. */
		fprintf(stderr, ".");
#if FORKBOMB_THREADS
		if (pid == 0) {
			pthread_t thr;
			int i;

			for (i = 0; i < 4; i++)
				pthread_create(&thr, NULL, run, NULL);
		}
#endif
	}
	sleep(1000);

	return 0;
}