Re: [ISSUE] The time cost of IOSQE_IO_LINK

On 2/14/20 6:27 PM, Jens Axboe wrote:
> On 2/14/20 6:25 PM, Carter Li 李通洲 wrote:
>> There are at least 2 benefits over POLL->READ
>>
>> 1. It reduces the complexity of user code a bit, and saves lots of sqes.
>> 2. Better performance. Users can’t tell if an operation will block without
>> issuing an extra O_NONBLOCK syscall, which ends up with always using the
>> POLL->READ link. If it’s handled by the kernel, we may only poll when
>> we know it’s needed.
> 
> Exactly, it'll enable the app to do read/recv or write/send without
> having to worry about anything, and it'll be as efficient as having
> it linked to a poll command.

Couldn't help myself... The below is the general direction, but it's not
done by any stretch of the imagination. There are a few hacks in there.
But, in short, it does allow e.g. send/recv to behave in an async manner
without needing the thread offload. Tested it with the test/send_recv
and test/send_recvmsg test apps, and it works there. There seems to be
some weird issue with e.g. test/socket-rw, not sure what that is yet.

Just wanted to throw it out there. It's essentially the same as the
linked poll, except it's just done internally.
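
In user-code terms, the difference is roughly the below (a minimal
liburing-style sketch, not part of the patch; it assumes an initialized
ring and a connected socket fd):

#include <liburing.h>
#include <poll.h>

/* Today: arm a poll and link the recv behind it. Costs two sqes. */
static void submit_linked_poll_recv(struct io_uring *ring, int sockfd,
				    void *buf, size_t len)
{
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_poll_add(sqe, sockfd, POLLIN);
	sqe->flags |= IOSQE_IO_LINK;

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_recv(sqe, sockfd, buf, len, 0);

	io_uring_submit(ring);
}

/* With poll driven retry: just issue the recv. One sqe, no link. */
static void submit_plain_recv(struct io_uring *ring, int sockfd,
			      void *buf, size_t len)
{
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(ring);
	io_uring_prep_recv(sqe, sockfd, buf, len, 0);

	io_uring_submit(ring);
}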


commit d1fc97c5f132eaf39c06783925bd11dca9fa3ecd
Author: Jens Axboe <axboe@xxxxxxxxx>
Date:   Fri Feb 14 22:23:12 2020 -0700

    io_uring: use poll driven retry for files that support it
    
    Currently io_uring tries any request in a non-blocking manner, if it can,
    and then retries from a worker thread if that returns -EAGAIN. Now that
    we have a new and fancy poll-based retry backend, use that to retry
    requests if the file supports it.
    
    This means that, for example, an IORING_OP_RECVMSG on a socket no longer
    requires an async thread to complete the IO. If we get -EAGAIN reading
    from the socket in a non-blocking manner, we arm a poll handler for
    notification of when the socket becomes readable. When it does, the
    pending read is executed directly by the task again, through the io_uring
    scheduler handlers.
    
    Note that this is very much a work-in-progress, and it doesn't (yet) pass
    the full test suite. Notable missing features:
    
    - With the work queued up async, we have a common method for locating it
      for cancelation purposes. I still need to add matching cancel tracking
      for poll-managed requests.
    
    - Need to double check the req->apoll lifetime.
    
    - Probably a lot I don't quite recall right now...
    
    It does work for the basic read/write, send/recv, etc. testing I've
    tried.
    
    Signed-off-by: Jens Axboe <axboe@xxxxxxxxx>
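
Conceptually it's the same dance an app would do by hand with a
nonblocking socket and poll(2): try the op, and only wait for readiness
if it would have blocked. A userspace sketch of that flow, just for
illustration (not part of the patch):

#include <errno.h>
#include <poll.h>
#include <sys/socket.h>

static ssize_t recv_poll_retry(int sockfd, void *buf, size_t len)
{
	for (;;) {
		ssize_t ret = recv(sockfd, buf, len, MSG_DONTWAIT);

		/* Got data, EOF, or a hard error: we're done */
		if (ret >= 0 || errno != EAGAIN)
			return ret;

		/* Would have blocked: wait for POLLIN, then retry */
		struct pollfd pfd = { .fd = sockfd, .events = POLLIN };

		if (poll(&pfd, 1, -1) < 0)
			return -1;
	}
}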

diff --git a/fs/io_uring.c b/fs/io_uring.c
index fb94b8bac638..530dcd91fa53 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -577,6 +577,7 @@ struct io_kiocb {
 		struct {
 			struct task_struct	*task;
 			struct list_head	task_list;
+			struct io_poll_iocb	*apoll;
 		};
 		struct io_wq_work	work;
 	};
@@ -623,6 +624,9 @@ struct io_op_def {
 	unsigned		file_table : 1;
 	/* needs ->fs */
 	unsigned		needs_fs : 1;
+	/* set if opcode supports polled "wait" */
+	unsigned		pollin : 1;
+	unsigned		pollout : 1;
 };
 
 static const struct io_op_def io_op_defs[] = {
@@ -632,6 +636,7 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollin			= 1,
 	},
 	[IORING_OP_WRITEV] = {
 		.async_ctx		= 1,
@@ -639,6 +644,7 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_file		= 1,
 		.hash_reg_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollout		= 1,
 	},
 	[IORING_OP_FSYNC] = {
 		.needs_file		= 1,
@@ -646,11 +652,13 @@ static const struct io_op_def io_op_defs[] = {
 	[IORING_OP_READ_FIXED] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollin			= 1,
 	},
 	[IORING_OP_WRITE_FIXED] = {
 		.needs_file		= 1,
 		.hash_reg_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollout		= 1,
 	},
 	[IORING_OP_POLL_ADD] = {
 		.needs_file		= 1,
@@ -666,6 +674,7 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.needs_fs		= 1,
+		.pollout		= 1,
 	},
 	[IORING_OP_RECVMSG] = {
 		.async_ctx		= 1,
@@ -673,6 +682,7 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.needs_fs		= 1,
+		.pollin			= 1,
 	},
 	[IORING_OP_TIMEOUT] = {
 		.async_ctx		= 1,
@@ -684,6 +694,7 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
 		.file_table		= 1,
+		.pollin			= 1,
 	},
 	[IORING_OP_ASYNC_CANCEL] = {},
 	[IORING_OP_LINK_TIMEOUT] = {
@@ -695,6 +706,7 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollin			= 1,
 	},
 	[IORING_OP_FALLOCATE] = {
 		.needs_file		= 1,
@@ -723,11 +735,13 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollin			= 1,
 	},
 	[IORING_OP_WRITE] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollout		= 1,
 	},
 	[IORING_OP_FADVISE] = {
 		.needs_file		= 1,
@@ -739,11 +753,13 @@ static const struct io_op_def io_op_defs[] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollout		= 1,
 	},
 	[IORING_OP_RECV] = {
 		.needs_mm		= 1,
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
+		.pollin			= 1,
 	},
 	[IORING_OP_OPENAT2] = {
 		.needs_file		= 1,
@@ -2228,6 +2244,139 @@ static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	return 0;
 }
 
+struct io_poll_table {
+	struct poll_table_struct pt;
+	struct io_kiocb *req;
+	int error;
+};
+
+static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt,
+			    struct wait_queue_head *head)
+{
+	if (unlikely(poll->head)) {
+		pt->error = -EINVAL;
+		return;
+	}
+
+	pt->error = 0;
+	poll->head = head;
+	add_wait_queue(head, &poll->wait);
+}
+
+static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
+			       struct poll_table_struct *p)
+{
+	struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
+
+	__io_queue_proc(pt->req->apoll, pt, head);
+}
+
+static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
+			   __poll_t mask)
+{
+	struct task_struct *tsk;
+	unsigned long flags;
+
+	/* for instances that support it check for an event match first: */
+	if (mask && !(mask & poll->events))
+		return 0;
+
+	list_del_init(&poll->wait.entry);
+
+	tsk = req->task;
+	req->result = mask;
+	spin_lock_irqsave(&tsk->uring_lock, flags);
+	list_add_tail(&req->task_list, &tsk->uring_work);
+	spin_unlock_irqrestore(&tsk->uring_lock, flags);
+	wake_up_process(tsk);
+	return 1;
+}
+
+static int io_async_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
+			void *key)
+{
+	struct io_kiocb *req = wait->private;
+	struct io_poll_iocb *poll = req->apoll;
+
+	trace_io_uring_poll_wake(req->ctx, req->opcode, req->user_data, key_to_poll(key));
+	return __io_async_wake(req, poll, key_to_poll(key));
+}
+
+static bool io_arm_poll_handler(struct io_kiocb *req, int *retry_count)
+{
+	const struct io_op_def *def = &io_op_defs[req->opcode];
+	struct io_ring_ctx *ctx = req->ctx;
+	struct io_poll_iocb *poll;
+	struct io_poll_table ipt;
+	bool cancel = false;
+	__poll_t mask = 0;
+
+	if (!file_can_poll(req->file))
+		return false;
+	if (req->flags & REQ_F_MUST_PUNT)
+		return false;
+	if (!def->pollin && !def->pollout)
+		return false;
+
+	poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
+	if (unlikely(!poll))
+		return false;
+
+	req->task = current;
+	INIT_LIST_HEAD(&req->task_list);
+	req->apoll = poll;
+
+	if (def->pollin)
+		mask = POLLIN;
+	if (def->pollout)
+		mask |= POLLOUT;
+
+	poll->file = req->file;
+	poll->head = NULL;
+	poll->done = poll->canceled = false;
+	poll->events = mask;
+	ipt.pt._qproc = io_async_queue_proc;
+	ipt.pt._key = mask;
+	ipt.req = req;
+	ipt.error = -EINVAL;
+
+	INIT_LIST_HEAD(&poll->wait.entry);
+	init_waitqueue_func_entry(&poll->wait, io_async_wake);
+	poll->wait.private = req;
+
+	mask = vfs_poll(req->file, &ipt.pt) & poll->events;
+
+	spin_lock_irq(&ctx->completion_lock);
+	if (likely(poll->head)) {
+		spin_lock(&poll->head->lock);
+		if (unlikely(list_empty(&poll->wait.entry))) {
+			if (ipt.error)
+				cancel = true;
+			ipt.error = 0;
+			mask = 0;
+		}
+		if (mask || ipt.error)
+			list_del_init(&poll->wait.entry);
+		else if (cancel)
+			WRITE_ONCE(poll->canceled, true);
+#if 0
+		Needs tracking for cancelation
+		else if (!poll->done) /* actually waiting for an event */
+			io_poll_req_insert(req);
+#endif
+		spin_unlock(&poll->head->lock);
+	}
+	if (mask) {
+		ipt.error = 0;
+		poll->done = true;
+		(*retry_count)++;
+	}
+	spin_unlock_irq(&ctx->completion_lock);
+	trace_io_uring_poll_arm(ctx, req->opcode, req->user_data, mask,
+					poll->events);
+	return true;
+}
+
 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
 		   bool force_nonblock)
 {
@@ -3569,44 +3718,16 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 {
 	struct io_poll_iocb *poll = wait->private;
 	struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
-	__poll_t mask = key_to_poll(key);
-	struct task_struct *tsk;
-	unsigned long flags;
-
-	/* for instances that support it check for an event match first: */
-	if (mask && !(mask & poll->events))
-		return 0;
-
-	list_del_init(&poll->wait.entry);
 
-	tsk = req->task;
-	req->result = mask;
-	spin_lock_irqsave(&tsk->uring_lock, flags);
-	list_add_tail(&req->task_list, &tsk->uring_work);
-	spin_unlock_irqrestore(&tsk->uring_lock, flags);
-	wake_up_process(tsk);
-	return 1;
+	return __io_async_wake(req, poll, key_to_poll(key));
 }
 
-struct io_poll_table {
-	struct poll_table_struct pt;
-	struct io_kiocb *req;
-	int error;
-};
-
 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
 			       struct poll_table_struct *p)
 {
 	struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
 
-	if (unlikely(pt->req->poll.head)) {
-		pt->error = -EINVAL;
-		return;
-	}
-
-	pt->error = 0;
-	pt->req->poll.head = head;
-	add_wait_queue(head, &pt->req->poll.wait);
+	__io_queue_proc(&pt->req->poll, pt, head);
 }
 
 static void io_poll_req_insert(struct io_kiocb *req)
@@ -4617,11 +4738,13 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_kiocb *linked_timeout;
 	struct io_kiocb *nxt = NULL;
+	int retry_count = 0;
 	int ret;
 
 again:
 	linked_timeout = io_prep_linked_timeout(req);
 
+issue:
 	ret = io_issue_sqe(req, sqe, &nxt, true);
 
 	/*
@@ -4630,6 +4753,14 @@ static void __io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	 */
 	if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
 	    (req->flags & REQ_F_MUST_PUNT))) {
+
+		if (io_arm_poll_handler(req, &retry_count)) {
+			if (retry_count == 1)
+				goto issue;
+			else if (!retry_count)
+				goto done_req;
+			INIT_IO_WORK(&req->work, io_wq_submit_work);
+		}
 punt:
 		if (io_op_defs[req->opcode].file_table) {
 			ret = io_grab_files(req);
@@ -5154,26 +5285,40 @@ void io_uring_task_handler(struct task_struct *tsk)
 {
 	LIST_HEAD(local_list);
 	struct io_kiocb *req;
+	long state;
 
 	spin_lock_irq(&tsk->uring_lock);
 	if (!list_empty(&tsk->uring_work))
 		list_splice_init(&tsk->uring_work, &local_list);
 	spin_unlock_irq(&tsk->uring_lock);
 
+	state = current->state;
+	__set_current_state(TASK_RUNNING);
 	while (!list_empty(&local_list)) {
 		struct io_kiocb *nxt = NULL;
+		void *to_free = NULL;
 
 		req = list_first_entry(&local_list, struct io_kiocb, task_list);
 		list_del(&req->task_list);
 
-		io_poll_task_handler(req, &nxt);
+		if (req->opcode == IORING_OP_POLL_ADD) {
+			io_poll_task_handler(req, &nxt);
+		} else {
+			nxt = req;
+			to_free = req->apoll;
+			WARN_ON_ONCE(!list_empty(&req->apoll->wait.entry));
+		}
 		if (nxt)
 			__io_queue_sqe(nxt, NULL);
 
+		kfree(to_free);
+
 		/* finish next time, if we're out of time slice */
-		if (need_resched())
+		if (need_resched() && !(current->flags & PF_EXITING))
 			break;
 	}
+	WARN_ON_ONCE(current->state != TASK_RUNNING);
+	__set_current_state(state);
 
 	if (!list_empty(&local_list)) {
 		spin_lock_irq(&tsk->uring_lock);
diff --git a/include/trace/events/io_uring.h b/include/trace/events/io_uring.h
index 27bd9e4f927b..133a16472423 100644
--- a/include/trace/events/io_uring.h
+++ b/include/trace/events/io_uring.h
@@ -357,6 +357,60 @@ TRACE_EVENT(io_uring_submit_sqe,
 			  __entry->force_nonblock, __entry->sq_thread)
 );
 
+TRACE_EVENT(io_uring_poll_arm,
+
+	TP_PROTO(void *ctx, u8 opcode, u64 user_data, int mask, int events),
+
+	TP_ARGS(ctx, opcode, user_data, mask, events),
+
+	TP_STRUCT__entry (
+		__field(  void *,	ctx		)
+		__field(  u8,		opcode		)
+		__field(  u64,		user_data	)
+		__field(  int,		mask		)
+		__field(  int,		events		)
+	),
+
+	TP_fast_assign(
+		__entry->ctx		= ctx;
+		__entry->opcode		= opcode;
+		__entry->user_data	= user_data;
+		__entry->mask		= mask;
+		__entry->events		= events;
+	),
+
+	TP_printk("ring %p, op %d, data 0x%llx, mask 0x%x, events 0x%x",
+			  __entry->ctx, __entry->opcode,
+			  (unsigned long long) __entry->user_data,
+			  __entry->mask, __entry->events)
+);
+
+TRACE_EVENT(io_uring_poll_wake,
+
+	TP_PROTO(void *ctx, u8 opcode, u64 user_data, int mask),
+
+	TP_ARGS(ctx, opcode, user_data, mask),
+
+	TP_STRUCT__entry (
+		__field(  void *,	ctx		)
+		__field(  u8,		opcode		)
+		__field(  u64,		user_data	)
+		__field(  int,		mask		)
+	),
+
+	TP_fast_assign(
+		__entry->ctx		= ctx;
+		__entry->opcode		= opcode;
+		__entry->user_data	= user_data;
+		__entry->mask		= mask;
+	),
+
+	TP_printk("ring %p, op %d, data 0x%llx, mask 0x%x",
+			  __entry->ctx, __entry->opcode,
+			  (unsigned long long) __entry->user_data,
+			  __entry->mask)
+);
+
 #endif /* _TRACE_IO_URING_H */
 
 /* This part must be outside protection */
diff --git a/kernel/exit.c b/kernel/exit.c
index 2833ffb0c211..988799763f34 100644
--- a/kernel/exit.c
+++ b/kernel/exit.c
@@ -716,6 +716,7 @@ void __noreturn do_exit(long code)
 	profile_task_exit(tsk);
 	kcov_task_exit(tsk);
 
+	WARN_ON(!list_empty(&tsk->uring_work));
 	WARN_ON(blk_needs_flush_plug(tsk));
 
 	if (unlikely(in_interrupt()))

-- 
Jens Axboe