Re: [PATCH 12/12] io_uring: add support for ring mapped supplied buffers

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 5/1/22 6:25 AM, Jens Axboe wrote:
>>> @@ -1564,21 +1585,30 @@ static inline void
>>> io_req_set_rsrc_node(struct io_kiocb *req,
>>>  
>>>  static unsigned int __io_put_kbuf(struct io_kiocb *req, struct
>>> list_head *list)
>>>  {
>>> -       struct io_buffer *kbuf = req->kbuf;
>>>         unsigned int cflags;
>>>  
>>> -       cflags = IORING_CQE_F_BUFFER | (kbuf->bid <<
>>> IORING_CQE_BUFFER_SHIFT);
>>> -       req->flags &= ~REQ_F_BUFFER_SELECTED;
>>> -       list_add(&kbuf->list, list);
>>> -       req->kbuf = NULL;
>>> -       return cflags;
>>> +       if (req->flags & REQ_F_BUFFER_RING) {
>>> +               if (req->buf_list)
>>> +                       req->buf_list->tail++;
>>
>> does this need locking? both on buf_list being available or atomic
>> increment on tail.
> 
> This needs some comments and checks around the expectation. But the idea
> is that the fast path will invoke eg the recv with the uring_lock
> already held, and we'll hold it until we complete it.
> 
> Basically we have two cases:
> 
> 1) Op invoked with uring_lock held. Either the request completes
>    successfully in this invocation, and we put the kbuf with it still
>    held. The completion just increments the tail, buf now consumed. Or
>    we need to retry somehow, and we can just clear REQ_F_BUFFER_RING to
>    recycle the buf, that's it.
> 
> 2) Op invoked without uring_lock held. We get a buf and increment the
>    tail, as we'd otherwise need to grab it again for the completion.
>    We're now stuck with the buf, hold it until the request completes.
> 
> #1 is the above code, just need some checks and safe guards to ensure
> that if ->buf_list is still set, we are still holding the lock.

Here's a debug patch that does a lock sequence check for it. Totally
untested, but it'd be able to catch a violation of the above.

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 28654864201e..51e3536befe2 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -389,6 +389,7 @@ struct io_ring_ctx {
 	/* submission data */
 	struct {
 		struct mutex		uring_lock;
+		int			__lock_seq;
 
 		/*
 		 * Ring buffer of indices into array of io_uring_sqe, which is
@@ -1004,6 +1005,7 @@ struct io_kiocb {
 		 struct {
 			 struct io_buffer_list	*buf_list;
 			 __u32			bid;
+			 int			buf_lock_seq;
 		 };
 	};
 
@@ -1443,11 +1445,32 @@ static inline bool io_file_need_scm(struct file *filp)
 }
 #endif
 
+static inline void ctx_lock(struct io_ring_ctx *ctx)
+{
+	mutex_lock(&ctx->uring_lock);
+	ctx->__lock_seq++;
+}
+
+static inline bool ctx_trylock(struct io_ring_ctx *ctx)
+{
+	if (mutex_trylock(&ctx->uring_lock)) {
+		ctx->__lock_seq++;
+		return true;
+	}
+	return false;
+}
+
+static inline void ctx_unlock(struct io_ring_ctx *ctx)
+{
+	ctx->__lock_seq++;
+	mutex_unlock(&ctx->uring_lock);
+}
+
 static void io_ring_submit_unlock(struct io_ring_ctx *ctx, unsigned issue_flags)
 {
 	lockdep_assert_held(&ctx->uring_lock);
 	if (issue_flags & IO_URING_F_UNLOCKED)
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 }
 
 static void io_ring_submit_lock(struct io_ring_ctx *ctx, unsigned issue_flags)
@@ -1459,14 +1482,14 @@ static void io_ring_submit_lock(struct io_ring_ctx *ctx, unsigned issue_flags)
 	 * from an async worker thread, grab the lock for that case.
 	 */
 	if (issue_flags & IO_URING_F_UNLOCKED)
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 	lockdep_assert_held(&ctx->uring_lock);
 }
 
 static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
 {
 	if (!*locked) {
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		*locked = true;
 	}
 }
@@ -1588,8 +1611,10 @@ static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list)
 	unsigned int cflags;
 
 	if (req->flags & REQ_F_BUFFER_RING) {
-		if (req->buf_list)
+		if (req->buf_list) {
+			WARN_ON_ONCE(req->buf_lock_seq != req->ctx->__lock_seq);
 			req->buf_list->tail++;
+		}
 
 		cflags = req->bid << IORING_CQE_BUFFER_SHIFT;
 		req->flags &= ~REQ_F_BUFFER_RING;
@@ -1777,7 +1802,7 @@ static __cold void io_fallback_req_func(struct work_struct *work)
 
 	if (locked) {
 		io_submit_flush_completions(ctx);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 	percpu_ref_put(&ctx->refs);
 }
@@ -2237,10 +2262,10 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
 	if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
 		/* iopoll syncs against uring_lock, not completion_lock */
 		if (ctx->flags & IORING_SETUP_IOPOLL)
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 		ret = __io_cqring_overflow_flush(ctx, false);
 		if (ctx->flags & IORING_SETUP_IOPOLL)
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 	}
 
 	return ret;
@@ -2698,7 +2723,7 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
 		return;
 	if (*locked) {
 		io_submit_flush_completions(ctx);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 		*locked = false;
 	}
 	percpu_ref_put(&ctx->refs);
@@ -2731,7 +2756,7 @@ static void handle_prev_tw_list(struct io_wq_work_node *node,
 			ctx_flush_and_put(*ctx, uring_locked);
 			*ctx = req->ctx;
 			/* if not contended, grab and improve batching */
-			*uring_locked = mutex_trylock(&(*ctx)->uring_lock);
+			*uring_locked = ctx_trylock(*ctx);
 			percpu_ref_get(&(*ctx)->refs);
 			if (unlikely(!*uring_locked))
 				spin_lock(&(*ctx)->completion_lock);
@@ -2762,7 +2787,7 @@ static void handle_tw_list(struct io_wq_work_node *node,
 			ctx_flush_and_put(*ctx, locked);
 			*ctx = req->ctx;
 			/* if not contended, grab and improve batching */
-			*locked = mutex_trylock(&(*ctx)->uring_lock);
+			*locked = ctx_trylock(*ctx);
 			percpu_ref_get(&(*ctx)->refs);
 		}
 		req->io_task_work.func(req, locked);
@@ -3126,7 +3151,7 @@ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
 	if (!(ctx->flags & IORING_SETUP_IOPOLL))
 		return;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	while (!wq_list_empty(&ctx->iopoll_list)) {
 		/* let it sleep and repeat later if can't complete a request */
 		if (io_do_iopoll(ctx, true) == 0)
@@ -3137,12 +3162,12 @@ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
 		 * Also let task_work, etc. to progress by releasing the mutex
 		 */
 		if (need_resched()) {
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			cond_resched();
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 		}
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 }
 
 static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
@@ -3183,9 +3208,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
 		if (wq_list_empty(&ctx->iopoll_list)) {
 			u32 tail = ctx->cached_cq_tail;
 
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			io_run_task_work();
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 
 			/* some requests don't go through iopoll_list */
 			if (tail != ctx->cached_cq_tail ||
@@ -3347,7 +3372,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
 
 	/* workqueue context doesn't hold uring_lock, grab it now */
 	if (unlikely(needs_lock))
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 
 	/*
 	 * Track whether we have multiple files in our lists. This will impact
@@ -3385,7 +3410,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
 		    wq_has_sleeper(&ctx->sq_data->wait))
 			wake_up(&ctx->sq_data->wait);
 
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 }
 
@@ -3674,8 +3699,10 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
 	req->buf_list = bl;
 	req->bid = buf->bid;
 
-	if (!(issue_flags & IO_URING_F_UNLOCKED))
+	if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+		req->buf_lock_seq = req->ctx->__lock_seq;
 		return u64_to_user_ptr(buf->addr);
+	}
 
 	/*
 	 * If we came in unlocked, we have no choice but to
@@ -8740,7 +8767,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
 		if (ctx->sq_creds != current_cred())
 			creds = override_creds(ctx->sq_creds);
 
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		if (!wq_list_empty(&ctx->iopoll_list))
 			io_do_iopoll(ctx, true);
 
@@ -8751,7 +8778,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
 		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
 		    !(ctx->flags & IORING_SETUP_R_DISABLED))
 			ret = io_submit_sqes(ctx, to_submit);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 
 		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
 			wake_up(&ctx->sqo_sq_wait);
@@ -9165,17 +9192,17 @@ static __cold int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 		/* kill initial ref, already quiesced if zero */
 		if (atomic_dec_and_test(&data->refs))
 			break;
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 		flush_delayed_work(&ctx->rsrc_put_work);
 		ret = wait_for_completion_interruptible(&data->done);
 		if (!ret) {
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 			if (atomic_read(&data->refs) > 0) {
 				/*
 				 * it has been revived by another thread while
 				 * we were unlocked
 				 */
-				mutex_unlock(&ctx->uring_lock);
+				ctx_unlock(ctx);
 			} else {
 				break;
 			}
@@ -9187,7 +9214,7 @@ static __cold int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 		reinit_completion(&data->done);
 
 		ret = io_run_task_work_sig();
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 	} while (ret >= 0);
 	data->quiesce = false;
 
@@ -9572,7 +9599,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 
 		if (prsrc->tag) {
 			if (ctx->flags & IORING_SETUP_IOPOLL)
-				mutex_lock(&ctx->uring_lock);
+				ctx_lock(ctx);
 
 			spin_lock(&ctx->completion_lock);
 			io_fill_cqe_aux(ctx, prsrc->tag, 0, 0);
@@ -9581,7 +9608,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 			io_cqring_ev_posted(ctx);
 
 			if (ctx->flags & IORING_SETUP_IOPOLL)
-				mutex_unlock(&ctx->uring_lock);
+				ctx_unlock(ctx);
 		}
 
 		rsrc_data->do_put(ctx, prsrc);
@@ -9879,19 +9906,19 @@ static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx,
 	struct io_wq_data data;
 	unsigned int concurrency;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	hash = ctx->hash_map;
 	if (!hash) {
 		hash = kzalloc(sizeof(*hash), GFP_KERNEL);
 		if (!hash) {
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			return ERR_PTR(-ENOMEM);
 		}
 		refcount_set(&hash->refs, 1);
 		init_waitqueue_head(&hash->wait);
 		ctx->hash_map = hash;
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 
 	data.hash = hash;
 	data.task = task;
@@ -10642,7 +10669,7 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
 	struct io_submit_state *state = &ctx->submit_state;
 	int nr = 0;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	io_flush_cached_locked_reqs(ctx, state);
 
 	while (!io_req_cache_empty(ctx)) {
@@ -10656,7 +10683,7 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
 	}
 	if (nr)
 		percpu_ref_put_many(&ctx->refs, nr);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 }
 
 static void io_wait_rsrc_data(struct io_rsrc_data *data)
@@ -10691,7 +10718,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_wait_rsrc_data(ctx->buf_data);
 	io_wait_rsrc_data(ctx->file_data);
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	if (ctx->buf_data)
 		__io_sqe_buffers_unregister(ctx);
 	if (ctx->file_data)
@@ -10700,7 +10727,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 		__io_cqring_overflow_flush(ctx, true);
 	io_eventfd_unregister(ctx);
 	io_flush_apoll_cache(ctx);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	io_destroy_buffers(ctx);
 	if (ctx->sq_creds)
 		put_cred(ctx->sq_creds);
@@ -10859,7 +10886,7 @@ static __cold void io_ring_exit_work(struct work_struct *work)
 	 * completion_lock, see io_req_task_submit(). Apart from other work,
 	 * this lock/unlock section also waits them to finish.
 	 */
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	while (!list_empty(&ctx->tctx_list)) {
 		WARN_ON_ONCE(time_after(jiffies, timeout));
 
@@ -10871,11 +10898,11 @@ static __cold void io_ring_exit_work(struct work_struct *work)
 		if (WARN_ON_ONCE(ret))
 			continue;
 
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 		wait_for_completion(&exit.completion);
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	spin_lock(&ctx->completion_lock);
 	spin_unlock(&ctx->completion_lock);
 
@@ -10910,13 +10937,13 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	unsigned long index;
 	struct creds *creds;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	percpu_ref_kill(&ctx->refs);
 	if (ctx->rings)
 		__io_cqring_overflow_flush(ctx, true);
 	xa_for_each(&ctx->personalities, index, creds)
 		io_unregister_personality(ctx, index);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 
 	/* failed during ring init, it couldn't have issued any requests */
 	if (ctx->rings) {
@@ -10991,7 +11018,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
 	enum io_wq_cancel cret;
 	bool ret = false;
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
 		struct io_uring_task *tctx = node->task->io_uring;
 
@@ -11004,7 +11031,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
 		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
 		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
 	}
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 
 	return ret;
 }
@@ -11091,9 +11118,9 @@ static int __io_uring_add_tctx_node(struct io_ring_ctx *ctx)
 			return ret;
 		}
 
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		list_add(&node->ctx_node, &ctx->tctx_list);
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 	tctx->last = ctx;
 	return 0;
@@ -11128,9 +11155,9 @@ static __cold void io_uring_del_tctx_node(unsigned long index)
 	WARN_ON_ONCE(current != node->task);
 	WARN_ON_ONCE(list_empty(&node->ctx_node));
 
-	mutex_lock(&node->ctx->uring_lock);
+	ctx_lock(node->ctx);
 	list_del(&node->ctx_node);
-	mutex_unlock(&node->ctx->uring_lock);
+	ctx_unlock(node->ctx);
 
 	if (tctx->last == node->ctx)
 		tctx->last = NULL;
@@ -11296,9 +11323,9 @@ static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
 	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
 		return -EINVAL;
 
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	ret = io_uring_add_tctx_node(ctx);
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	if (ret)
 		return ret;
 
@@ -11583,15 +11610,15 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 		if (unlikely(ret))
 			goto out;
 
-		mutex_lock(&ctx->uring_lock);
+		ctx_lock(ctx);
 		ret = io_submit_sqes(ctx, to_submit);
 		if (ret != to_submit) {
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			goto out;
 		}
 		if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll)
 			goto iopoll_locked;
-		mutex_unlock(&ctx->uring_lock);
+		ctx_unlock(ctx);
 	}
 	if (flags & IORING_ENTER_GETEVENTS) {
 		int ret2;
@@ -11602,7 +11629,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 			 * prevent racing with polled issue that got punted to
 			 * a workqueue.
 			 */
-			mutex_lock(&ctx->uring_lock);
+			ctx_lock(ctx);
 iopoll_locked:
 			ret2 = io_validate_ext_arg(flags, argp, argsz);
 			if (likely(!ret2)) {
@@ -11610,7 +11637,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 						   ctx->cq_entries);
 				ret2 = io_iopoll_check(ctx, min_complete);
 			}
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 		} else {
 			const sigset_t __user *sig;
 			struct __kernel_timespec __user *ts;
@@ -12372,9 +12399,9 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
 			 * a ref to the ctx.
 			 */
 			refcount_inc(&sqd->refs);
-			mutex_unlock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			mutex_lock(&sqd->lock);
-			mutex_lock(&ctx->uring_lock);
+			ctx_unlock(ctx);
 			if (sqd->thread)
 				tctx = sqd->thread->io_uring;
 		}
@@ -12668,9 +12695,9 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
 
 	io_run_task_work();
 
-	mutex_lock(&ctx->uring_lock);
+	ctx_lock(ctx);
 	ret = __io_uring_register(ctx, opcode, arg, nr_args);
-	mutex_unlock(&ctx->uring_lock);
+	ctx_unlock(ctx);
 	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
 out_fput:
 	fdput(f);

-- 
Jens Axboe




[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux