Re: [PATCH 4/4] io_uring: optimise compl locking for non-shared rings

On 3/18/22 15:21, Jens Axboe wrote:
> On 3/18/22 9:13 AM, Pavel Begunkov wrote:
>> On 3/18/22 14:54, Jens Axboe wrote:
>>> On 3/18/22 7:52 AM, Pavel Begunkov wrote:
>>>> When only one task submits requests, most CQEs are expected to be
>>>> filled from that task's context, giving us natural serialisation. In
>>>> those cases we don't need spinlocking around CQE posting. One downside
>>>> is that io-wq workers then can't emit CQEs directly but have to do it
>>>> through the original task context using task_work. That may hurt
>>>> latency and performance and may matter a lot to some workloads, but
>>>> it's not a huge deal in general as io-wq is a slow path, and there is
>>>> some additional merit from tw completion batching.
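
A minimal user-space model of that idea (model_cq, private_cq and the
fixed-size ring are purely illustrative, this is not the actual patch):

#include <pthread.h>
#include <stdbool.h>

struct model_cq {
	pthread_mutex_t lock;	/* completion_lock stand-in */
	bool private_cq;	/* single submitter, locking elided */
	unsigned tail;
	unsigned ring[1024];
};

static void model_cq_post(struct model_cq *cq, unsigned res)
{
	if (cq->private_cq) {
		/* every CQE is posted from the submitter task context
		 * (io-wq results get punted there as task_work), so
		 * plain stores are naturally serialised */
		cq->ring[cq->tail++ & 1023] = res;
		return;
	}
	pthread_mutex_lock(&cq->lock);
	cq->ring[cq->tail++ & 1023] = res;
	pthread_mutex_unlock(&cq->lock);
}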

>>> Not too worried about io-wq task_work for CQ filling, it is the slower
>>> path after all. And I think we can get away with doing notifications,
>>> as it's just for CQ filling. If the task is currently waiting in
>>> cqring_wait, then it'll get woken anyway and will process the task
>>> work. If it's in userspace, it doesn't need a notification. That should
>>> make it somewhat lighter than requiring TIF_NOTIFY_SIGNAL for that.
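
In kernel terms the notification choice might look roughly like the
fragment below; task_work_add() and the TWA_* modes are real interfaces,
while ctx->submitter_task and req->tw_cb are illustrative assumptions:

/* pick the cheapest notification for a punted CQE fill: a task blocked
 * in cqring_wait is woken by the CQ wakeup anyway, and a task running
 * in userspace will reap on its next io_uring_enter() */
enum task_work_notify_mode mode =
	wq_has_sleeper(&ctx->cq_wait) ? TWA_SIGNAL : TWA_NONE;

task_work_add(ctx->submitter_task, &req->tw_cb, mode);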

>>>> The feature is opted into by userspace by setting the new
>>>> IORING_SETUP_PRIVATE_CQ flag. It doesn't work with IOPOLL, and for now
>>>> only the task that created the ring can submit requests to it.

>>> I know this is a WIP, but why do we need CQ_PRIVATE? And this needs to

>> One reason is the io-wq -> tw punting, which is not optimal for e.g.
>> active users of IOSQE_ASYNC. The second is that the fundamental
>> requirement is that only one task submits requests. Was thinking about
>> automating it, e.g. when we register a second tctx we go through a slow
>> path waiting for all current tw to complete, and then remove an
>> internal, not userspace-visible, CQ_PRIVATE flag.
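
A purely hypothetical sketch of that slow path (none of these names are
from the patchset):

/* on registering a second tctx: wait out the owner's pending task_work,
 * then clear the internal flag so CQE posting takes the lock again */
static int io_attach_second_tctx(struct io_ring_ctx *ctx)
{
	io_wait_for_tw(ctx->submitter_task);	/* hypothetical helper */
	ctx->flags &= ~CTX_F_CQ_PRIVATE;	/* hypothetical internal flag */
	return 0;
}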

> Was thinking something along those lines too. The alternative is setting
> up the ring with SETUP_SINGLE_ISSUER or something like that, having the
> application tell us that it is a single issuer and no submits are shared
> across threads. Serves the same kind of purpose as CQ_PRIVATE, but
> enables us to simply fail things if the task violates those constraints.
> Would also be a better name, I believe, as it might enable further
> optimizations in the future, like for example the mutex reduction for
> submits.
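
The enforcement could be as small as the check below; submitter_task and
the -EEXIST choice are guesses here, not settled API:

/* reject SQE submission from any task other than the one that set the
 * ring up */
if ((ctx->flags & IORING_SETUP_SINGLE_ISSUER) &&
    ctx->submitter_task != current)
	return -EEXIST;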

That's exactly what it is, including the failing part. And I like your
name better, will take it.

>> Also, as the SQPOLL task is by definition the only one submitting SQEs,
>> I was thinking about enabling it by default there, but didn't because
>> of the io-wq / IOSQE_ASYNC issue.

> Gotcha.

>>> work with registered files (and ring fd) as that is probably a bigger
>>> win than skipping the completion_lock if you're not shared anyway.

>> It does work with fixed/registered files and registered io_uring fds.

> t/io_uring fails for me with registered files or rings, getting EINVAL.
> Might be user error, but it's simply setting CQ_PRIVATE at setup.

One thing I changed in the tool is that the ring is now created by the
submitter task, so setup_ring moved into the submitter thread. I plan
to get rid of this restriction, though.

Weird that it works for you only without reg files/rings; will take
a look.

Attached is the io_uring.c I used. It's based on some old version, so
do_nop can't be set via argv but has to be turned on in the source.
IORING_ENTER_REGISTERED_RING is always enabled.

>> Regarding "a bigger win": probably in many cases, but if you submit a
>> good batch at once and completion tw batching doesn't kick in (e.g.
>> direct bdev reads of not too high intensity), it may save N
>> spinlock/unlock pairs where a registered ring fd kills only one
>> fdget/fdput pair per io_uring_enter().

> Definitely, various cases where one would be a bigger win than the
> other, agree on that. But let's just ensure that both work together :-)

--
Pavel Begunkov
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <stddef.h>
#include <signal.h>
#include <inttypes.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <linux/fs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <sched.h>

#include "../arch/arch.h"
#include "../lib/types.h"
#include "../os/linux/io_uring.h"

#define min(a, b)		(((a) < (b)) ? (a) : (b))

struct io_sq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	unsigned *flags;
	unsigned *array;
};


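/*
 * Definitions for interfaces not (yet) in the system io_uring header:
 * IORING_SETUP_PRIVATE_CQ is from this patchset; the registered ring
 * fd constants come with the IORING_REGISTER_RING_FDS support.
 */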
#define IORING_SETUP_PRIVATE_CQ	(1U << 8)

enum {
	IORING_REGISTER_RING_FDS		= 20,
};

#define IORING_ENTER_REGISTERED_RING	(1U << 4)

struct io_cq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	struct io_uring_cqe *cqes;
};

#define DEPTH			128
#define BATCH_SUBMIT		32
#define BATCH_COMPLETE		32
#define BS			4096

#define MAX_FDS			16

static unsigned sq_ring_mask, cq_ring_mask;

struct file {
	unsigned long max_blocks;
	unsigned pending_ios;
	int real_fd;
	int fixed_fd;
};

struct submitter {
	pthread_t thread;
	int ring_fd;
	struct io_sq_ring sq_ring;
	struct io_uring_sqe *sqes;
	struct io_cq_ring cq_ring;
	int inflight;
	unsigned long reaps;
	unsigned long done;
	unsigned long calls;
	volatile int finish;

	__s32 *fds;

	struct file files[MAX_FDS];
	unsigned nr_files;
	unsigned cur_file;
	struct iovec iovecs[];
};

static struct submitter *submitter;
static volatile int finish;

static int depth = DEPTH;
static int batch_submit = BATCH_SUBMIT;
static int batch_complete = BATCH_COMPLETE;
static int bs = BS;
static int polled = 1;		/* use IO polling */
static int fixedbufs = 1;	/* use fixed user buffers */
static int register_files = 1;	/* use fixed files */
static int buffered = 0;	/* use buffered IO, not O_DIRECT */
static int sq_thread_poll = 0;	/* use kernel submission/poller thread */
static int sq_thread_cpu = -1;	/* pin above thread to this CPU */
static int do_nop = 1;		/* no-op SQ ring commands */

struct io_uring_rsrc_update {
	__u32 offset;
	__u32 resv;
	__aligned_u64 data;
};

static int vectored = 1;

static int setup_ring(struct submitter *s);

static int io_uring_register_buffers(struct submitter *s)
{
	if (do_nop)
		return 0;

	return syscall(__NR_io_uring_register, s->ring_fd,
			IORING_REGISTER_BUFFERS, s->iovecs, depth);
}

static int io_uring_register_io_uring_fd(struct submitter *s)
{
	struct io_uring_rsrc_update up = {};

	up.offset = 0;
	up.data = s->ring_fd;

	return syscall(__NR_io_uring_register, s->ring_fd,
			IORING_REGISTER_RING_FDS, &up, 1);
}

static int io_uring_register_files(struct submitter *s)
{
	int i;

	if (do_nop)
		return 0;

	s->fds = calloc(s->nr_files, sizeof(__s32));
	for (i = 0; i < s->nr_files; i++) {
		s->fds[i] = s->files[i].real_fd;
		s->files[i].fixed_fd = i;
	}

	return syscall(__NR_io_uring_register, s->ring_fd,
			IORING_REGISTER_FILES, s->fds, s->nr_files);
}

static int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
	return syscall(__NR_io_uring_setup, entries, p);
}

static void io_uring_probe(int fd)
{
	struct io_uring_probe *p;
	int ret;

	p = malloc(sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
	if (!p)
		return;

	memset(p, 0, sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
	ret = syscall(__NR_io_uring_register, fd, IORING_REGISTER_PROBE, p, 256);
	if (ret < 0)
		goto out;

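	/* prefer non-vectored READ over READV when the kernel supports it */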
	if (IORING_OP_READ > p->ops_len)
		goto out;

	if ((p->ops[IORING_OP_READ].flags & IO_URING_OP_SUPPORTED))
		vectored = 0;
out:
	free(p);
}

static int io_uring_enter(struct submitter *s, unsigned int to_submit,
			  unsigned int min_complete, unsigned int flags)
{
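	/* the first argument is the registered ring fd *index* (0), not a
	 * real fd, since IORING_ENTER_REGISTERED_RING is always passed */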
	return syscall(__NR_io_uring_enter, 0, to_submit, min_complete,
			flags | IORING_ENTER_REGISTERED_RING, NULL, 0);
}

#ifndef CONFIG_HAVE_GETTID
static int gettid(void)
{
	return syscall(__NR_gettid);
}
#endif

static unsigned file_depth(struct submitter *s)
{
	return (depth + s->nr_files - 1) / s->nr_files;
}

static void init_io(struct submitter *s, unsigned index)
{
	struct io_uring_sqe *sqe = &s->sqes[index];
	unsigned long offset;
	struct file *f;
	long r;

	if (do_nop) {
		sqe->opcode = IORING_OP_NOP;
		return;
	}

	if (s->nr_files == 1) {
		f = &s->files[0];
	} else {
		f = &s->files[s->cur_file];
		if (f->pending_ios >= file_depth(s)) {
			s->cur_file++;
			if (s->cur_file == s->nr_files)
				s->cur_file = 0;
			f = &s->files[s->cur_file];
		}
	}
	f->pending_ios++;

	r = lrand48();
	offset = (r % (f->max_blocks - 1)) * bs;

	if (register_files) {
		sqe->flags = IOSQE_FIXED_FILE;
		sqe->fd = f->fixed_fd;
	} else {
		sqe->flags = 0;
		sqe->fd = f->real_fd;
	}
	if (fixedbufs) {
		sqe->opcode = IORING_OP_READ_FIXED;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = index;
	} else if (!vectored) {
		sqe->opcode = IORING_OP_READ;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = 0;
	} else {
		sqe->opcode = IORING_OP_READV;
		sqe->addr = (unsigned long) &s->iovecs[index];
		sqe->len = 1;
		sqe->buf_index = 0;
	}
	sqe->ioprio = 0;
	sqe->off = offset;
	sqe->user_data = (unsigned long) f;
}

static int prep_more_ios(struct submitter *s, int max_ios)
{
	struct io_sq_ring *ring = &s->sq_ring;
	unsigned index, tail, next_tail, prepped = 0;

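	/* single producer: plain load of our own tail, acquire-load of the
	 * consumer's head, release-store to publish new entries */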
	next_tail = tail = *ring->tail;
	do {
		next_tail++;
		if (next_tail == atomic_load_acquire(ring->head))
			break;

		index = tail & sq_ring_mask;
		init_io(s, index);
		ring->array[index] = index;
		prepped++;
		tail = next_tail;
	} while (prepped < max_ios);

	if (prepped)
		atomic_store_release(ring->tail, tail);
	return prepped;
}

static int get_file_size(struct file *f)
{
	struct stat st;

	if (fstat(f->real_fd, &st) < 0)
		return -1;
	if (S_ISBLK(st.st_mode)) {
		unsigned long long bytes;

		if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
			return -1;

		f->max_blocks = bytes / bs;
		return 0;
	} else if (S_ISREG(st.st_mode)) {
		f->max_blocks = st.st_size / bs;
		return 0;
	}

	return -1;
}

static int reap_events(struct submitter *s)
{
	struct io_cq_ring *ring = &s->cq_ring;
	struct io_uring_cqe *cqe;
	unsigned head, reaped = 0;

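	/* single consumer: acquire-load the producer's tail, release-store
	 * our head once the entries have been consumed */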
	head = *ring->head;
	do {
		struct file *f;

		read_barrier();
		if (head == atomic_load_acquire(ring->tail))
			break;
		cqe = &ring->cqes[head & cq_ring_mask];
		if (!do_nop) {
			f = (struct file *) (uintptr_t) cqe->user_data;
			f->pending_ios--;
			if (cqe->res != bs) {
				printf("io: unexpected ret=%d\n", cqe->res);
				if (polled && cqe->res == -EOPNOTSUPP)
					printf("Your filesystem/driver/kernel doesn't support polled IO\n");
				return -1;
			}
		}
		reaped++;
		head++;
	} while (1);

	if (reaped) {
		s->inflight -= reaped;
		atomic_store_release(ring->head, head);
	}
	return reaped;
}

static void *submitter_fn(void *data)
{
	struct submitter *s = data;
	struct io_sq_ring *ring = &s->sq_ring;
	int ret, prepped;

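	/* create the ring from the submitter thread itself: with
	 * CQ_PRIVATE, only the task that created the ring may submit */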
	ret = setup_ring(s);
	if (ret) {
		printf("ring setup failed: %s, %d\n", strerror(errno), ret);
		return NULL;
	}

	printf("submitter=%d\n", gettid());

	srand48(pthread_self());

	prepped = 0;
	do {
		int to_wait, to_submit, this_reap, to_prep;
		unsigned ring_flags = 0;

		if (!prepped && s->inflight < depth) {
			to_prep = min(depth - s->inflight, batch_submit);
			prepped = prep_more_ios(s, to_prep);
		}
		s->inflight += prepped;
submit_more:
		to_submit = prepped;
submit:
		if (to_submit && (s->inflight + to_submit <= depth))
			to_wait = 0;
		else
			to_wait = min(s->inflight + to_submit, batch_complete);

		/*
		 * Only need to call io_uring_enter if we're not using SQ thread
		 * poll, or if IORING_SQ_NEED_WAKEUP is set.
		 */
		if (sq_thread_poll)
			ring_flags = atomic_load_acquire(ring->flags);
		if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
			unsigned flags = 0;

			if (to_wait)
				flags = IORING_ENTER_GETEVENTS;
			if (ring_flags & IORING_SQ_NEED_WAKEUP)
				flags |= IORING_ENTER_SQ_WAKEUP;
			ret = io_uring_enter(s, to_submit, to_wait, flags);
			s->calls++;
		} else {
			/* for SQPOLL, we submitted it all effectively */
			ret = to_submit;
		}

		/*
		 * For non SQ thread poll, we already got the events we needed
		 * through the io_uring_enter() above. For SQ thread poll, we
		 * need to loop here until we find enough events.
		 */
		this_reap = 0;
		do {
			int r;
			r = reap_events(s);
			if (r == -1) {
				s->finish = 1;
				break;
			} else if (r > 0)
				this_reap += r;
		} while (sq_thread_poll && this_reap < to_wait);
		s->reaps += this_reap;

		if (ret >= 0) {
			if (!ret) {
				to_submit = 0;
				if (s->inflight)
					goto submit;
				continue;
			} else if (ret < to_submit) {
				int diff = to_submit - ret;

				s->done += ret;
				prepped -= diff;
				goto submit_more;
			}
			s->done += ret;
			prepped = 0;
			continue;
		} else if (ret < 0) {
			if (errno == EAGAIN) {
				if (s->finish)
					break;
				if (this_reap)
					goto submit;
				to_submit = 0;
				goto submit;
			}
			printf("io_submit: %s\n", strerror(errno));
			break;
		}
	} while (!s->finish);

	finish = 1;
	return NULL;
}

static void sig_int(int sig)
{
	printf("Exiting on signal %d\n", sig);
	submitter->finish = 1;
	finish = 1;
}

static void arm_sig_int(void)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGINT, &act, NULL);
}

static int setup_ring(struct submitter *s)
{
	struct io_sq_ring *sring = &s->sq_ring;
	struct io_cq_ring *cring = &s->cq_ring;
	struct io_uring_params p;
	int ret, fd;
	void *ptr;

	memset(&p, 0, sizeof(p));

	if (polled && !do_nop)
		p.flags |= IORING_SETUP_IOPOLL;
	if (sq_thread_poll) {
		p.flags |= IORING_SETUP_SQPOLL;
		if (sq_thread_cpu != -1) {
			p.flags |= IORING_SETUP_SQ_AFF;
			p.sq_thread_cpu = sq_thread_cpu;
		}
	}
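	/* opt in to single-submitter (lockless) CQE posting */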
	p.flags |= IORING_SETUP_PRIVATE_CQ;

	fd = io_uring_setup(depth, &p);
	if (fd < 0) {
		perror("io_uring_setup");
		return 1;
	}
	s->ring_fd = fd;

	io_uring_probe(fd);

	ret = io_uring_register_io_uring_fd(s);
	if (ret < 0) {
		perror("io_uring_register_io_uring_fd");
		return 1;
	}

	if (fixedbufs) {
		ret = io_uring_register_buffers(s);
		if (ret < 0) {
			perror("io_uring_register_buffers");
			return 1;
		}
	}

	if (register_files) {
		ret = io_uring_register_files(s);
		if (ret < 0) {
			perror("io_uring_register_files");
			return 1;
		}
	}

	ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
			IORING_OFF_SQ_RING);
	printf("sq_ring ptr = %p\n", ptr);
	sring->head = ptr + p.sq_off.head;
	sring->tail = ptr + p.sq_off.tail;
	sring->ring_mask = ptr + p.sq_off.ring_mask;
	sring->ring_entries = ptr + p.sq_off.ring_entries;
	sring->flags = ptr + p.sq_off.flags;
	sring->array = ptr + p.sq_off.array;
	sq_ring_mask = *sring->ring_mask;

	s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
			IORING_OFF_SQES);
	printf("sqes ptr    = %p\n", s->sqes);

	ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
			IORING_OFF_CQ_RING);
	printf("cq_ring ptr = %p\n", ptr);
	cring->head = ptr + p.cq_off.head;
	cring->tail = ptr + p.cq_off.tail;
	cring->ring_mask = ptr + p.cq_off.ring_mask;
	cring->ring_entries = ptr + p.cq_off.ring_entries;
	cring->cqes = ptr + p.cq_off.cqes;
	cq_ring_mask = *cring->ring_mask;
	return 0;
}

static void file_depths(char *buf)
{
	struct submitter *s = submitter;
	char *p;
	int i;

	buf[0] = '\0';
	p = buf;
	for (i = 0; i < s->nr_files; i++) {
		struct file *f = &s->files[i];

		if (i + 1 == s->nr_files)
			p += sprintf(p, "%d", f->pending_ios);
		else
			p += sprintf(p, "%d, ", f->pending_ios);
	}
}

static void usage(char *argv)
{
	printf("%s [options] -- [filenames]\n"
		" -d <int> : IO Depth, default %d\n"
		" -s <int> : Batch submit, default %d\n"
		" -c <int> : Batch complete, default %d\n"
		" -b <int> : Block size, default %d\n"
		" -p <bool> : Polled IO, default %d\n",
		argv, DEPTH, BATCH_SUBMIT, BATCH_COMPLETE, BS, polled);
	exit(0);
}

int main(int argc, char *argv[])
{
	struct submitter *s;
	unsigned long done, calls, reap;
	int i, flags, fd, opt;
	char *fdepths;
	void *ret;

	if (!do_nop && argc < 2) {
		printf("%s: filename [options]\n", argv[0]);
		return 1;
	}

	while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:h?")) != -1) {
		switch (opt) {
		case 'd':
			depth = atoi(optarg);
			break;
		case 's':
			batch_submit = atoi(optarg);
			break;
		case 'c':
			batch_complete = atoi(optarg);
			break;
		case 'b':
			bs = atoi(optarg);
			break;
		case 'p':
			polled = !!atoi(optarg);
			break;
		case 'B':
			fixedbufs = !!atoi(optarg);
			break;
		case 'F':
			register_files = !!atoi(optarg);
			break;
		case 'h':
		case '?':
		default:
			usage(argv[0]);
			break;
		}
	}

	submitter = malloc(sizeof(*submitter) + depth * sizeof(struct iovec));
	memset(submitter, 0, sizeof(*submitter) + depth * sizeof(struct iovec));
	s = submitter;

	flags = O_RDONLY | O_NOATIME;
	if (!buffered)
		flags |= O_DIRECT;

	i = optind;
	while (!do_nop && i < argc) {
		struct file *f;

		if (s->nr_files == MAX_FDS) {
			printf("Max number of files (%d) reached\n", MAX_FDS);
			break;
		}
		fd = open(argv[i], flags);
		if (fd < 0) {
			perror("open");
			return 1;
		}

		f = &s->files[s->nr_files];
		f->real_fd = fd;
		if (get_file_size(f)) {
			printf("failed getting size of device/file\n");
			return 1;
		}
		if (f->max_blocks <= 1) {
			printf("Zero file/device size?\n");
			return 1;
		}
		f->max_blocks--;

		printf("Added file %s\n", argv[i]);
		s->nr_files++;
		i++;
	}

	if (fixedbufs) {
		struct rlimit rlim;

		rlim.rlim_cur = RLIM_INFINITY;
		rlim.rlim_max = RLIM_INFINITY;
		if (setrlimit(RLIMIT_MEMLOCK, &rlim) < 0) {
			perror("setrlimit");
			return 1;
		}
	}

	arm_sig_int();

	for (i = 0; i < depth; i++) {
		void *buf;

		if (posix_memalign(&buf, bs, bs)) {
			printf("failed alloc\n");
			return 1;
		}
		s->iovecs[i].iov_base = buf;
		s->iovecs[i].iov_len = bs;
	}

	printf("polled=%d, fixedbufs=%d, register_files=%d, buffered=%d", polled, fixedbufs, register_files, buffered);
	printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", depth, 0, 0);

	pthread_create(&s->thread, NULL, submitter_fn, s);

	fdepths = malloc(8 * s->nr_files);
	reap = calls = done = 0;
	do {
		unsigned long this_done = 0;
		unsigned long this_reap = 0;
		unsigned long this_call = 0;
		unsigned long rpc = 0, ipc = 0;

		sleep(1);
		this_done += s->done;
		this_call += s->calls;
		this_reap += s->reaps;
		if (this_call - calls) {
			rpc = (this_done - done) / (this_call - calls);
			ipc = (this_reap - reap) / (this_call - calls);
		} else
			rpc = ipc = -1;
		file_depths(fdepths);
		printf("IOPS=%lu, IOS/call=%ld/%ld, inflight=%u (%s)\n",
				this_done - done, rpc, ipc, s->inflight,
				fdepths);
		done = this_done;
		calls = this_call;
		reap = this_reap;
	} while (!finish);

	pthread_join(s->thread, &ret);
	close(s->ring_fd);
	free(fdepths);
	return 0;
}
