The following changes since commit 46961748140e291f0f1e966bac9aad1a3b9ad1c7:

  engines/libaio: increase RLIMIT_MEMLOCK for user buffers (2018-12-04 11:27:02 -0700)

are available in the Git repository at:

  git://git.kernel.dk/fio.git master

for you to fetch changes up to 0527b23f9112b59dc63f3b40a7f40d45c48ec60d:

  t/aio-ring: updates/cleanups (2018-12-10 15:14:36 -0700)

----------------------------------------------------------------
Jens Axboe (2):
      Add aio-ring test app
      t/aio-ring: updates/cleanups

 Makefile     |   7 +
 t/aio-ring.c | 494 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 501 insertions(+)
 create mode 100644 t/aio-ring.c

---

Diff of recent changes:

diff --git a/Makefile b/Makefile
index 5ac568e9..284621d3 100644
--- a/Makefile
+++ b/Makefile
@@ -263,6 +263,9 @@ T_VS_PROGS = t/fio-verify-state
 T_PIPE_ASYNC_OBJS = t/read-to-pipe-async.o
 T_PIPE_ASYNC_PROGS = t/read-to-pipe-async
 
+T_AIO_RING_OBJS = t/aio-ring.o
+T_AIO_RING_PROGS = t/aio-ring
+
 T_MEMLOCK_OBJS = t/memlock.o
 T_MEMLOCK_PROGS = t/memlock
 
@@ -281,6 +284,7 @@ T_OBJS += $(T_VS_OBJS)
 T_OBJS += $(T_PIPE_ASYNC_OBJS)
 T_OBJS += $(T_MEMLOCK_OBJS)
 T_OBJS += $(T_TT_OBJS)
+T_OBJS += $(T_AIO_RING_OBJS)
 
 ifneq (,$(findstring CYGWIN,$(CONFIG_TARGET_OS)))
     T_DEDUPE_OBJS += os/windows/posix.o lib/hweight.o
@@ -440,6 +444,9 @@ cairo_text_helpers.o: cairo_text_helpers.c cairo_text_helpers.h
 printing.o: printing.c printing.h
 	$(QUIET_CC)$(CC) $(CFLAGS) $(GTK_CFLAGS) $(CPPFLAGS) -c $<
 
