On Mon, Jul 01, 2024 at 03:39:27PM -0700, Andrii Nakryiko wrote:

> One, attempted initially, way to solve this is through using
> atomic_inc_not_zero() approach, turning get_uprobe() into
> try_get_uprobe(),

This is the canonical thing to do. Everybody does this.

> which can fail to bump refcount if uprobe is already
> destined to be destroyed. This, unfortunately, turns out to be rather
> expensive due to underlying cmpxchg() operation in
> atomic_inc_not_zero() and scales rather poorly with an increased number
> of parallel threads triggering uprobes.

Different archs, different trade-offs. You'll not see this on LL/SC
archs, for example.

> Furthermore, CPU profiling showed the following overall CPU usage:
>   - try_get_uprobe (19.3%) + put_uprobe (8.2%) = 27.5% CPU usage for
>     atomic_inc_not_zero approach;
>   - __get_uprobe (12.3%) + put_uprobe (9.9%) = 22.2% CPU usage for
>     atomic_add_and_return approach implemented by this patch.

I think those numbers suggest trying not to have a refcount in the first
place. Both are pretty terrible; yes, one is less terrible than the
other, but still terrible.

Specifically, I'm thinking it is the refcounting in handle_swbp() that
is actually the problem; all the other stuff is noise.

So if you have SRCU-protected consumers, what is the reason for still
having a refcount in handle_swbp()? Simply have the whole of it inside
a single SRCU critical section, then all consumers you find get a hit.

Hmm, return probes are a pain: they require the uprobe to stay extant
between handle_swbp() and handle_trampoline(). I'm thinking we can do
that with SRCU as well.

When I cobble all that together (it really shouldn't be one patch, but
you get the idea, I hope) it looks a little something like the below.

I *think* it should work, but perhaps I've missed something?

TL;DR:	replace treelock with seqcount+SRCU
	replace register_rwsem with SRCU
	replace handle_swbp() refcount with SRCU
	replace return_instance refcount with a second SRCU

Paul, I had to do something vile with SRCU. The basic problem is that we
want to keep an SRCU critical section across fork(), which leads to both
parent and child doing srcu_read_unlock(&srcu, idx). As such, I need an
extra increment on the @idx ssp counter to even things out; see
__srcu_read_clone_lock().
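To make that fork() wrinkle concrete, here is a minimal sketch of what the
clone side has to do (illustrative only; dup_return_instance() is a made-up
helper, the real hunk lives in dup_utask() below). The parent took
srcu_read_lock(&uretprobe_srcu) when it created the return_instance, and
after fork() both the parent's copy and the child's copy will eventually
srcu_read_unlock() that same idx, so the clone needs one extra lock-side
count to keep the per-CPU counters balanced:

	/* Illustrative sketch only -- not part of the patch below. */
	static void dup_return_instance(struct return_instance *n,
					const struct return_instance *o)
	{
		*n = *o;	/* child inherits o->srcu_idx from the parent */
		/*
		 * Both copies will eventually do
		 * srcu_read_unlock(&uretprobe_srcu, srcu_idx); add one extra
		 * lock-side count so the unlocks balance out.
		 */
		__srcu_read_clone_lock(&uretprobe_srcu, n->srcu_idx);
		n->next = NULL;
	}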
---
 include/linux/rbtree.h  |  45 +++++++++++++
 include/linux/srcu.h    |   2 +
 include/linux/uprobes.h |   2 +
 kernel/events/uprobes.c | 166 +++++++++++++++++++++++++++++++-----------------
 kernel/rcu/srcutree.c   |   5 ++
 5 files changed, 161 insertions(+), 59 deletions(-)

diff --git a/include/linux/rbtree.h b/include/linux/rbtree.h
index f7edca369eda..9847fa58a287 100644
--- a/include/linux/rbtree.h
+++ b/include/linux/rbtree.h
@@ -244,6 +244,31 @@ rb_find_add(struct rb_node *node, struct rb_root *tree,
 	return NULL;
 }
 
+static __always_inline struct rb_node *
+rb_find_add_rcu(struct rb_node *node, struct rb_root *tree,
+		int (*cmp)(struct rb_node *, const struct rb_node *))
+{
+	struct rb_node **link = &tree->rb_node;
+	struct rb_node *parent = NULL;
+	int c;
+
+	while (*link) {
+		parent = *link;
+		c = cmp(node, parent);
+
+		if (c < 0)
+			link = &parent->rb_left;
+		else if (c > 0)
+			link = &parent->rb_right;
+		else
+			return parent;
+	}
+
+	rb_link_node_rcu(node, parent, link);
+	rb_insert_color(node, tree);
+	return NULL;
+}
+
 /**
  * rb_find() - find @key in tree @tree
  * @key: key to match
@@ -272,6 +297,26 @@ rb_find(const void *key, const struct rb_root *tree,
 	return NULL;
 }
 
+static __always_inline struct rb_node *
+rb_find_rcu(const void *key, const struct rb_root *tree,
+	    int (*cmp)(const void *key, const struct rb_node *))
+{
+	struct rb_node *node = tree->rb_node;
+
+	while (node) {
+		int c = cmp(key, node);
+
+		if (c < 0)
+			node = rcu_dereference_raw(node->rb_left);
+		else if (c > 0)
+			node = rcu_dereference_raw(node->rb_right);
+		else
+			return node;
+	}
+
+	return NULL;
+}
+
 /**
  * rb_find_first() - find the first @key in @tree
  * @key: key to match
diff --git a/include/linux/srcu.h b/include/linux/srcu.h
index 236610e4a8fa..9b14acecbb9d 100644
--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -55,7 +55,9 @@ void call_srcu(struct srcu_struct *ssp, struct rcu_head *head,
 		void (*func)(struct rcu_head *head));
 void cleanup_srcu_struct(struct srcu_struct *ssp);
 int __srcu_read_lock(struct srcu_struct *ssp) __acquires(ssp);
+void __srcu_read_clone_lock(struct srcu_struct *ssp, int idx);
 void __srcu_read_unlock(struct srcu_struct *ssp, int idx) __releases(ssp);
+
 void synchronize_srcu(struct srcu_struct *ssp);
 unsigned long get_state_synchronize_srcu(struct srcu_struct *ssp);
 unsigned long start_poll_synchronize_srcu(struct srcu_struct *ssp);
diff --git a/include/linux/uprobes.h b/include/linux/uprobes.h
index f46e0ca0169c..354cab634341 100644
--- a/include/linux/uprobes.h
+++ b/include/linux/uprobes.h
@@ -78,6 +78,7 @@ struct uprobe_task {
 
 	struct return_instance		*return_instances;
 	unsigned int			depth;
+	unsigned int			active_srcu_idx;
 };
 
 struct return_instance {
@@ -86,6 +87,7 @@ struct return_instance {
 	unsigned long		stack;		/* stack pointer */
 	unsigned long		orig_ret_vaddr; /* original return address */
 	bool			chained;	/* true, if instance is nested */
+	int			srcu_idx;
 
 	struct return_instance	*next;		/* keep as stack */
 };
diff --git a/kernel/events/uprobes.c b/kernel/events/uprobes.c
index 2c83ba776fc7..0b7574a54093 100644
--- a/kernel/events/uprobes.c
+++ b/kernel/events/uprobes.c
@@ -26,6 +26,7 @@
 #include <linux/task_work.h>
 #include <linux/shmem_fs.h>
 #include <linux/khugepaged.h>
+#include <linux/srcu.h>
 
 #include <linux/uprobes.h>
 
@@ -40,6 +41,17 @@ static struct rb_root uprobes_tree = RB_ROOT;
 #define no_uprobe_events()	RB_EMPTY_ROOT(&uprobes_tree)
 
 static DEFINE_RWLOCK(uprobes_treelock);	/* serialize rbtree access */
+static seqcount_rwlock_t uprobes_seqcount = SEQCNT_RWLOCK_ZERO(uprobes_seqcount, &uprobes_treelock);
+
+/*
+ * Used for both the uprobes_tree and the uprobe->consumer list.
+ */
+DEFINE_STATIC_SRCU(uprobe_srcu);
+/*
+ * Used for return_instance and single-step uprobe lifetime. Separate from
+ * uprobe_srcu in order to minimize the synchronize_srcu() cost at unregister.
+ */
+DEFINE_STATIC_SRCU(uretprobe_srcu);
 
 #define UPROBES_HASH_SZ	13
 /* serialize uprobe->pending_list */
@@ -54,7 +66,8 @@ DEFINE_STATIC_PERCPU_RWSEM(dup_mmap_sem);
 struct uprobe {
 	struct rb_node		rb_node;	/* node in the rb tree */
 	refcount_t		ref;
-	struct rw_semaphore	register_rwsem;
+	struct rcu_head		rcu;
+	struct mutex		register_mutex;
 	struct rw_semaphore	consumer_rwsem;
 	struct list_head	pending_list;
 	struct uprobe_consumer	*consumers;
@@ -67,7 +80,7 @@ struct uprobe {
 	 * The generic code assumes that it has two members of unknown type
 	 * owned by the arch-specific code:
 	 *
-	 * insn -	copy_insn() saves the original instruction here for
+	 * insn -	copy_insn() saves the original instruction here for
 	 *		arch_uprobe_analyze_insn().
 	 *
 	 * ixol -	potentially modified instruction to execute out of
@@ -205,7 +218,7 @@ static int __replace_page(struct vm_area_struct *vma, unsigned long addr,
 	folio_put(old_folio);
 
 	err = 0;
- unlock:
+unlock:
 	mmu_notifier_invalidate_range_end(&range);
 	folio_unlock(old_folio);
 	return err;
@@ -593,6 +606,22 @@ static struct uprobe *get_uprobe(struct uprobe *uprobe)
 	return uprobe;
 }
 
+static void uprobe_free_stage2(struct rcu_head *rcu)
+{
+	struct uprobe *uprobe = container_of(rcu, struct uprobe, rcu);
+	kfree(uprobe);
+}
+
+static void uprobe_free_stage1(struct rcu_head *rcu)
+{
+	struct uprobe *uprobe = container_of(rcu, struct uprobe, rcu);
+	/*
+	 * At this point all the consumers are complete and gone, but retprobe
+	 * and single-step might still reference the uprobe itself.
+	 */
+	call_srcu(&uretprobe_srcu, &uprobe->rcu, uprobe_free_stage2);
+}
+
 static void put_uprobe(struct uprobe *uprobe)
 {
 	if (refcount_dec_and_test(&uprobe->ref)) {
@@ -604,7 +633,8 @@ static void put_uprobe(struct uprobe *uprobe)
 		mutex_lock(&delayed_uprobe_lock);
 		delayed_uprobe_remove(uprobe, NULL);
 		mutex_unlock(&delayed_uprobe_lock);
-		kfree(uprobe);
+
+		call_srcu(&uprobe_srcu, &uprobe->rcu, uprobe_free_stage1);
 	}
 }
 
@@ -653,10 +683,10 @@ static struct uprobe *__find_uprobe(struct inode *inode, loff_t offset)
 		.inode = inode,
 		.offset = offset,
 	};
-	struct rb_node *node = rb_find(&key, &uprobes_tree, __uprobe_cmp_key);
+	struct rb_node *node = rb_find_rcu(&key, &uprobes_tree, __uprobe_cmp_key);
 
 	if (node)
-		return get_uprobe(__node_2_uprobe(node));
+		return __node_2_uprobe(node);
 
 	return NULL;
 }
@@ -667,20 +697,32 @@ static struct uprobe *__find_uprobe(struct inode *inode, loff_t offset)
  */
 static struct uprobe *find_uprobe(struct inode *inode, loff_t offset)
 {
-	struct uprobe *uprobe;
+	unsigned seq;
 
-	read_lock(&uprobes_treelock);
-	uprobe = __find_uprobe(inode, offset);
-	read_unlock(&uprobes_treelock);
+	lockdep_assert(srcu_read_lock_held(&uprobe_srcu));
 
-	return uprobe;
+	do {
+		seq = read_seqcount_begin(&uprobes_seqcount);
+		struct uprobe *uprobe = __find_uprobe(inode, offset);
+		if (uprobe) {
+			/*
+			 * Lockless RB-tree lookups are prone to false-negatives.
+			 * If they find something, it's good. If they do not find,
+			 * it needs to be validated.
+			 */
+			return uprobe;
+		}
+	} while (read_seqcount_retry(&uprobes_seqcount, seq));
+
+	/* Really didn't find anything. */
+	return NULL;
 }
 
 static struct uprobe *__insert_uprobe(struct uprobe *uprobe)
 {
 	struct rb_node *node;
 
-	node = rb_find_add(&uprobe->rb_node, &uprobes_tree, __uprobe_cmp);
+	node = rb_find_add_rcu(&uprobe->rb_node, &uprobes_tree, __uprobe_cmp);
 
 	if (node)
 		return get_uprobe(__node_2_uprobe(node));
@@ -702,7 +744,9 @@ static struct uprobe *insert_uprobe(struct uprobe *uprobe)
 	struct uprobe *u;
 
 	write_lock(&uprobes_treelock);
+	write_seqcount_begin(&uprobes_seqcount);
 	u = __insert_uprobe(uprobe);
+	write_seqcount_end(&uprobes_seqcount);
 	write_unlock(&uprobes_treelock);
 
 	return u;
@@ -730,7 +774,7 @@ static struct uprobe *alloc_uprobe(struct inode *inode, loff_t offset,
 	uprobe->inode = inode;
 	uprobe->offset = offset;
 	uprobe->ref_ctr_offset = ref_ctr_offset;
-	init_rwsem(&uprobe->register_rwsem);
+	mutex_init(&uprobe->register_mutex);
 	init_rwsem(&uprobe->consumer_rwsem);
 
 	/* add to uprobes_tree, sorted on inode:offset */
@@ -754,7 +798,7 @@ static void consumer_add(struct uprobe *uprobe, struct uprobe_consumer *uc)
 {
 	down_write(&uprobe->consumer_rwsem);
 	uc->next = uprobe->consumers;
-	uprobe->consumers = uc;
+	rcu_assign_pointer(uprobe->consumers, uc);
 	up_write(&uprobe->consumer_rwsem);
 }
 
@@ -771,7 +815,7 @@ static bool consumer_del(struct uprobe *uprobe, struct uprobe_consumer *uc)
 	down_write(&uprobe->consumer_rwsem);
 	for (con = &uprobe->consumers; *con; con = &(*con)->next) {
 		if (*con == uc) {
-			*con = uc->next;
+			rcu_assign_pointer(*con, uc->next);
 			ret = true;
 			break;
 		}
@@ -857,7 +901,7 @@ static int prepare_uprobe(struct uprobe *uprobe, struct file *file,
 	smp_wmb(); /* pairs with the smp_rmb() in handle_swbp() */
 	set_bit(UPROBE_COPY_INSN, &uprobe->flags);
 
- out:
+out:
 	up_write(&uprobe->consumer_rwsem);
 
 	return ret;
@@ -936,7 +980,9 @@ static void delete_uprobe(struct uprobe *uprobe)
 		return;
 
 	write_lock(&uprobes_treelock);
+	write_seqcount_begin(&uprobes_seqcount);
 	rb_erase(&uprobe->rb_node, &uprobes_tree);
+	write_seqcount_end(&uprobes_seqcount);
 	write_unlock(&uprobes_treelock);
 	RB_CLEAR_NODE(&uprobe->rb_node); /* for uprobe_is_active() */
 	put_uprobe(uprobe);
@@ -965,7 +1011,7 @@ build_map_info(struct address_space *mapping, loff_t offset, bool is_register)
 	struct map_info *info;
 	int more = 0;
 
- again:
+again:
 	i_mmap_lock_read(mapping);
 	vma_interval_tree_foreach(vma, &mapping->i_mmap, pgoff, pgoff) {
 		if (!valid_vma(vma, is_register))
@@ -1019,7 +1065,7 @@ build_map_info(struct address_space *mapping, loff_t offset, bool is_register)
 	} while (--more);
 
 	goto again;
- out:
+out:
 	while (prev)
 		prev = free_map_info(prev);
 	return curr;
@@ -1068,13 +1114,13 @@ register_for_each_vma(struct uprobe *uprobe, struct uprobe_consumer *new)
 				err |= remove_breakpoint(uprobe, mm, info->vaddr);
 		}
 
- unlock:
+unlock:
 		mmap_write_unlock(mm);
- free:
+free:
 		mmput(mm);
 		info = free_map_info(info);
 	}
- out:
+out:
 	percpu_up_write(&dup_mmap_sem);
 	return err;
 }
@@ -1101,16 +1147,17 @@ __uprobe_unregister(struct uprobe *uprobe, struct uprobe_consumer *uc)
  */
 void uprobe_unregister(struct inode *inode, loff_t offset, struct uprobe_consumer *uc)
 {
-	struct uprobe *uprobe;
+	scoped_guard (srcu, &uprobe_srcu) {
+		struct uprobe *uprobe = find_uprobe(inode, offset);
+		if (WARN_ON(!uprobe))
+			return;
 
-	uprobe = find_uprobe(inode, offset);
-	if (WARN_ON(!uprobe))
-		return;
+		mutex_lock(&uprobe->register_mutex);
+		__uprobe_unregister(uprobe, uc);
+		mutex_unlock(&uprobe->register_mutex);
+	}
 
-	down_write(&uprobe->register_rwsem);
-	__uprobe_unregister(uprobe, uc);
-	up_write(&uprobe->register_rwsem);
-	put_uprobe(uprobe);
+	synchronize_srcu(&uprobe_srcu); // XXX amortize / batch
 }
 EXPORT_SYMBOL_GPL(uprobe_unregister);
 
@@ -1159,7 +1206,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
 	if (!IS_ALIGNED(ref_ctr_offset, sizeof(short)))
 		return -EINVAL;
 
- retry:
+retry:
 	uprobe = alloc_uprobe(inode, offset, ref_ctr_offset);
 	if (!uprobe)
 		return -ENOMEM;
@@ -1170,7 +1217,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
 	 * We can race with uprobe_unregister()->delete_uprobe().
 	 * Check uprobe_is_active() and retry if it is false.
 	 */
-	down_write(&uprobe->register_rwsem);
+	mutex_lock(&uprobe->register_mutex);
 	ret = -EAGAIN;
 	if (likely(uprobe_is_active(uprobe))) {
 		consumer_add(uprobe, uc);
@@ -1178,7 +1225,7 @@ static int __uprobe_register(struct inode *inode, loff_t offset,
 		if (ret)
 			__uprobe_unregister(uprobe, uc);
 	}
-	up_write(&uprobe->register_rwsem);
+	mutex_unlock(&uprobe->register_mutex);
 	put_uprobe(uprobe);
 
 	if (unlikely(ret == -EAGAIN))
@@ -1214,17 +1261,18 @@ int uprobe_apply(struct inode *inode, loff_t offset,
 	struct uprobe_consumer *con;
 	int ret = -ENOENT;
 
+	guard(srcu)(&uprobe_srcu);
+
 	uprobe = find_uprobe(inode, offset);
 	if (WARN_ON(!uprobe))
 		return ret;
 
-	down_write(&uprobe->register_rwsem);
+	mutex_lock(&uprobe->register_mutex);
 	for (con = uprobe->consumers; con && con != uc ; con = con->next)
 		;
 	if (con)
 		ret = register_for_each_vma(uprobe, add ? uc : NULL);
-	up_write(&uprobe->register_rwsem);
-	put_uprobe(uprobe);
+	mutex_unlock(&uprobe->register_mutex);
 
 	return ret;
 }
@@ -1468,7 +1516,7 @@ static int xol_add_vma(struct mm_struct *mm, struct xol_area *area)
 	ret = 0;
 	/* pairs with get_xol_area() */
 	smp_store_release(&mm->uprobes_state.xol_area, area); /* ^^^ */
- fail:
+fail:
 	mmap_write_unlock(mm);
 
 	return ret;
@@ -1512,7 +1560,7 @@ static struct xol_area *__create_xol_area(unsigned long vaddr)
 	kfree(area->bitmap);
 free_area:
 	kfree(area);
- out:
+out:
 	return NULL;
 }
 
@@ -1700,7 +1748,7 @@ unsigned long uprobe_get_trap_addr(struct pt_regs *regs)
 static struct return_instance *free_ret_instance(struct return_instance *ri)
 {
 	struct return_instance *next = ri->next;
-	put_uprobe(ri->uprobe);
+	srcu_read_unlock(&uretprobe_srcu, ri->srcu_idx);
 	kfree(ri);
 	return next;
 }
@@ -1718,7 +1766,7 @@ void uprobe_free_utask(struct task_struct *t)
 		return;
 
 	if (utask->active_uprobe)
-		put_uprobe(utask->active_uprobe);
+		srcu_read_unlock(&uretprobe_srcu, utask->active_srcu_idx);
 
 	ri = utask->return_instances;
 	while (ri)
@@ -1761,7 +1809,7 @@ static int dup_utask(struct task_struct *t, struct uprobe_task *o_utask)
 			return -ENOMEM;
 
 		*n = *o;
-		get_uprobe(n->uprobe);
+		__srcu_read_clone_lock(&uretprobe_srcu, n->srcu_idx);
 		n->next = NULL;
 
 		*p = n;
@@ -1904,7 +1952,8 @@ static void prepare_uretprobe(struct uprobe *uprobe, struct pt_regs *regs)
 		orig_ret_vaddr = utask->return_instances->orig_ret_vaddr;
 	}
 
-	ri->uprobe = get_uprobe(uprobe);
+	ri->srcu_idx = srcu_read_lock(&uretprobe_srcu);
+	ri->uprobe = uprobe;
 	ri->func = instruction_pointer(regs);
 	ri->stack = user_stack_pointer(regs);
 	ri->orig_ret_vaddr = orig_ret_vaddr;
@@ -1915,7 +1964,7 @@ static void prepare_uretprobe(struct uprobe *uprobe, struct pt_regs *regs)
 	utask->return_instances = ri;
 
 	return;
- fail:
+fail:
 	kfree(ri);
 }
 
@@ -1944,6 +1993,7 @@ pre_ssout(struct uprobe *uprobe, struct pt_regs *regs, unsigned long bp_vaddr)
 		return err;
 	}
 
+	utask->active_srcu_idx = srcu_read_lock(&uretprobe_srcu);
 	utask->active_uprobe = uprobe;
 	utask->state = UTASK_SSTEP;
 	return 0;
@@ -2031,7 +2081,7 @@ static int is_trap_at_addr(struct mm_struct *mm, unsigned long vaddr)
 	copy_from_page(page, vaddr, &opcode, UPROBE_SWBP_INSN_SIZE);
 	put_page(page);
- out:
+out:
 	/* This needs to return true for any variant of the trap insn */
 	return is_trap_insn(&opcode);
 }
@@ -2071,8 +2121,9 @@ static void handler_chain(struct uprobe *uprobe, struct pt_regs *regs)
 	int remove = UPROBE_HANDLER_REMOVE;
 	bool need_prep = false; /* prepare return uprobe, when needed */
 
-	down_read(&uprobe->register_rwsem);
-	for (uc = uprobe->consumers; uc; uc = uc->next) {
+	lockdep_assert(srcu_read_lock_held(&uprobe_srcu));
+
+	for (uc = rcu_dereference_raw(uprobe->consumers); uc; uc = rcu_dereference_raw(uc->next)) {
 		int rc = 0;
 
 		if (uc->handler) {
@@ -2094,7 +2145,6 @@ static void handler_chain(struct uprobe *uprobe, struct pt_regs *regs)
 		WARN_ON(!uprobe_is_active(uprobe));
 		unapply_uprobe(uprobe, current->mm);
 	}
-	up_read(&uprobe->register_rwsem);
 }
 
 static void
@@ -2103,12 +2153,12 @@ handle_uretprobe_chain(struct return_instance *ri, struct pt_regs *regs)
 	struct uprobe *uprobe = ri->uprobe;
 	struct uprobe_consumer *uc;
 
-	down_read(&uprobe->register_rwsem);
-	for (uc = uprobe->consumers; uc; uc = uc->next) {
+	guard(srcu)(&uprobe_srcu);
+
+	for (uc = rcu_dereference_raw(uprobe->consumers); uc; uc = rcu_dereference_raw(uc->next)) {
 		if (uc->ret_handler)
 			uc->ret_handler(uc, ri->func, regs);
 	}
-	up_read(&uprobe->register_rwsem);
 }
 
 static struct return_instance *find_next_ret_chain(struct return_instance *ri)
@@ -2159,7 +2209,7 @@ static void handle_trampoline(struct pt_regs *regs)
 	utask->return_instances = ri;
 	return;
 
- sigill:
+sigill:
 	uprobe_warn(current, "handle uretprobe, sending SIGILL.");
 	force_sig(SIGILL);
 
@@ -2190,6 +2240,8 @@ static void handle_swbp(struct pt_regs *regs)
 	if (bp_vaddr == get_trampoline_vaddr())
 		return handle_trampoline(regs);
 
+	guard(srcu)(&uprobe_srcu);
+
 	uprobe = find_active_uprobe(bp_vaddr, &is_swbp);
 	if (!uprobe) {
 		if (is_swbp > 0) {
@@ -2218,7 +2270,7 @@ static void handle_swbp(struct pt_regs *regs)
 	 * new and not-yet-analyzed uprobe at the same address, restart.
 	 */
 	if (unlikely(!test_bit(UPROBE_COPY_INSN, &uprobe->flags)))
-		goto out;
+		return;
 
 	/*
 	 * Pairs with the smp_wmb() in prepare_uprobe().
@@ -2231,22 +2283,18 @@ static void handle_swbp(struct pt_regs *regs)
 
 	/* Tracing handlers use ->utask to communicate with fetch methods */
 	if (!get_utask())
-		goto out;
+		return;
 
 	if (arch_uprobe_ignore(&uprobe->arch, regs))
-		goto out;
+		return;
 
 	handler_chain(uprobe, regs);
 
 	if (arch_uprobe_skip_sstep(&uprobe->arch, regs))
-		goto out;
+		return;
 
 	if (!pre_ssout(uprobe, regs, bp_vaddr))
 		return;
-
-	/* arch_uprobe_skip_sstep() succeeded, or restart if can't singlestep */
-out:
-	put_uprobe(uprobe);
 }
 
 /*
@@ -2266,7 +2314,7 @@ static void handle_singlestep(struct uprobe_task *utask, struct pt_regs *regs)
 	else
 		WARN_ON_ONCE(1);
 
-	put_uprobe(uprobe);
+	srcu_read_unlock(&uretprobe_srcu, utask->active_srcu_idx);
 	utask->active_uprobe = NULL;
 	utask->state = UTASK_RUNNING;
 	xol_free_insn_slot(current);
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index bc4b58b0204e..d8cda9003da4 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -720,6 +720,11 @@ int __srcu_read_lock(struct srcu_struct *ssp)
 }
 EXPORT_SYMBOL_GPL(__srcu_read_lock);
 
+void __srcu_read_clone_lock(struct srcu_struct *ssp, int idx)
+{
+	this_cpu_inc(ssp->sda->srcu_lock_count[idx].counter);
+}
+
 /*
  * Removes the count for the old reader from the appropriate per-CPU
  * element of the srcu_struct. Note that this may well be a different