If we're going to ever support multiple types of resources we need shared rsrc nodes to not bloat requests, that is implemented in this patch. It also gives a nicer API and saves one pointer dereference in io_req_set_rsrc_node(). We may say that all requests bound to a resource belong to one and only one rsrc node, and considering that nodes are removed and recycled strictly in-order, this separates requests into generations, where generation are changed on each node switch (i.e. io_rsrc_node_switch()). The API is simple, io_rsrc_node_switch() switches to a new generation if needed, and also optionally kills a passed in io_rsrc_data. Each call to io_rsrc_node_switch() have to be preceded with io_rsrc_node_switch_start(). The start function is idempotent and should not necessarily be followed by switch. One difference is that once a node was set it will always retain a valid rsrc node, even on unregister. It may be a nuisance at the moment, but makes much sense for multiple types of resources, though requires some additional handling on free. Another thing changed is that nodes are bound to/associated with a io_rsrc_data later just before killing (i.e. switching). Signed-off-by: Pavel Begunkov <asml.silence@xxxxxxxxx> --- fs/io_uring.c | 95 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/fs/io_uring.c b/fs/io_uring.c index 175dd2c00991..b20caae75be6 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -222,6 +222,9 @@ struct io_rsrc_node { struct io_rsrc_data *rsrc_data; struct llist_node llist; bool done; + + /* use only with dying ctx, lists and rsrc_data should not be used */ + struct completion *completion; }; typedef void (rsrc_put_fn)(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc); @@ -231,7 +234,6 @@ struct io_rsrc_data { struct io_ring_ctx *ctx; rsrc_put_fn *do_put; - struct io_rsrc_node *node; struct percpu_ref refs; struct completion done; bool quiesce; @@ -444,6 +446,7 @@ struct io_ring_ctx { struct llist_head rsrc_put_llist; struct list_head rsrc_ref_list; spinlock_t rsrc_ref_lock; + struct io_rsrc_node *rsrc_node; struct io_rsrc_node *rsrc_backup_node; struct io_restriction restrictions; @@ -1064,7 +1067,7 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req) struct io_ring_ctx *ctx = req->ctx; if (!req->fixed_rsrc_refs) { - req->fixed_rsrc_refs = &ctx->file_data->node->refs; + req->fixed_rsrc_refs = &ctx->rsrc_node->refs; percpu_ref_get(req->fixed_rsrc_refs); } } @@ -6958,36 +6961,32 @@ static inline void io_rsrc_ref_unlock(struct io_ring_ctx *ctx) spin_unlock_bh(&ctx->rsrc_ref_lock); } -static void io_rsrc_node_set(struct io_ring_ctx *ctx, - struct io_rsrc_data *rsrc_data) +static void io_rsrc_node_switch(struct io_ring_ctx *ctx, + struct io_rsrc_data *data_to_kill) { - struct io_rsrc_node *rsrc_node = ctx->rsrc_backup_node; - - WARN_ON_ONCE(!rsrc_node); + WARN_ON_ONCE(!ctx->rsrc_backup_node); + WARN_ON_ONCE(data_to_kill && !ctx->rsrc_node); - ctx->rsrc_backup_node = NULL; - rsrc_node->rsrc_data = rsrc_data; + if (data_to_kill) { + struct io_rsrc_node *rsrc_node = ctx->rsrc_node; - io_rsrc_ref_lock(ctx); - rsrc_data->node = rsrc_node; - list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list); - io_rsrc_ref_unlock(ctx); - percpu_ref_get(&rsrc_data->refs); -} + rsrc_node->rsrc_data = data_to_kill; + io_rsrc_ref_lock(ctx); + list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list); + io_rsrc_ref_unlock(ctx); -static void io_rsrc_node_kill(struct io_ring_ctx *ctx, struct io_rsrc_data *data) -{ - struct io_rsrc_node *ref_node = NULL; + percpu_ref_get(&data_to_kill->refs); + percpu_ref_kill(&rsrc_node->refs); + ctx->rsrc_node = NULL; + } - io_rsrc_ref_lock(ctx); - ref_node = data->node; - data->node = NULL; - io_rsrc_ref_unlock(ctx); - if (ref_node) - percpu_ref_kill(&ref_node->refs); + if (!ctx->rsrc_node) { + ctx->rsrc_node = ctx->rsrc_backup_node; + ctx->rsrc_backup_node = NULL; + } } -static int io_rsrc_node_prealloc(struct io_ring_ctx *ctx) +static int io_rsrc_node_switch_start(struct io_ring_ctx *ctx) { if (ctx->rsrc_backup_node) return 0; @@ -7004,10 +7003,11 @@ static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, struct io_ring_ctx *ct data->quiesce = true; do { - ret = io_rsrc_node_prealloc(ctx); + ret = io_rsrc_node_switch_start(ctx); if (ret) break; - io_rsrc_node_kill(ctx, data); + io_rsrc_node_switch(ctx, data); + percpu_ref_kill(&data->refs); flush_delayed_work(&ctx->rsrc_put_work); @@ -7016,7 +7016,6 @@ static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, struct io_ring_ctx *ct break; percpu_ref_resurrect(&data->refs); - io_rsrc_node_set(ctx, data); reinit_completion(&data->done); mutex_unlock(&ctx->uring_lock); @@ -7434,10 +7433,18 @@ static void io_rsrc_node_ref_zero(struct percpu_ref *ref) { struct io_rsrc_node *node = container_of(ref, struct io_rsrc_node, refs); struct io_rsrc_data *data = node->rsrc_data; - struct io_ring_ctx *ctx = data->ctx; + struct io_ring_ctx *ctx; bool first_add = false; int delay; + if (node->completion) { + complete(node->completion); + percpu_ref_exit(&node->refs); + kfree(node); + return; + } + + ctx = data->ctx; io_rsrc_ref_lock(ctx); node->done = true; @@ -7497,7 +7504,7 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, return -EINVAL; if (nr_args > IORING_MAX_FIXED_FILES) return -EMFILE; - ret = io_rsrc_node_prealloc(ctx); + ret = io_rsrc_node_switch_start(ctx); if (ret) return ret; @@ -7559,7 +7566,7 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, return ret; } - io_rsrc_node_set(ctx, file_data); + io_rsrc_node_switch(ctx, NULL); return ret; out_fput: for (i = 0; i < ctx->nr_user_files; i++) { @@ -7648,7 +7655,7 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, return -EOVERFLOW; if (done > ctx->nr_user_files) return -EINVAL; - err = io_rsrc_node_prealloc(ctx); + err = io_rsrc_node_switch_start(ctx); if (err) return err; @@ -7667,7 +7674,7 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, if (*file_slot) { file = (struct file *) ((unsigned long) *file_slot & FFS_MASK); - err = io_queue_rsrc_removal(data, data->node, file); + err = io_queue_rsrc_removal(data, ctx->rsrc_node, file); if (err) break; *file_slot = NULL; @@ -7702,10 +7709,8 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx, } } - if (needs_switch) { - percpu_ref_kill(&data->node->refs); - io_rsrc_node_set(ctx, data); - } + if (needs_switch) + io_rsrc_node_switch(ctx, data); return done ? done : err; } @@ -8376,6 +8381,22 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) io_eventfd_unregister(ctx); io_destroy_buffers(ctx); + if (ctx->rsrc_node) { + struct io_rsrc_node *node = ctx->rsrc_node; + struct completion compl; + + ctx->rsrc_node = NULL; + init_completion(&compl); + node->completion = &compl; + node->rsrc_data = NULL; + percpu_ref_kill(&node->refs); + wait_for_completion(&compl); + } + flush_delayed_work(&ctx->rsrc_put_work); + + WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)); + WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist)); + if (ctx->rsrc_backup_node) io_rsrc_node_destroy(ctx->rsrc_backup_node); -- 2.24.0