[PATCH] io_uring: refactor io_sq_thread management

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Signed-off-by: Xiaoguang Wang <xiaoguang.wang@xxxxxxxxxxxxxxxxx>
---
 fs/io_uring.c | 257 +++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 232 insertions(+), 25 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 6e51140a5722..f49653cd9f41 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -258,8 +258,13 @@ struct io_ring_ctx {
 	/* IO offload */
 	struct io_wq		*io_wq;
 	struct task_struct	*sqo_thread;	/* if using sq thread polling */
+	wait_queue_head_t	*sqo_wait;
+	int			submit_status;
+	int			sq_thread_cpu;
+	struct list_head	node;
+
 	struct mm_struct	*sqo_mm;
-	wait_queue_head_t	sqo_wait;
+	wait_queue_head_t	__sqo_wait;
 
 	/*
 	 * If used, fixed file set. Writers must ensure that ->refs is dead,
@@ -330,6 +335,16 @@ struct io_ring_ctx {
 	struct work_struct		exit_work;
 };
 
+struct io_percpu_thread {
+	struct list_head ctx_list;
+	wait_queue_head_t sqo_percpu_wait;
+	struct mutex lock;
+	struct task_struct *sqo_thread;
+	unsigned int sq_thread_idle;
+};
+
+static struct io_percpu_thread __percpu *percpu_threads;
+
 /*
  * First field must be the file pointer in all the
  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
@@ -934,6 +949,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	ctx->flags = p->flags;
 	init_waitqueue_head(&ctx->cq_wait);
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
+	INIT_LIST_HEAD(&ctx->node);
 	init_completion(&ctx->completions[0]);
 	init_completion(&ctx->completions[1]);
 	idr_init(&ctx->io_buffer_idr);
@@ -1162,8 +1178,8 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 {
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
-	if (waitqueue_active(&ctx->sqo_wait))
-		wake_up(&ctx->sqo_wait);
+	if (waitqueue_active(ctx->sqo_wait))
+		wake_up(ctx->sqo_wait);
 	if (io_should_trigger_evfd(ctx))
 		eventfd_signal(ctx->cq_ev_fd, 1);
 }
@@ -1989,8 +2005,8 @@ static void io_iopoll_req_issued(struct io_kiocb *req)
 		list_add_tail(&req->list, &ctx->poll_list);
 
 	if ((ctx->flags & IORING_SETUP_SQPOLL) &&
-	    wq_has_sleeper(&ctx->sqo_wait))
-		wake_up(&ctx->sqo_wait);
+	    wq_has_sleeper(ctx->sqo_wait))
+		wake_up(ctx->sqo_wait);
 }
 
 static void io_file_put(struct io_submit_state *state)
@@ -6006,7 +6022,7 @@ static int io_sq_thread(void *data)
 				continue;
 			}
 
-			prepare_to_wait(&ctx->sqo_wait, &wait,
+			prepare_to_wait(ctx->sqo_wait, &wait,
 						TASK_INTERRUPTIBLE);
 
 			/*
@@ -6018,7 +6034,7 @@ static int io_sq_thread(void *data)
 			 */
 			if ((ctx->flags & IORING_SETUP_IOPOLL) &&
 			    !list_empty_careful(&ctx->poll_list)) {
-				finish_wait(&ctx->sqo_wait, &wait);
+				finish_wait(ctx->sqo_wait, &wait);
 				continue;
 			}
 
@@ -6031,23 +6047,23 @@ static int io_sq_thread(void *data)
 			if (!to_submit || ret == -EBUSY ||
 			    percpu_ref_is_dying(&ctx->refs)) {
 				if (kthread_should_park()) {
-					finish_wait(&ctx->sqo_wait, &wait);
+					finish_wait(ctx->sqo_wait, &wait);
 					break;
 				}
 				if (current->task_works) {
 					task_work_run();
-					finish_wait(&ctx->sqo_wait, &wait);
+					finish_wait(ctx->sqo_wait, &wait);
 					continue;
 				}
 				if (signal_pending(current))
 					flush_signals(current);
 				schedule();
-				finish_wait(&ctx->sqo_wait, &wait);
+				finish_wait(ctx->sqo_wait, &wait);
 
 				ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
 				continue;
 			}
-			finish_wait(&ctx->sqo_wait, &wait);
+			finish_wait(ctx->sqo_wait, &wait);
 
 			ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
 		}
@@ -6071,6 +6087,133 @@ static int io_sq_thread(void *data)
 	return 0;
 }
 