+t/aio-ring: $(T_AIO_RING_OBJS)
+	$(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(T_AIO_RING_OBJS) $(LIBS)
+
 t/read-to-pipe-async: $(T_PIPE_ASYNC_OBJS)
 	$(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(T_PIPE_ASYNC_OBJS) $(LIBS)
 
diff --git a/t/aio-ring.c b/t/aio-ring.c
new file mode 100644
index 00000000..c6106348
--- /dev/null
+++ b/t/aio-ring.c
@@ -0,0 +1,494 @@
+/*
+ * Test app for the aio SQ/CQ ring interface: one submitter thread issues
+ * random BS-sized reads against a file or block device while main() prints
+ * per-second statistics.
+ *
+ * gcc -D_GNU_SOURCE -Wall -O2 -o aio-ring aio-ring.c -lpthread -laio
+ */
+#include <stdio.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <stddef.h>
+#include <signal.h>
+#include <inttypes.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/ioctl.h>
+#include <sys/syscall.h>
+#include <sys/resource.h>
+#include <linux/fs.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <libaio.h>
+#include <string.h>
+#include <pthread.h>
+#include <sched.h>
+
+#define IOCB_FLAG_HIPRI	(1 << 2)
+
+#define IOCTX_FLAG_IOPOLL	(1 << 1)
+#define IOCTX_FLAG_USERIOCB	(1 << 0)
+#define IOCTX_FLAG_FIXEDBUFS	(1 << 2)
+#define IOCTX_FLAG_SCQRING	(1 << 3)	/* Use SQ/CQ rings */
+#define IOCTX_FLAG_SQTHREAD	(1 << 4)	/* Use SQ thread */
+#define IOCTX_FLAG_SQWQ		(1 << 5)	/* Use SQ wq */
+
+#define barrier()	__asm__ __volatile__("": : :"memory")
+
+#define min(a, b)		(((a) < (b)) ? (a) : (b))
+
+typedef uint32_t u32;
+
+/*
+ * Submission ring handed to io_setup2(): a header padded to the size of one
+ * iocb, followed by the iocb array itself.
+ */
+struct aio_iocb_ring {
+	union {
+		struct {
+			u32 head, tail;
+			u32 nr_events;
+			u32 sq_thread_cpu;
+		};
+		struct iocb pad_iocb;
+	};
+	struct iocb iocbs[0];
+};
+
+/*
+ * Completion ring handed to io_setup2(): a header padded to the size of one
+ * io_event, followed by the event array itself.
+ */
+struct aio_io_event_ring {
+	union {
+		struct {
+			u32 head, tail;
+			u32 nr_events;
+		};
+		struct io_event pad_event;
+	};
+	struct io_event events[0];
+};
+
+#define IORING_FLAG_SUBMIT	(1 << 0)
+#define IORING_FLAG_GETEVENTS	(1 << 1)
+
+#define DEPTH			32
+#define RING_SIZE		(DEPTH + 1)
+
+#define BATCH_SUBMIT		8
+#define BATCH_COMPLETE		8
+
+#define BS			4096
+
+/* State of one submitter thread */
+struct submitter {
+	pthread_t thread;
+	unsigned long max_blocks;
+	io_context_t ioc;
+	struct drand48_data rand;
+	struct aio_iocb_ring *sq_ring;
+	struct aio_io_event_ring *cq_ring;
+	int inflight;
+	unsigned long reaps;
+	unsigned long done;
+	unsigned long calls;
+	volatile int finish;
+	char filename[128];
+};
+
+static struct submitter submitters[1];
+static volatile int finish;
+
+static int polled = 1;		/* use IO polling */
+static int fixedbufs = 1;	/* use fixed user buffers */
+static int buffered = 0;	/* use buffered IO, not O_DIRECT */
+static int sq_thread = 0;	/* use kernel submission thread */
+static int sq_thread_cpu = 0;	/* pin above thread to this CPU */
+
+/*
+ * Raw wrapper for syscall number 335, used by main() to create the aio
+ * context for the given rings. Returns whatever syscall() returns.
+ */
+static int io_setup2(unsigned int nr_events, unsigned int flags,
+		     struct iocb *iocbs, struct aio_iocb_ring *sq_ring,
+		     struct aio_io_event_ring *cq_ring, io_context_t *ctx_idp)
+{
+	return syscall(335, nr_events, flags, iocbs, sq_ring, cq_ring, ctx_idp);
+}
+
+/*
+ * Raw wrapper for syscall number 336, used to submit prepared iocbs and/or
+ * wait for completions. Returns whatever syscall() returns.
+ */
+static int io_ring_enter(io_context_t ctx, unsigned int to_submit,
+			 unsigned int min_complete, unsigned int flags)
+{
+	return syscall(336, ctx, to_submit, min_complete, flags);
+}
+
+/*
+ * Kernel thread id of the caller. Not named gettid(): glibc 2.30 and newer
+ * declare their own gettid() in <unistd.h>, which a static definition of the
+ * same name would conflict with.
+ */
+static int sys_gettid(void)
+{
+	return syscall(__NR_gettid);
+}
+
+/*
+ * Fill in 'iocb' as a read at a random BS-aligned offset of 'fd'. The length
+ * is only set here when fixed buffers are not in use.
+ */
+static void init_io(struct submitter *s, int fd, struct iocb *iocb)
+{
+	unsigned long nr_blocks, offset;
+	long r;
+
+	/*
+	 * max_blocks was already reduced by one in submitter_fn(). Clamp the
+	 * modulus so that files of one or two blocks neither divide by zero
+	 * nor wrap around to a huge block range.
+	 */
+	nr_blocks = s->max_blocks > 1 ? s->max_blocks - 1 : 1;
+
+	lrand48_r(&s->rand, &r);
+	offset = (r % nr_blocks) * BS;
+
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IO_CMD_PREAD;
+	iocb->u.c.offset = offset;
+	if (polled)
+		iocb->u.c.flags = IOCB_FLAG_HIPRI;
+	if (!fixedbufs)
+		iocb->u.c.nbytes = BS;
+}
+
+/*
+ * Prepare up to 'max_ios' new iocbs at the tail of the SQ ring, stopping early
+ * if the ring is full, then publish the new tail. Returns the number prepared.
+ */
+static int prep_more_ios(struct submitter *s, int fd, int max_ios)
+{
+	struct aio_iocb_ring *ring = s->sq_ring;
+	struct iocb *iocb;
+	u32 tail, next_tail, prepped = 0;
+
+	next_tail = tail = ring->tail;
+	do {
+		next_tail++;
+		if (next_tail == ring->nr_events)
+			next_tail = 0;
+
+		barrier();
+		if (next_tail == ring->head)
+			break;
+
+		iocb = &s->sq_ring->iocbs[tail];
+		init_io(s, fd, iocb);
+		prepped++;
+		tail = next_tail;
+	} while (prepped < max_ios);
+
+	if (ring->tail != tail) {
+		/* order tail store with writes to iocbs above */
+		barrier();
+		ring->tail = tail;
+		barrier();
+	}
+	return prepped;
+}
+
+/*
+ * Store the size of 'fd' in units of BS in '*blocks'. Handles block devices
+ * and regular files; returns 0 on success and -1 otherwise.
+ */
+static int get_file_size(int fd, unsigned long *blocks)
+{
+	struct stat st;
+
+	if (fstat(fd, &st) < 0)
+		return -1;
+	if (S_ISBLK(st.st_mode)) {
+		unsigned long long bytes;
+
+		if (ioctl(fd, BLKGETSIZE64, &bytes) != 0)
+			return -1;
+
+		*blocks = bytes / BS;
+		return 0;
+	} else if (S_ISREG(st.st_mode)) {
+		*blocks = st.st_size / BS;
+		return 0;
+	}
+
+	return -1;
+}
+
+/*
+ * Consume all completions currently in the CQ ring and advance its head.
+ * Returns the number of events reaped, or -1 after printing a diagnostic if
+ * an event did not transfer exactly BS bytes.
+ */
+static int reap_events(struct submitter *s)
+{
+	struct aio_io_event_ring *ring = s->cq_ring;
+	struct io_event *ev;
+	u32 head, reaped = 0;
+
+	head = ring->head;
+	do {
+		barrier();
+		if (head == ring->tail)
+			break;
+		ev = &ring->events[head];
+		if (ev->res != BS) {
+			int index = (int) (uintptr_t) ev->obj;
+			struct iocb *iocb = &s->sq_ring->iocbs[index];
+
+			printf("io: unexpected ret=%ld\n", ev->res);
+			printf("offset=%lu, size=%lu\n", (unsigned long) iocb->u.c.offset, (unsigned long) iocb->u.c.nbytes);
+			return -1;
+		}
+		reaped++;
+		head++;
+		if (head == ring->nr_events)
+			head = 0;
+	} while (1);
+
+	s->inflight -= reaped;
+	ring->head = head;
+	barrier();
+	return reaped;
+}
+
+/*
+ * Submitter thread: open the target, then keep the ring filled and reap
+ * completions until told to finish or an error occurs. Always sets the
+ * global 'finish' flag on the way out so main() stops as well.
+ */
+static void *submitter_fn(void *data)
+{
+	struct submitter *s = data;
+	int fd, ret, prepped, flags;
+
+	printf("submitter=%d\n", sys_gettid());
+
+	flags = O_RDONLY;
+	if (!buffered)
+		flags |= O_DIRECT;
+	fd = open(s->filename, flags);
+	if (fd < 0) {
+		perror("open");
+		goto done;
+	}
+
+	if (get_file_size(fd, &s->max_blocks)) {
+		printf("failed getting size of device/file\n");
+		goto err;
+	}
+	if (!s->max_blocks) {
+		printf("Zero file/device size?\n");
+		goto err;
+	}
+
+	s->max_blocks--;
+
+	srand48_r(pthread_self(), &s->rand);
+
+	prepped = 0;
+	do {
+		int to_wait, flags, to_submit, this_reap;
+
+		if (!prepped && s->inflight < DEPTH)
+			prepped = prep_more_ios(s, fd, min(DEPTH - s->inflight, BATCH_SUBMIT));
+		s->inflight += prepped;
+submit_more:
+		to_submit = prepped;
+submit:
+		if (s->inflight + BATCH_SUBMIT < DEPTH)
+			to_wait = 0;
+		else
+			to_wait = min(s->inflight + to_submit, BATCH_COMPLETE);
+
+		flags = IORING_FLAG_GETEVENTS;
+		if (to_submit)
+			flags |= IORING_FLAG_SUBMIT;
+
+		ret = io_ring_enter(s->ioc, to_submit, to_wait, flags);
+		s->calls++;
+
+		this_reap = reap_events(s);
+		if (this_reap == -1)
+			break;
+		s->reaps += this_reap;
+
+		if (ret >= 0) {
+			if (!ret) {
+				to_submit = 0;
+				if (s->inflight)
+					goto submit;
+				continue;
+			} else if (ret < to_submit) {
+				int diff = to_submit - ret;
+
+				s->done += ret;
+				prepped -= diff;
+				goto submit_more;
+			}
+			s->done += ret;
+			prepped = 0;
+			continue;
+		} else if (ret < 0) {
+			if ((ret == -1 && errno == EAGAIN) || ret == -EAGAIN) {
+				if (s->finish)
+					break;
+				if (this_reap)
+					goto submit;
+				to_submit = 0;
+				goto submit;
+			}
+			if (ret == -1)
+				printf("io_submit: %s\n", strerror(errno));
+			else
+				printf("io_submit: %s\n", strerror(-ret));
+			break;
+		}
+	} while (!s->finish);
+err:
+	close(fd);
+done:
+	finish = 1;
+	return NULL;
+}
+
+/* SIGINT handler: ask the submitter and main() loops to stop */
+static void sig_int(int sig)
+{
+	printf("Exiting on signal %d\n", sig);
+	submitters[0].finish = 1;
+	finish = 1;
+}
+
+/* Install sig_int() as the SIGINT handler */
+static void arm_sig_int(void)
+{
+	struct sigaction act;
+
+	memset(&act, 0, sizeof(act));
+	act.sa_handler = sig_int;
+	act.sa_flags = SA_RESTART;
+	sigaction(SIGINT, &act, NULL);
+}
+
+/*
+ * Set up the rings and IO buffers, create the aio context, start the
+ * submitter thread and print statistics once per second until finished.
+ */
+int main(int argc, char *argv[])
+{
+	struct submitter *s = &submitters[0];
+	unsigned long done, calls, reap;
+	int flags = 0, err;
+	int j;
+	size_t size;
+	void *p, *ret;
+	struct rlimit rlim;
+
+	if (argc < 2) {
+		printf("%s: filename\n", argv[0]);
+		return 1;
+	}
+	/* s->filename is a fixed-size buffer, refuse names that don't fit */
+	if (strlen(argv[1]) >= sizeof(s->filename)) {
+		printf("%s: filename too long\n", argv[0]);
+		return 1;
+	}
+	strcpy(s->filename, argv[1]);
+
+	rlim.rlim_cur = RLIM_INFINITY;
+	rlim.rlim_max = RLIM_INFINITY;
+	if (setrlimit(RLIMIT_MEMLOCK, &rlim) < 0) {
+		perror("setrlimit");
+		return 1;
+	}
+
+	arm_sig_int();
+
+	size = sizeof(struct aio_iocb_ring) + RING_SIZE * sizeof(struct iocb);
+	if (posix_memalign(&p, 4096, size))
+		return 1;
+	s->sq_ring = p;
+	memset(p, 0, size);
+
+	size = sizeof(struct aio_io_event_ring) + RING_SIZE * sizeof(struct io_event);
+	if (posix_memalign(&p, 4096, size))
+		return 1;
+	s->cq_ring = p;
+	memset(p, 0, size);
+
+	for (j = 0; j < RING_SIZE; j++) {
+		struct iocb *iocb = &s->sq_ring->iocbs[j];
+
+		if (posix_memalign(&iocb->u.c.buf, BS, BS)) {
+			printf("failed alloc\n");
+			return 1;
+		}
+		iocb->u.c.nbytes = BS;
+	}
+
+	flags = IOCTX_FLAG_SCQRING;
+	if (polled)
+		flags |= IOCTX_FLAG_IOPOLL;
+	if (fixedbufs)
+		flags |= IOCTX_FLAG_FIXEDBUFS;
+	if (buffered)
+		flags |= IOCTX_FLAG_SQWQ;
+	else if (sq_thread) {
+		flags |= IOCTX_FLAG_SQTHREAD;
+		s->sq_ring->sq_thread_cpu = sq_thread_cpu;
+	}
+
+	err = io_setup2(RING_SIZE, flags, s->sq_ring->iocbs, s->sq_ring, s->cq_ring, &s->ioc);
+	if (err) {
+		printf("ctx_init failed: %s, %d\n", strerror(errno), err);
+		return 1;
+	}
+	printf("polled=%d, fixedbufs=%d, buffered=%d\n", polled, fixedbufs, buffered);
+	printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", DEPTH, s->sq_ring->nr_events, s->cq_ring->nr_events);
+
+	err = pthread_create(&s->thread, NULL, submitter_fn, s);
+	if (err) {
+		printf("pthread_create: %s\n", strerror(err));
+		return 1;
+	}
+
+	reap = calls = done = 0;
+	do {
+		unsigned long this_done = 0;
+		unsigned long this_reap = 0;
+		unsigned long this_call = 0;
+		unsigned long rpc = 0, ipc = 0;
+
+		sleep(1);
+		this_done += s->done;
+		this_call += s->calls;
+		this_reap += s->reaps;
+		if (this_call - calls) {
+			rpc = (this_done - done) / (this_call - calls);
+			ipc = (this_reap - reap) / (this_call - calls);
+		}
+		printf("IOPS=%lu, IOS/call=%lu/%lu, inflight=%u (head=%d tail=%d), %lu, %lu\n",
+				this_done - done, rpc, ipc, s->inflight,
+				s->cq_ring->head, s->cq_ring->tail, s->reaps, s->done);
+		done = this_done;
+		calls = this_call;
+		reap = this_reap;
+	} while (!finish);
+
+	pthread_join(s->thread, &ret);
+	return 0;
+}