On 5/1/22 6:25 AM, Jens Axboe wrote: >>> @@ -1564,21 +1585,30 @@ static inline void >>> io_req_set_rsrc_node(struct io_kiocb *req, >>> >>> static unsigned int __io_put_kbuf(struct io_kiocb *req, struct >>> list_head *list) >>> { >>> - struct io_buffer *kbuf = req->kbuf; >>> unsigned int cflags; >>> >>> - cflags = IORING_CQE_F_BUFFER | (kbuf->bid << >>> IORING_CQE_BUFFER_SHIFT); >>> - req->flags &= ~REQ_F_BUFFER_SELECTED; >>> - list_add(&kbuf->list, list); >>> - req->kbuf = NULL; >>> - return cflags; >>> + if (req->flags & REQ_F_BUFFER_RING) { >>> + if (req->buf_list) >>> + req->buf_list->tail++; >> >> does this need locking? both on buf_list being available or atomic >> increment on tail. > > This needs some comments and checks around the expectation. But the idea > is that the fast path will invoke e.g. the recv with the uring_lock > already held, and we'll hold it until we complete it. > > Basically we have two cases: > > 1) Op invoked with uring_lock held. Either the request completes > successfully in this invocation, and we put the kbuf with it still > held. The completion just increments the tail, buf now consumed. Or > we need to retry somehow, and we can just clear REQ_F_BUFFER_RING to > recycle the buf, that's it. > > 2) Op invoked without uring_lock held. We get a buf and increment the > tail, as we'd otherwise need to grab it again for the completion. > We're now stuck with the buf, hold it until the request completes. > > #1 is the above code, just need some checks and safeguards to ensure > that if ->buf_list is still set, we are still holding the lock. Here's a debug patch that does a lock sequence check for it. Totally untested, but it'd be able to catch a violation of the above. 
diff --git a/fs/io_uring.c b/fs/io_uring.c index 28654864201e..51e3536befe2 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -389,6 +389,7 @@ struct io_ring_ctx { /* submission data */ struct { struct mutex uring_lock; + int __lock_seq; /* * Ring buffer of indices into array of io_uring_sqe, which is @@ -1004,6 +1005,7 @@ struct io_kiocb { struct { struct io_buffer_list *buf_list; __u32 bid; + int buf_lock_seq; }; }; @@ -1443,11 +1445,32 @@ static inline bool io_file_need_scm(struct file *filp) } #endif +static inline void ctx_lock(struct io_ring_ctx *ctx) +{ + mutex_lock(&ctx->uring_lock); + ctx->__lock_seq++; +} + +static inline bool ctx_trylock(struct io_ring_ctx *ctx) +{ + if (mutex_trylock(&ctx->uring_lock)) { + ctx->__lock_seq++; + return true; + } + return false; +} + +static inline void ctx_unlock(struct io_ring_ctx *ctx) +{ + ctx->__lock_seq++; + mutex_unlock(&ctx->uring_lock); +} + static void io_ring_submit_unlock(struct io_ring_ctx *ctx, unsigned issue_flags) { lockdep_assert_held(&ctx->uring_lock); if (issue_flags & IO_URING_F_UNLOCKED) - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } static void io_ring_submit_lock(struct io_ring_ctx *ctx, unsigned issue_flags) @@ -1459,14 +1482,14 @@ static void io_ring_submit_lock(struct io_ring_ctx *ctx, unsigned issue_flags) * from an async worker thread, grab the lock for that case. 
*/ if (issue_flags & IO_URING_F_UNLOCKED) - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); lockdep_assert_held(&ctx->uring_lock); } static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked) { if (!*locked) { - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); *locked = true; } } @@ -1588,8 +1611,10 @@ static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list) unsigned int cflags; if (req->flags & REQ_F_BUFFER_RING) { - if (req->buf_list) + if (req->buf_list) { + WARN_ON_ONCE(req->buf_lock_seq != req->ctx->__lock_seq); req->buf_list->tail++; + } cflags = req->bid << IORING_CQE_BUFFER_SHIFT; req->flags &= ~REQ_F_BUFFER_RING; @@ -1777,7 +1802,7 @@ static __cold void io_fallback_req_func(struct work_struct *work) if (locked) { io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } percpu_ref_put(&ctx->refs); } @@ -2237,10 +2262,10 @@ static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx) if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { /* iopoll syncs against uring_lock, not completion_lock */ if (ctx->flags & IORING_SETUP_IOPOLL) - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); ret = __io_cqring_overflow_flush(ctx, false); if (ctx->flags & IORING_SETUP_IOPOLL) - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } return ret; @@ -2698,7 +2723,7 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked) return; if (*locked) { io_submit_flush_completions(ctx); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); *locked = false; } percpu_ref_put(&ctx->refs); @@ -2731,7 +2756,7 @@ static void handle_prev_tw_list(struct io_wq_work_node *node, ctx_flush_and_put(*ctx, uring_locked); *ctx = req->ctx; /* if not contended, grab and improve batching */ - *uring_locked = mutex_trylock(&(*ctx)->uring_lock); + *uring_locked = ctx_trylock(*ctx); percpu_ref_get(&(*ctx)->refs); if (unlikely(!*uring_locked)) spin_lock(&(*ctx)->completion_lock); @@ -2762,7 +2787,7 @@ static void 
handle_tw_list(struct io_wq_work_node *node, ctx_flush_and_put(*ctx, locked); *ctx = req->ctx; /* if not contended, grab and improve batching */ - *locked = mutex_trylock(&(*ctx)->uring_lock); + *locked = ctx_trylock(*ctx); percpu_ref_get(&(*ctx)->refs); } req->io_task_work.func(req, locked); @@ -3126,7 +3151,7 @@ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx) if (!(ctx->flags & IORING_SETUP_IOPOLL)) return; - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); while (!wq_list_empty(&ctx->iopoll_list)) { /* let it sleep and repeat later if can't complete a request */ if (io_do_iopoll(ctx, true) == 0) @@ -3137,12 +3162,12 @@ static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx) * Also let task_work, etc. to progress by releasing the mutex */ if (need_resched()) { - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); cond_resched(); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); } } - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } static int io_iopoll_check(struct io_ring_ctx *ctx, long min) @@ -3183,9 +3208,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) if (wq_list_empty(&ctx->iopoll_list)) { u32 tail = ctx->cached_cq_tail; - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); io_run_task_work(); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); /* some requests don't go through iopoll_list */ if (tail != ctx->cached_cq_tail || @@ -3347,7 +3372,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags) /* workqueue context doesn't hold uring_lock, grab it now */ if (unlikely(needs_lock)) - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); /* * Track whether we have multiple files in our lists. 
This will impact @@ -3385,7 +3410,7 @@ static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags) wq_has_sleeper(&ctx->sq_data->wait)) wake_up(&ctx->sq_data->wait); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } } @@ -3674,8 +3699,10 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len, req->buf_list = bl; req->bid = buf->bid; - if (!(issue_flags & IO_URING_F_UNLOCKED)) + if (!(issue_flags & IO_URING_F_UNLOCKED)) { + req->buf_lock_seq = req->ctx->__lock_seq; return u64_to_user_ptr(buf->addr); + } /* * If we came in unlocked, we have no choice but to @@ -8740,7 +8767,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) if (ctx->sq_creds != current_cred()) creds = override_creds(ctx->sq_creds); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); if (!wq_list_empty(&ctx->iopoll_list)) io_do_iopoll(ctx, true); @@ -8751,7 +8778,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) && !(ctx->flags & IORING_SETUP_R_DISABLED)) ret = io_submit_sqes(ctx, to_submit); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait)) wake_up(&ctx->sqo_sq_wait); @@ -9165,17 +9192,17 @@ static __cold int io_rsrc_ref_quiesce(struct io_rsrc_data *data, /* kill initial ref, already quiesced if zero */ if (atomic_dec_and_test(&data->refs)) break; - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); flush_delayed_work(&ctx->rsrc_put_work); ret = wait_for_completion_interruptible(&data->done); if (!ret) { - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); if (atomic_read(&data->refs) > 0) { /* * it has been revived by another thread while * we were unlocked */ - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } else { break; } @@ -9187,7 +9214,7 @@ static __cold int io_rsrc_ref_quiesce(struct io_rsrc_data *data, reinit_completion(&data->done); ret = io_run_task_work_sig(); - 
mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); } while (ret >= 0); data->quiesce = false; @@ -9572,7 +9599,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node) if (prsrc->tag) { if (ctx->flags & IORING_SETUP_IOPOLL) - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); spin_lock(&ctx->completion_lock); io_fill_cqe_aux(ctx, prsrc->tag, 0, 0); @@ -9581,7 +9608,7 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node) io_cqring_ev_posted(ctx); if (ctx->flags & IORING_SETUP_IOPOLL) - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } rsrc_data->do_put(ctx, prsrc); @@ -9879,19 +9906,19 @@ static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx, struct io_wq_data data; unsigned int concurrency; - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); hash = ctx->hash_map; if (!hash) { hash = kzalloc(sizeof(*hash), GFP_KERNEL); if (!hash) { - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); return ERR_PTR(-ENOMEM); } refcount_set(&hash->refs, 1); init_waitqueue_head(&hash->wait); ctx->hash_map = hash; } - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); data.hash = hash; data.task = task; @@ -10642,7 +10669,7 @@ static void io_req_caches_free(struct io_ring_ctx *ctx) struct io_submit_state *state = &ctx->submit_state; int nr = 0; - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); io_flush_cached_locked_reqs(ctx, state); while (!io_req_cache_empty(ctx)) { @@ -10656,7 +10683,7 @@ static void io_req_caches_free(struct io_ring_ctx *ctx) } if (nr) percpu_ref_put_many(&ctx->refs, nr); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } static void io_wait_rsrc_data(struct io_rsrc_data *data) @@ -10691,7 +10718,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) io_wait_rsrc_data(ctx->buf_data); io_wait_rsrc_data(ctx->file_data); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); if (ctx->buf_data) __io_sqe_buffers_unregister(ctx); if (ctx->file_data) @@ -10700,7 +10727,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) 
__io_cqring_overflow_flush(ctx, true); io_eventfd_unregister(ctx); io_flush_apoll_cache(ctx); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); io_destroy_buffers(ctx); if (ctx->sq_creds) put_cred(ctx->sq_creds); @@ -10859,7 +10886,7 @@ static __cold void io_ring_exit_work(struct work_struct *work) * completion_lock, see io_req_task_submit(). Apart from other work, * this lock/unlock section also waits them to finish. */ - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); while (!list_empty(&ctx->tctx_list)) { WARN_ON_ONCE(time_after(jiffies, timeout)); @@ -10871,11 +10898,11 @@ static __cold void io_ring_exit_work(struct work_struct *work) if (WARN_ON_ONCE(ret)) continue; - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); wait_for_completion(&exit.completion); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); } - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); spin_lock(&ctx->completion_lock); spin_unlock(&ctx->completion_lock); @@ -10910,13 +10937,13 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) unsigned long index; struct creds *creds; - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); percpu_ref_kill(&ctx->refs); if (ctx->rings) __io_cqring_overflow_flush(ctx, true); xa_for_each(&ctx->personalities, index, creds) io_unregister_personality(ctx, index); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); /* failed during ring init, it couldn't have issued any requests */ if (ctx->rings) { @@ -10991,7 +11018,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx) enum io_wq_cancel cret; bool ret = false; - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); list_for_each_entry(node, &ctx->tctx_list, ctx_node) { struct io_uring_task *tctx = node->task->io_uring; @@ -11004,7 +11031,7 @@ static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx) cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true); ret |= (cret != IO_WQ_CANCEL_NOTFOUND); } - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); return ret; 
} @@ -11091,9 +11118,9 @@ static int __io_uring_add_tctx_node(struct io_ring_ctx *ctx) return ret; } - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); list_add(&node->ctx_node, &ctx->tctx_list); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } tctx->last = ctx; return 0; @@ -11128,9 +11155,9 @@ static __cold void io_uring_del_tctx_node(unsigned long index) WARN_ON_ONCE(current != node->task); WARN_ON_ONCE(list_empty(&node->ctx_node)); - mutex_lock(&node->ctx->uring_lock); + ctx_lock(node->ctx); list_del(&node->ctx_node); - mutex_unlock(&node->ctx->uring_lock); + ctx_unlock(node->ctx); if (tctx->last == node->ctx) tctx->last = NULL; @@ -11296,9 +11323,9 @@ static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg, if (!nr_args || nr_args > IO_RINGFD_REG_MAX) return -EINVAL; - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); ret = io_uring_add_tctx_node(ctx); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); if (ret) return ret; @@ -11583,15 +11610,15 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, if (unlikely(ret)) goto out; - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); ret = io_submit_sqes(ctx, to_submit); if (ret != to_submit) { - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); goto out; } if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll) goto iopoll_locked; - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } if (flags & IORING_ENTER_GETEVENTS) { int ret2; @@ -11602,7 +11629,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, * prevent racing with polled issue that got punted to * a workqueue. 
*/ - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); iopoll_locked: ret2 = io_validate_ext_arg(flags, argp, argsz); if (likely(!ret2)) { @@ -11610,7 +11637,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ctx->cq_entries); ret2 = io_iopoll_check(ctx, min_complete); } - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); } else { const sigset_t __user *sig; struct __kernel_timespec __user *ts; @@ -12372,9 +12399,9 @@ static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx, * a ref to the ctx. */ refcount_inc(&sqd->refs); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); mutex_lock(&sqd->lock); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); if (sqd->thread) tctx = sqd->thread->io_uring; } @@ -12668,9 +12695,9 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, io_run_task_work(); - mutex_lock(&ctx->uring_lock); + ctx_lock(ctx); ret = __io_uring_register(ctx, opcode, arg, nr_args); - mutex_unlock(&ctx->uring_lock); + ctx_unlock(ctx); trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret); out_fput: fdput(f); -- Jens Axboe