+static int process_ctx(struct io_ring_ctx *ctx)
+{
+	unsigned int to_submit;
+	int ret = 0;
+
+	if (!list_empty(&ctx->poll_list)) {
+		unsigned nr_events = 0;
+
+		mutex_lock(&ctx->uring_lock);
+		if (!list_empty(&ctx->poll_list))
+			io_iopoll_getevents(ctx, &nr_events, 0);
+		mutex_unlock(&ctx->uring_lock);
+	}
+
+	to_submit = io_sqring_entries(ctx);
+	if (to_submit) {
+		mutex_lock(&ctx->uring_lock);
+		if (likely(!percpu_ref_is_dying(&ctx->refs)))
+			ret = io_submit_sqes(ctx, to_submit, NULL, -1, true);
+		mutex_unlock(&ctx->uring_lock);
+	}
+
+	if (current->task_works)
+		task_work_run();
+
+	io_sq_thread_drop_mm(ctx);
+	return ret;
+}
+
+static int io_sq_percpu_thread(void *data)
+{
+	struct io_percpu_thread *t = data;
+	struct io_ring_ctx *ctx, *tmp;
+	mm_segment_t old_fs;
+	const struct cred *saved_creds, *cur_creds, *old_creds;
+	unsigned long timeout;
+	DEFINE_WAIT(wait);
+	int iters = 0;
+
+	timeout = jiffies + t->sq_thread_idle;
+	old_fs = get_fs();
+	set_fs(USER_DS);
+	saved_creds = cur_creds = NULL;
+	while (!kthread_should_park()) {
+		bool continue_run;
+		bool needs_wait;
+		unsigned int to_submit;
+
+		mutex_lock(&t->lock);
+again:
+		continue_run = false;
+		list_for_each_entry_safe(ctx, tmp, &t->ctx_list, node) {
+			if (cur_creds != ctx->creds) {
+				old_creds = override_creds(ctx->creds);
+				cur_creds = ctx->creds;
+				if (saved_creds)
+					put_cred(old_creds);
+				else
+					saved_creds = old_creds;
+			}
+			ctx->submit_status = process_ctx(ctx);
+
+			to_submit = io_sqring_entries(ctx);
+			if (!continue_run &&
+			    ((to_submit && ctx->submit_status != -EBUSY) ||
+			    !list_empty(&ctx->poll_list)))
+				continue_run = true;
+		}
+		if (continue_run && (++iters & 7)) {
+			timeout = jiffies + t->sq_thread_idle;
+			goto again;
+		}
+		mutex_unlock(&t->lock);
+		if (continue_run) {
+			timeout = jiffies + t->sq_thread_idle;
+			continue;
+		}
+		if (!time_after(jiffies, timeout)) {
+			cond_resched();
+			continue;
+		}
+
+		needs_wait = true;
+		prepare_to_wait(&t->sqo_percpu_wait, &wait, TASK_INTERRUPTIBLE);
+		mutex_lock(&t->lock);
+		list_for_each_entry_safe(ctx, tmp, &t->ctx_list, node) {
+			if ((ctx->flags & IORING_SETUP_IOPOLL) &&
+			    !list_empty_careful(&ctx->poll_list)) {
+				needs_wait = false;
+				break;
+			}
+			to_submit = io_sqring_entries(ctx);
+			if (to_submit && ctx->submit_status != -EBUSY) {
+				needs_wait = false;
+				break;
+			}
+		}
+		if (needs_wait) {
+			list_for_each_entry_safe(ctx, tmp, &t->ctx_list, node)
+				ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
+			smp_mb();
+
+		}
+		mutex_unlock(&t->lock);
+
+		if (needs_wait) {
+			schedule();
+			mutex_lock(&t->lock);
+				list_for_each_entry_safe(ctx, tmp,
+							 &t->ctx_list, node)
+					ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
+			mutex_unlock(&t->lock);
+			finish_wait(&t->sqo_percpu_wait, &wait);
+		} else
+			finish_wait(&t->sqo_percpu_wait, &wait);
+		timeout = jiffies + t->sq_thread_idle;
+		cond_resched();
+	}
+
+	if (current->task_works)
+		task_work_run();
+	set_fs(old_fs);
+	revert_creds(saved_creds);
+	kthread_parkme();
+	return 0;
+}
+
 struct io_wait_queue {
 	struct wait_queue_entry wq;
 	struct io_ring_ctx *ctx;
@@ -6232,18 +6375,23 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 	return 0;
 }
 
+static void destroy_io_percpu_thread(struct io_ring_ctx *ctx, int cpu);
+
 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 {
 	if (ctx->sqo_thread) {
-		wait_for_completion(&ctx->completions[1]);
-		/*
-		 * The park is a bit of a work-around, without it we get
-		 * warning spews on shutdown with SQPOLL set and affinity
-		 * set to a single CPU.
-		 */
-		kthread_park(ctx->sqo_thread);
-		kthread_stop(ctx->sqo_thread);
-		ctx->sqo_thread = NULL;
+		if (!(ctx->flags & IORING_SETUP_SQ_AFF)) {
+			wait_for_completion(&ctx->completions[1]);
+			/*
+			 * The park is a bit of a work-around, without it we get
+			 * warning spews on shutdown with SQPOLL set and affinity
+			 * set to a single CPU.
+			 */
+			kthread_park(ctx->sqo_thread);
+			kthread_stop(ctx->sqo_thread);
+			ctx->sqo_thread = NULL;
+		} else
+			destroy_io_percpu_thread(ctx, ctx->sq_thread_cpu);
 	}
 }
 
@@ -6854,12 +7002,59 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx,
 	return ret;
 }
 
+static void create_io_percpu_thread(struct io_ring_ctx *ctx, int cpu)
+{
+	struct io_percpu_thread *t;
+
+	t = per_cpu_ptr(percpu_threads, cpu);
+	mutex_lock(&t->lock);
+	if (!t->sqo_thread) {
+		t->sqo_thread = kthread_create_on_cpu(io_sq_percpu_thread, t,
+					cpu, "io_uring_percpu-sq");
+		if (IS_ERR(t->sqo_thread)) {
+			ctx->sqo_thread = t->sqo_thread;
+			t->sqo_thread = NULL;
+			mutex_unlock(&t->lock);
+			return;
+		}
+	}
+
+	if (t->sq_thread_idle < ctx->sq_thread_idle)
+		t->sq_thread_idle = ctx->sq_thread_idle;
+	ctx->sqo_wait = &t->sqo_percpu_wait;
+	ctx->sq_thread_cpu = cpu;
+	list_add_tail(&ctx->node, &t->ctx_list);
+	ctx->sqo_thread = t->sqo_thread;
+	mutex_unlock(&t->lock);
+}
+
+static void destroy_io_percpu_thread(struct io_ring_ctx *ctx, int cpu)
+{
+	struct io_percpu_thread *t;
+	struct task_struct *sqo_thread = NULL;
+
+	t = per_cpu_ptr(percpu_threads, cpu);
+	mutex_lock(&t->lock);
+	list_del(&ctx->node);
+	if (list_empty(&t->ctx_list)) {
+		sqo_thread = t->sqo_thread;
+		t->sqo_thread = NULL;
+	}
+	mutex_unlock(&t->lock);
+
+	if (sqo_thread) {
+		kthread_park(sqo_thread);
+		kthread_stop(sqo_thread);
+	}
+}
+
 static int io_sq_offload_start(struct io_ring_ctx *ctx,
 			       struct io_uring_params *p)
 {
 	int ret;
 
-	init_waitqueue_head(&ctx->sqo_wait);
+	init_waitqueue_head(&ctx->__sqo_wait);
+	ctx->sqo_wait = &ctx->__sqo_wait;
 	mmgrab(current->mm);
 	ctx->sqo_mm = current->mm;
 
@@ -6881,9 +7076,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
 			if (!cpu_online(cpu))
 				goto err;
 
-			ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
-							ctx, cpu,
-							"io_uring-sq");
+			create_io_percpu_thread(ctx, cpu);
 		} else {
 			ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
 							"io_uring-sq");
@@ -7531,7 +7724,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 		if (!list_empty_careful(&ctx->cq_overflow_list))
 			io_cqring_overflow_flush(ctx, false);
 		if (flags & IORING_ENTER_SQ_WAKEUP)
-			wake_up(&ctx->sqo_wait);
+			wake_up(ctx->sqo_wait);
 		submitted = to_submit;
 	} else if (to_submit) {
 		mutex_lock(&ctx->uring_lock);
@@ -8117,6 +8310,8 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
 
 static int __init io_uring_init(void)
 {
+	int cpu;
+
 #define __BUILD_BUG_VERIFY_ELEMENT(stype, eoffset, etype, ename) do { \
 	BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
 	BUILD_BUG_ON(sizeof(etype) != sizeof_field(stype, ename)); \
@@ -8156,6 +8351,18 @@ static int __init io_uring_init(void)
 	BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
 	BUILD_BUG_ON(__REQ_F_LAST_BIT >= 8 * sizeof(int));
 	req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
+
+	percpu_threads = alloc_percpu(struct io_percpu_thread);
+	for_each_possible_cpu(cpu) {
+		struct io_percpu_thread *t;
+
+		t = per_cpu_ptr(percpu_threads, cpu);
+		INIT_LIST_HEAD(&t->ctx_list);
+		init_waitqueue_head(&t->sqo_percpu_wait);
+		mutex_init(&t->lock);
+		t->sqo_thread = NULL;
+		t->sq_thread_idle = 0;
+	}
 	return 0;
 };
 __initcall(io_uring_init);
-- 
2.17.2




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux