On Fri, Feb 21, 2020 at 07:06:18AM -0500, Joel Fernandes wrote:
> On Tue, Feb 18, 2020 at 06:08:57PM +0100, Uladzislau Rezki wrote:
> > On Mon, Feb 17, 2020 at 02:33:14PM -0500, Theodore Y. Ts'o wrote:
> > > On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> > > > Hello, Joel, Paul, Ted.
> > > >
> > > > >
> > > > > Good point!
> > > > >
> > > > > Now that kfree_rcu() is on its way to being less of a hack deeply
> > > > > entangled into the bowels of RCU, this might be fairly easy to implement.
> > > > > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > > > > API. Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> > > > >
> > > > I think it makes sense. For example i see there is a similar demand in
> > > > the mm/list_lru.c too. As for implementation, it will not be hard, i think.
> > > >
> > > > The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
> > > > logic(probably will need to rename that function), i.e. to free vmalloc() allocations
> > > > only in "emergency path" by just calling kvfree(). So that function in its turn will
> > > > figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.
> > >
> > > The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
> > > is that kfree_rcu() is a magic macro which frees a structure, which
> > > has to contain a struct rcu_head. In this case, I'm freeing a pointer
> > > to set of structures, or in another case, a set of buffer_heads, which
> > > do *not* have an rcu_head structure.
> > >
> > We can implement kvfree_rcu() that will deal with pointer only, it is not
> > an issue. I mean without embedding rcu_head to the structure or whatever
> > else.
> >
> > I tried to implement it with less number of changes to make it more clear
> > and readable. Please have a look:
> >
> > <snip>
> > diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
>
> Overall this implementation is nice. You are basically avoiding allocating
> rcu_head like Ted did by using the array-of-pointers technique we used for
> the previous kfree_rcu() work.
>
> One thing stands out, the path where we could not allocate a page for the new
> block node:
>
> > @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
> >  	if (krcp->initialized)
> >  		spin_unlock(&krcp->lock);
> >  	local_irq_restore(flags);
> > +
> > +	if (!skip_call_rcu) {
> > +		synchronize_rcu();
> > +		kvfree(ptr_to_free);
>
> We can't block, it has to be async otherwise everything else blocks, and I
> think this can also be used from interrupt handlers which would at least be
> an SWA violation. So perhaps it needs to allocate an rcu_head wrapper object
> itself for the 'emergeny case' and use the regular techniques.
>
> Another thing that stands out is the code duplication, if we can make this
> reuse as much as of the previous code as possible, that'd be great. I'd like
> to avoid bvcached and bvhead if possible. Maybe we can store information
> about the fact that this is a 'special object' in some of the lower-order
> bits of the pointer. Then we can detect that it is 'special' and free it
> using kvfree() during the reclaim

I was thinking something like the following, only build-tested -- just to
show the idea. Perhaps the synchronize_rcu() should be done from a workqueue
handler to prevent IRQ crapping out?

Basically what I did differently is:

1. Use the existing kfree_rcu_bulk_data::records array to store the
   to-be-freed array.

2. In case of emergency, allocate a new wrapper and tag the pointer. Read the
   tag later to figure out it's an array wrapper and do the additional
   kvfree().

The debug_objects bits obviously wouldn't work for the !emergency kvfree
case; not sure what we can do there.
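A side note on (2): the tag trick works because the offsets that kfree_rcu()
smuggles through rcu_head::func are always even (the rcu_head inside the
wrapper is at least pointer-aligned), so bit 0 is free to carry a "this is a
kvfree wrapper" flag. Just to spell out that encode/decode step on its own,
a minimal sketch (helper names are invented here, not part of the patch
below):

#include <linux/bits.h>
#include <linux/types.h>

/* Sketch only: borrow bit 0 of the stored offset as a "kvfree wrapper" flag. */
static inline unsigned long kvfree_tag_offset(unsigned long offset)
{
	return offset | BIT(0);
}

static inline bool kvfree_offset_is_tagged(unsigned long offset)
{
	return offset & BIT(0);
}

static inline unsigned long kvfree_untag_offset(unsigned long offset)
{
	return offset & ~BIT(0);
}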

---8<-----------------------

diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 2678a37c31696..19fd7c74ad532 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -805,7 +805,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
 #define __kfree_rcu(head, offset) \
 	do { \
 		BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \
-		kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
+		kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset), NULL); \
 	} while (0)
 
 /**
@@ -842,6 +842,14 @@ do { \
 		__kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
 } while (0)
 
+#define kvfree_rcu(ptr) \
+do { \
+	typeof (ptr) ___p = (ptr); \
+	\
+	if (___p) \
+		kfree_call_rcu(NULL, (rcu_callback_t)(unsigned long)(0), ___p); \
+} while (0)
+
 /*
  * Place this after a lock-acquisition primitive to guarantee that
  * an UNLOCK+LOCK pair acts as a full barrier. This guarantee applies
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 045c28b71f4f3..a12ecc1645fa9 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
 	synchronize_rcu();
 }
 
-static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
 	call_rcu(head, func);
 }
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 45f3f66bb04df..1e445b566c019 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -33,7 +33,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
 }
 
 void synchronize_rcu_expedited(void);
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr);
 
 void rcu_barrier(void);
 bool rcu_eqs_special_set(int cpu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index ec81139cc4c6a..7b6ab4160f080 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2814,6 +2814,15 @@ struct kfree_rcu_cpu {
 	bool initialized;
 };
 
+/*
+ * Used in a situation where array of pointers could
+ * not be put onto a kfree_bulk_data::records array.
+ */
+struct kfree_rcu_wrap_kvarray {
+	struct rcu_head head;
+	void *ptr;
+};
+
 static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
 
 static __always_inline void
@@ -2873,12 +2882,25 @@ static void kfree_rcu_work(struct work_struct *work)
 	 */
 	for (; head; head = next) {
 		unsigned long offset = (unsigned long)head->func;
+		bool is_array_ptr = false;
+
+		if (((unsigned long)head - offset) & BIT(0)) {
+			is_array_ptr = true;
+			offset = offset - 1;
+		}
 
 		next = head->next;
-		debug_rcu_head_unqueue(head);
+		if (!is_array_ptr)
+			debug_rcu_head_unqueue(head);
+
 		rcu_lock_acquire(&rcu_callback_map);
 		trace_rcu_invoke_kfree_callback(rcu_state.name, head, offset);
 
+		if (is_array_ptr) {
+			struct kfree_rcu_wrap_kvarray *kv = (void *)head - offset;
+			kvfree((void *)kv->ptr);
+		}
+
 		if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset)))
 			kfree((void *)head - offset);
 
@@ -2975,7 +2997,7 @@ static void kfree_rcu_monitor(struct work_struct *work)
 
 static inline bool
 kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
-	struct rcu_head *head, rcu_callback_t func)
+	struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
 	struct kfree_rcu_bulk_data *bnode;
 
@@ -3009,14 +3031,20 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
 	}
 
 #ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
-	head->func = func;
-	head->next = krcp->bhead->head_free_debug;
-	krcp->bhead->head_free_debug = head;
+	/* debug_objects doesn't work for kvfree */
+	if (!ptr) {
+		head->func = func;
+		head->next = krcp->bhead->head_free_debug;
+		krcp->bhead->head_free_debug = head;
+	}
 #endif
 
 	/* Finally insert. */
-	krcp->bhead->records[krcp->bhead->nr_records++] =
-		(void *) head - (unsigned long) func;
+	if (ptr)
+		krcp->bhead->records[krcp->bhead->nr_records++] = ptr;
+	else
+		krcp->bhead->records[krcp->bhead->nr_records++] =
+			(void *) head - (unsigned long) func;
 
 	return true;
 }
@@ -3033,10 +3061,11 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
  * be free'd in workqueue context. This allows us to: batch requests together to
  * reduce the number of grace periods during heavy kfree_rcu() load.
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
 	unsigned long flags;
 	struct kfree_rcu_cpu *krcp;
+	bool ret;
 
 	local_irq_save(flags);	// For safely calling this_cpu_ptr().
 	krcp = this_cpu_ptr(&krc);
@@ -3044,7 +3073,8 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 		spin_lock(&krcp->lock);
 
 	// Queue the object but don't yet schedule the batch.
-	if (debug_rcu_head_queue(head)) {
+	// NOTE: debug objects doesn't work for kvfree.
+	if (!ptr && debug_rcu_head_queue(head)) {
 		// Probable double kfree_rcu(), just leak.
 		WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
 			  __func__, head);
@@ -3055,7 +3085,29 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 	 * Under high memory pressure GFP_NOWAIT can fail,
 	 * in that case the emergency path is maintained.
 	 */
-	if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+	ret = kfree_call_rcu_add_ptr_to_bulk(krcp, head, func, ptr);
+	if (unlikely(!ret)) {
+		if (ptr) {
+			struct kfree_rcu_wrap_kvarray *kvwrap;
+
+			kvwrap = kzalloc(sizeof(*kvwrap), GFP_KERNEL);
+
+			// If memory is really low, just try inline-freeing.
+			if (!kvwrap) {
+				// NOT SURE if permitted due to IRQ. Maybe we
+				// should try doing this from WQ?
+				synchronize_rcu();
+				kvfree(ptr);
+			}
+
+			kvwrap->ptr = ptr;
+			ptr = NULL;
+			head = &(kvwrap->head);
+			func = offsetof(typeof(*kvwrap), head);
+			// Tag the array as wrapper
+			func = (rcu_callback_t)((unsigned long)func + 1);
+		}
+
 		head->func = func;
 		head->next = krcp->head;
 		krcp->head = head;
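
For completeness, the caller side this is aimed at would then look roughly
like the following; an untested sketch where the struct, function and lock
names are invented placeholders, not the real ext4 code discussed above:

#include <linux/rcupdate.h>
#include <linux/spinlock.h>

/*
 * Untested sketch, names invented: swap out an RCU-protected array that has
 * no embedded rcu_head, without open-coding synchronize_rcu() + kvfree().
 * The caller is assumed to hold @lock, which serializes updaters.
 */
struct foo_array {
	unsigned int nr;
	void *slots[];
};

static void foo_replace_array(struct foo_array __rcu **slot,
			      struct foo_array *new_arr,
			      spinlock_t *lock)
{
	struct foo_array *old_arr;

	old_arr = rcu_dereference_protected(*slot, lockdep_is_held(lock));
	rcu_assign_pointer(*slot, new_arr);

	/* Was: synchronize_rcu(); kvfree(old_arr); */
	kvfree_rcu(old_arr);
}

That keeps the object itself rcu_head-free, which is what the ext4 array and
buffer_head cases mentioned above need.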