This should incorporate everything we've covered so far; the one thing I
still have to look at is making it work as a kvfree_rcu() backend.

I decided not to do the "store the callback in the radix tree"
optimization for generic use - it made the code fairly ugly, and just
replacing the linked list with a radix tree should already be a
significant improvement.

Thoughts?
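For anyone who hasn't been following the thread, here's roughly how I
picture a caller using this. A minimal sketch only - my_cache, my_obj
and my_obj_process are invented for illustration; a real user passes its
own srcu_struct, or NULL to use plain RCU: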
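#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/srcu.h>
#include "rcu_pending.h"

struct my_obj {
	struct rcu_head		rcu;
	/* ... payload ... */
};

struct my_cache {
	struct srcu_struct	srcu;	/* or pass NULL below for plain RCU */
	struct pending_rcu_items pending;
};

/* Called once obj's grace period has elapsed: */
static void my_obj_process(struct pending_rcu_items *pending, struct rcu_head *rcu)
{
	kfree(container_of(rcu, struct my_obj, rcu));
}

static int my_cache_init(struct my_cache *c)
{
	int ret = init_srcu_struct(&c->srcu);
	if (ret)
		return ret;

	return bch2_pending_rcu_items_init(&c->pending, &c->srcu, my_obj_process);
}

/* Freeing: instead of kfree_rcu(), hand the object to pending_rcu_items: */
static void my_obj_free(struct my_cache *c, struct my_obj *obj)
{
	bch2_pending_rcu_item_enqueue(&c->pending, &obj->rcu);
}

/*
 * Allocation: first try to steal back an object that's still waiting
 * out its grace period, only then hit the allocator:
 */
static struct my_obj *my_obj_alloc(struct my_cache *c)
{
	struct rcu_head *rcu = bch2_pending_rcu_item_dequeue(&c->pending);

	return rcu
		? container_of(rcu, struct my_obj, rcu)
		: kzalloc(sizeof(struct my_obj), GFP_KERNEL);
}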
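The catch with dequeue, of course, is that a dequeued object hasn't
finished its grace period - reuse is only safe under
SLAB_TYPESAFE_BY_RCU-style rules, i.e. readers have to tolerate objects
being recycled out from under them.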
-- >8 --
Generic data structure for explicitly tracking pending RCU items,
allowing items to be dequeued (i.e. allocated from, for items pending
freeing).

- Works with conventional RCU and SRCU; pass in NULL for the
  srcu_struct for regular RCU

- Tracks pending items in a radix tree; falls back to a linked list if
  radix tree allocation fails

todo: add support for being a kvfree_rcu() backend

Signed-off-by: Kent Overstreet <kent.overstreet@xxxxxxxxx>

diff --git a/fs/bcachefs/Makefile b/fs/bcachefs/Makefile
index 0ab533a2b03b..c8394ec6f22f 100644
--- a/fs/bcachefs/Makefile
+++ b/fs/bcachefs/Makefile
@@ -68,6 +68,7 @@ bcachefs-y := \
 	opts.o \
 	printbuf.o \
 	quota.o \
+	rcu_pending.o \
 	rebalance.o \
 	recovery.o \
 	recovery_passes.o \
diff --git a/fs/bcachefs/rcu_pending.c b/fs/bcachefs/rcu_pending.c
new file mode 100644
index 000000000000..9b2f4da94425
--- /dev/null
+++ b/fs/bcachefs/rcu_pending.c
@@ -0,0 +1,298 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/generic-radix-tree.h>
+#include <linux/percpu.h>
+#include <linux/spinlock.h>
+#include <linux/srcu.h>
+#include <linux/workqueue.h>
+#include "rcu_pending.h"
+
+static inline unsigned long __get_state_synchronize_rcu(struct srcu_struct *ssp)
+{
+	return ssp
+		? get_state_synchronize_srcu(ssp)
+		: get_state_synchronize_rcu();
+}
+
+static inline unsigned long __start_poll_synchronize_rcu(struct srcu_struct *ssp)
+{
+	return ssp
+		? start_poll_synchronize_srcu(ssp)
+		: start_poll_synchronize_rcu();
+}
+
+static inline bool __poll_state_synchronize_rcu(struct srcu_struct *ssp, unsigned long cookie)
+{
+	return ssp
+		? poll_state_synchronize_srcu(ssp, cookie)
+		: poll_state_synchronize_rcu(cookie);
+}
+
+static inline void __call_rcu(struct srcu_struct *ssp, struct rcu_head *rhp,
+			      rcu_callback_t func)
+{
+	if (ssp)
+		call_srcu(ssp, rhp, func);
+	else
+		call_rcu(rhp, func);
+}
+
+static inline void __rcu_barrier(struct srcu_struct *ssp)
+{
+	return ssp
+		? srcu_barrier(ssp)
+		: rcu_barrier();
+}
+
+struct pending_rcu_items_seq {
+	GENRADIX(struct rcu_head *) objs;
+	size_t nr;
+	struct rcu_head *list;
+	unsigned long seq;
+};
+
+struct pending_rcu_items_pcpu {
+	struct pending_rcu_items *parent;
+	spinlock_t lock;
+	bool rcu_armed;
+	struct pending_rcu_items_seq objs[2];
+	struct rcu_head rcu;
+	struct work_struct work;
+};
+
+static bool get_finished_items(struct pending_rcu_items *pending,
+			       struct pending_rcu_items_pcpu *p,
+			       struct pending_rcu_items_seq *out)
+{
+	for (struct pending_rcu_items_seq *objs = p->objs;
+	     objs < p->objs + ARRAY_SIZE(p->objs); objs++)
+		if ((objs->nr || objs->list) &&
+		    __poll_state_synchronize_rcu(pending->srcu, objs->seq)) {
+			*out = (struct pending_rcu_items_seq) {
+				/*
+				 * The genradix root must be swapped with an
+				 * atomic instruction: enqueue can allocate
+				 * new nodes, and thus update the root,
+				 * without holding pending_rcu_items_pcpu.lock
+				 */
+				.objs.tree.root = xchg(&objs->objs.tree.root, NULL),
+				.nr = objs->nr,
+				.list = objs->list,
+			};
+			objs->nr = 0;
+			objs->list = NULL;
+			return true;
+		}
+	return false;
+}
+
+static void process_finished_items(struct pending_rcu_items *pending,
+				   struct pending_rcu_items_seq *objs)
+{
+	for (size_t i = 0; i < objs->nr; i++)
+		pending->process(pending, *genradix_ptr(&objs->objs, i));
+	genradix_free(&objs->objs);
+
+	while (objs->list) {
+		struct rcu_head *obj = objs->list;
+		objs->list = obj->next;
+		pending->process(pending, obj);
+	}
+}
+
+static void pending_rcu_items_rcu_cb(struct rcu_head *rcu)
+{
+	struct pending_rcu_items_pcpu *p =
+		container_of(rcu, struct pending_rcu_items_pcpu, rcu);
+
+	schedule_work(&p->work);
+}
+
+static bool __pending_rcu_items_has_pending(struct pending_rcu_items_pcpu *p)
+{
+	for (struct pending_rcu_items_seq *objs = p->objs;
+	     objs < p->objs + ARRAY_SIZE(p->objs); objs++)
+		if (objs->nr || objs->list)
+			return true;
+	return false;
+}
+
+static void pending_rcu_items_work(struct work_struct *work)
+{
+	struct pending_rcu_items_pcpu *p =
+		container_of(work, struct pending_rcu_items_pcpu, work);
+	struct pending_rcu_items *pending = p->parent;
+again:
+	spin_lock_irq(&p->lock);
+	struct pending_rcu_items_seq finished;
+	while (get_finished_items(pending, p, &finished)) {
+		spin_unlock_irq(&p->lock);
+		process_finished_items(pending, &finished);
+		goto again;
+	}
+
+	BUG_ON(!p->rcu_armed);
+	p->rcu_armed = __pending_rcu_items_has_pending(p);
+	if (p->rcu_armed)
+		__call_rcu(pending->srcu, &p->rcu, pending_rcu_items_rcu_cb);
+	spin_unlock_irq(&p->lock);
+}
+
+void bch2_pending_rcu_item_enqueue(struct pending_rcu_items *pending, struct rcu_head *obj)
+{
+	struct pending_rcu_items_pcpu *p = raw_cpu_ptr(pending->p);
+	bool alloc_failed = false;
+	unsigned long flags;
+retry:
+	spin_lock_irqsave(&p->lock, flags);
+
+	struct pending_rcu_items_seq finished;
+	while (get_finished_items(pending, p, &finished)) {
+		spin_unlock_irqrestore(&p->lock, flags);
+		process_finished_items(pending, &finished);
+		goto retry;
+	}
+
+	struct pending_rcu_items_seq *objs;
+
+	unsigned long seq = __get_state_synchronize_rcu(pending->srcu);
+	for (objs = p->objs; objs < p->objs + ARRAY_SIZE(p->objs); objs++)
+		if ((objs->nr || objs->list) && objs->seq == seq)
+			goto add;
+
+	seq = __start_poll_synchronize_rcu(pending->srcu);
+	for (objs = p->objs; objs < p->objs + ARRAY_SIZE(p->objs); objs++)
+		if (!objs->nr && !objs->list) {
+			objs->seq = seq;
+			goto add;
+		}
+
+	BUG();
+	struct rcu_head **entry;
+add:
+	entry = genradix_ptr_alloc(&objs->objs, objs->nr, GFP_ATOMIC|__GFP_NOWARN);
+	if (likely(entry)) {
+		*entry = obj;
+		objs->nr++;
+	} else if (likely(!alloc_failed)) {
+		spin_unlock_irqrestore(&p->lock, flags);
+		alloc_failed = !genradix_ptr_alloc(&objs->objs, objs->nr, GFP_KERNEL);
+		goto retry;
+	} else {
+		obj->next = objs->list;
+		objs->list = obj;
+	}
+
+	if (!p->rcu_armed) {
+		__call_rcu(pending->srcu, &p->rcu, pending_rcu_items_rcu_cb);
+		p->rcu_armed = true;
+	}
+	spin_unlock_irqrestore(&p->lock, flags);
+}
+
+static struct rcu_head *pending_rcu_item_pcpu_dequeue(struct pending_rcu_items_pcpu *p)
+{
+	struct rcu_head *ret = NULL;
+
+	spin_lock_irq(&p->lock);
+	/* Take from the newest seq first: it has the longest left to wait */
+	unsigned idx = p->objs[1].seq > p->objs[0].seq;
+
+	for (unsigned i = 0; i < 2; i++, idx ^= 1) {
+		struct pending_rcu_items_seq *objs = p->objs + idx;
+
+		if (objs->nr) {
+			ret = *genradix_ptr(&objs->objs, --objs->nr);
+			break;
+		}
+
+		if (objs->list) {
+			ret = objs->list;
+			objs->list = ret->next;
+			break;
+		}
+	}
+	spin_unlock_irq(&p->lock);
+
+	return ret;
+}
+
+struct rcu_head *bch2_pending_rcu_item_dequeue(struct pending_rcu_items *pending)
+{
+	return pending_rcu_item_pcpu_dequeue(raw_cpu_ptr(pending->p));
+}
+
+struct rcu_head *bch2_pending_rcu_item_dequeue_from_all(struct pending_rcu_items *pending)
+{
+	struct rcu_head *ret = NULL;
+	int cpu;
+	for_each_possible_cpu(cpu) {
+		ret = pending_rcu_item_pcpu_dequeue(per_cpu_ptr(pending->p, cpu));
+		if (ret)
+			break;
+	}
+	return ret;
+}
+
+static bool pending_rcu_items_has_pending(struct pending_rcu_items *pending)
+{
+	int cpu;
+	for_each_possible_cpu(cpu) {
+		struct pending_rcu_items_pcpu *p = per_cpu_ptr(pending->p, cpu);
+		spin_lock_irq(&p->lock);
+		if (__pending_rcu_items_has_pending(p)) {
+			spin_unlock_irq(&p->lock);
+			return true;
+		}
+		spin_unlock_irq(&p->lock);
+	}
+
+	return false;
+}
+
+void bch2_pending_rcu_items_exit(struct pending_rcu_items *pending)
+{
+	if (!pending->p)
+		return;
+
+	while (pending_rcu_items_has_pending(pending))
+		__rcu_barrier(pending->srcu);
+
+	int cpu;
+	for_each_possible_cpu(cpu) {
+		struct pending_rcu_items_pcpu *p = per_cpu_ptr(pending->p, cpu);
+
+		flush_work(&p->work);
+
+		WARN_ON(p->objs[0].nr);
+		WARN_ON(p->objs[1].nr);
+		WARN_ON(p->objs[0].list);
+		WARN_ON(p->objs[1].list);
+
+		genradix_free(&p->objs[0].objs);
+		genradix_free(&p->objs[1].objs);
+	}
+	free_percpu(pending->p);
+}
+
+int bch2_pending_rcu_items_init(struct pending_rcu_items *pending,
+				struct srcu_struct *srcu,
+				pending_rcu_item_process_fn process)
+{
+	pending->p = alloc_percpu(struct pending_rcu_items_pcpu);
+	if (!pending->p)
+		return -ENOMEM;
+
+	int cpu;
+	for_each_possible_cpu(cpu) {
+		struct pending_rcu_items_pcpu *p = per_cpu_ptr(pending->p, cpu);
+		p->parent = pending;
+		spin_lock_init(&p->lock);
+		INIT_WORK(&p->work, pending_rcu_items_work);
+	}
+
+	pending->srcu = srcu;
+	pending->process = process;
+
+	return 0;
+}
diff --git a/fs/bcachefs/rcu_pending.h b/fs/bcachefs/rcu_pending.h
new file mode 100644
index 000000000000..e45ede074443
--- /dev/null
+++ b/fs/bcachefs/rcu_pending.h
@@ -0,0 +1,28 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _BCACHEFS_RCU_PENDING_H
+#define _BCACHEFS_RCU_PENDING_H
+
+struct rcu_head;
+struct srcu_struct;
+
+struct pending_rcu_items;
+typedef void (*pending_rcu_item_process_fn)(struct pending_rcu_items *, struct rcu_head *);
+
+struct pending_rcu_items_pcpu;
+
+struct pending_rcu_items {
+	struct pending_rcu_items_pcpu __percpu *p;
+	struct srcu_struct *srcu;
+	pending_rcu_item_process_fn process;
+};
+
+void bch2_pending_rcu_item_enqueue(struct pending_rcu_items *pending, struct rcu_head *obj);
+struct rcu_head *bch2_pending_rcu_item_dequeue(struct pending_rcu_items *pending);
+struct rcu_head *bch2_pending_rcu_item_dequeue_from_all(struct pending_rcu_items *pending);
+
+void bch2_pending_rcu_items_exit(struct pending_rcu_items *pending);
+int bch2_pending_rcu_items_init(struct pending_rcu_items *pending,
+				struct srcu_struct *srcu,
+				pending_rcu_item_process_fn process);
+
+#endif /* _BCACHEFS_RCU_PENDING_H */