On Mon, Feb 17, 2020 at 02:33:14PM -0500, Theodore Y. Ts'o wrote:
> On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> > Hello, Joel, Paul, Ted.
> > >
> > > Good point!
> > >
> > > Now that kfree_rcu() is on its way to being less of a hack deeply
> > > entangled into the bowels of RCU, this might be fairly easy to implement.
> > > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > > API. Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> > >
> > I think it makes sense. For example i see there is a similar demand in
> > the mm/list_lru.c too. As for implementation, it will not be hard, i think.
> >
> > The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
> > logic(probably will need to rename that function), i.e. to free vmalloc() allocations
> > only in "emergency path" by just calling kvfree(). So that function in its turn will
> > figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.
>
> The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
> is that kfree_rcu() is a magic macro which frees a structure, which
> has to contain a struct rcu_head. In this case, I'm freeing a pointer
> to set of structures, or in another case, a set of buffer_heads, which
> do *not* have an rcu_head structure.
>
We can implement kvfree_rcu() so that it deals with a plain pointer only;
that is not an issue. I mean without embedding an rcu_head into the
structure or anything else.

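As background for the kvfree() calls used in the patch below: kvfree()
itself already does the "is this a vmalloc() address or not" dispatch
described in the quoted text, so the batching layer does not need to know
which allocator a pointer came from. Roughly, this is a simplified sketch
of the helper in mm/util.c:

<snip>
void kvfree(const void *addr)
{
        if (is_vmalloc_addr(addr))
                vfree(addr);
        else
                kfree(addr);
}
<snip>
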
I tried to implement it with as few changes as possible, to keep it clear
and readable. Please have a look:

<snip>
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 2678a37c3169..9e75c15b53f9 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -805,7 +805,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
 #define __kfree_rcu(head, offset) \
 do { \
         BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \
-        kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
+        kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset), NULL); \
 } while (0)
 
 /**
@@ -842,6 +842,14 @@ do { \
         __kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
 } while (0)
 
+#define kvfree_rcu(ptr) \
+do { \
+        typeof (ptr) ___p = (ptr); \
+        \
+        if (___p) \
+                kfree_call_rcu(NULL, (rcu_callback_t)(unsigned long)(0), ___p); \
+} while (0)
+
 /*
  * Place this after a lock-acquisition primitive to guarantee that
  * an UNLOCK+LOCK pair acts as a full barrier. This guarantee applies
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 045c28b71f4f..a12ecc1645fa 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
         synchronize_rcu();
 }
 
-static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
         call_rcu(head, func);
 }
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 45f3f66bb04d..1e445b566c01 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -33,7 +33,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
 }
 
 void synchronize_rcu_expedited(void);
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr);
 void rcu_barrier(void);
 bool rcu_eqs_special_set(int cpu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 4a885af2ff73..5f22f369cb98 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2746,6 +2746,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
  * kfree_rcu_bulk_data structure becomes exactly one page.
  */
 #define KFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 3)
+#define KVFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 2)
 
 /**
  * struct kfree_rcu_bulk_data - single block to store kfree_rcu() pointers
@@ -2761,6 +2762,12 @@ struct kfree_rcu_bulk_data {
         struct rcu_head *head_free_debug;
 };
 
+struct kvfree_rcu_bulk_data {
+        unsigned long nr_records;
+        void *records[KVFREE_BULK_MAX_ENTR];
+        struct kvfree_rcu_bulk_data *next;
+};
+
 /**
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
@@ -2773,6 +2780,7 @@ struct kfree_rcu_cpu_work {
         struct rcu_work rcu_work;
         struct rcu_head *head_free;
         struct kfree_rcu_bulk_data *bhead_free;
+        struct kvfree_rcu_bulk_data *bvhead_free;
         struct kfree_rcu_cpu *krcp;
 };
 
@@ -2796,6 +2804,10 @@ struct kfree_rcu_cpu {
         struct rcu_head *head;
         struct kfree_rcu_bulk_data *bhead;
         struct kfree_rcu_bulk_data *bcached;
+        struct kvfree_rcu_bulk_data *bvhead;
+        struct kvfree_rcu_bulk_data *bvcached;
+
+        /* rename to "free_rcu_cpu_work" */
         struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
         spinlock_t lock;
         struct delayed_work monitor_work;
@@ -2823,20 +2835,30 @@ static void kfree_rcu_work(struct work_struct *work)
         unsigned long flags;
         struct rcu_head *head, *next;
         struct kfree_rcu_bulk_data *bhead, *bnext;
+        struct kvfree_rcu_bulk_data *bvhead, *bvnext;
         struct kfree_rcu_cpu *krcp;
         struct kfree_rcu_cpu_work *krwp;
+        int i;
 
         krwp = container_of(to_rcu_work(work),
                             struct kfree_rcu_cpu_work, rcu_work);
+
         krcp = krwp->krcp;
         spin_lock_irqsave(&krcp->lock, flags);
+        /* Channel 1. */
         head = krwp->head_free;
         krwp->head_free = NULL;
+
+        /* Channel 2. */
         bhead = krwp->bhead_free;
         krwp->bhead_free = NULL;
+
+        /* Channel 3. */
+        bvhead = krwp->bvhead_free;
+        krwp->bvhead_free = NULL;
         spin_unlock_irqrestore(&krcp->lock, flags);
 
-        /* "bhead" is now private, so traverse locklessly. */
+        /* kmalloc()/kfree_rcu() channel. */
         for (; bhead; bhead = bnext) {
                 bnext = bhead->next;
@@ -2855,6 +2877,21 @@ static void kfree_rcu_work(struct work_struct *work)
                 cond_resched_tasks_rcu_qs();
         }
 
+        /* kvmalloc()/kvfree_rcu() channel. */
+        for (; bvhead; bvhead = bvnext) {
+                bvnext = bvhead->next;
+
+                rcu_lock_acquire(&rcu_callback_map);
+                for (i = 0; i < bvhead->nr_records; i++)
+                        kvfree(bvhead->records[i]);
+                rcu_lock_release(&rcu_callback_map);
+
+                if (cmpxchg(&krcp->bvcached, NULL, bvhead))
+                        free_page((unsigned long) bvhead);
+
+                cond_resched_tasks_rcu_qs();
+        }
+
         /*
          * Emergency case only. It can happen under low memory
          * condition when an allocation gets failed, so the "bulk"
@@ -2901,7 +2938,8 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
          * return false to tell caller to retry.
          */
         if ((krcp->bhead && !krwp->bhead_free) ||
-                        (krcp->head && !krwp->head_free)) {
+                        (krcp->head && !krwp->head_free) ||
+                        (krcp->bvhead && !krwp->bvhead_free)) {
                 /* Channel 1. */
                 if (!krwp->bhead_free) {
                         krwp->bhead_free = krcp->bhead;
@@ -2914,11 +2952,17 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
                         krcp->head = NULL;
                 }
 
+                /* Channel 3. */
+                if (!krwp->bvhead_free) {
+                        krwp->bvhead_free = krcp->bvhead;
+                        krcp->bvhead = NULL;
+                }
+
                 /*
-                 * One work is per one batch, so there are two "free channels",
-                 * "bhead_free" and "head_free" the batch can handle. It can be
-                 * that the work is in the pending state when two channels have
-                 * been detached following each other, one by one.
+                 * One work is per one batch, so there are three "free channels",
+                 * "bhead_free", "head_free" and "bvhead_free" the batch can handle.
+                 * It can be that the work is in the pending state when two channels
+                 * have been detached following each other, one by one.
                  */
                 queue_rcu_work(system_wq, &krwp->rcu_work);
                 queued = true;
@@ -3010,6 +3054,42 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
         return true;
 }
 
+static inline bool
+kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
+{
+        struct kvfree_rcu_bulk_data *bnode;
+
+        if (unlikely(!krcp->initialized))
+                return false;
+
+        lockdep_assert_held(&krcp->lock);
+
+        if (!krcp->bvhead ||
+                        krcp->bvhead->nr_records == KVFREE_BULK_MAX_ENTR) {
+                bnode = xchg(&krcp->bvcached, NULL);
+                if (!bnode) {
+                        WARN_ON_ONCE(sizeof(struct kvfree_rcu_bulk_data) > PAGE_SIZE);
+
+                        bnode = (struct kvfree_rcu_bulk_data *)
+                                __get_free_page(GFP_NOWAIT | __GFP_NOWARN);
+                }
+
+                if (unlikely(!bnode))
+                        return false;
+
+                /* Initialize the new block. */
+                bnode->nr_records = 0;
+                bnode->next = krcp->bvhead;
+
+                /* Attach it to the bvhead. */
+                krcp->bvhead = bnode;
+        }
+
+        /* Finally insert. */
+        krcp->bvhead->records[krcp->bvhead->nr_records++] = ptr;
+        return true;
+}
+
 /*
  * Queue a request for lazy invocation of kfree_bulk()/kfree() after a grace
  * period. Please note there are two paths are maintained, one is the main one
@@ -3022,32 +3102,39 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
  * be free'd in workqueue context. This allows us to: batch requests together to
  * reduce the number of grace periods during heavy kfree_rcu() load.
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr_to_free)
 {
         unsigned long flags;
         struct kfree_rcu_cpu *krcp;
+        bool skip_call_rcu = true;
 
         local_irq_save(flags);  // For safely calling this_cpu_ptr().
         krcp = this_cpu_ptr(&krc);
         if (krcp->initialized)
                 spin_lock(&krcp->lock);
 
-        // Queue the object but don't yet schedule the batch.
-        if (debug_rcu_head_queue(head)) {
-                // Probable double kfree_rcu(), just leak.
-                WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
-                          __func__, head);
-                goto unlock_return;
-        }
+        if (ptr_to_free) {
+                skip_call_rcu = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr_to_free);
+                if (!skip_call_rcu)
+                        goto unlock_return;
+        } else {
+                // Queue the object but don't yet schedule the batch.
+                if (debug_rcu_head_queue(head)) {
+                        // Probable double kfree_rcu(), just leak.
+                        WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
+                                  __func__, head);
+                        goto unlock_return;
+                }
 
-        /*
-         * Under high memory pressure GFP_NOWAIT can fail,
-         * in that case the emergency path is maintained.
-         */
-        if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
-                head->func = func;
-                head->next = krcp->head;
-                krcp->head = head;
+                /*
+                 * Under high memory pressure GFP_NOWAIT can fail,
+                 * in that case the emergency path is maintained.
+                 */
+                if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+                        head->func = func;
+                        head->next = krcp->head;
+                        krcp->head = head;
+                }
         }
 
         // Set timer to drain after KFREE_DRAIN_JIFFIES.
@@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
         if (krcp->initialized)
                 spin_unlock(&krcp->lock);
         local_irq_restore(flags);
+
+        if (!skip_call_rcu) {
+                synchronize_rcu();
+                kvfree(ptr_to_free);
+        }
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
diff --git a/mm/list_lru.c b/mm/list_lru.c
index 0f1f6b06b7f3..0efb849b4f1f 100644
--- a/mm/list_lru.c
+++ b/mm/list_lru.c
@@ -390,14 +390,6 @@ static void memcg_destroy_list_lru_node(struct list_lru_node *nlru)
         kvfree(memcg_lrus);
 }
 
-static void kvfree_rcu(struct rcu_head *head)
-{
-        struct list_lru_memcg *mlru;
-
-        mlru = container_of(head, struct list_lru_memcg, rcu);
-        kvfree(mlru);
-}
-
 static int memcg_update_list_lru_node(struct list_lru_node *nlru,
                                       int old_size, int new_size)
 {
@@ -429,7 +421,7 @@ static int memcg_update_list_lru_node(struct list_lru_node *nlru,
         rcu_assign_pointer(nlru->memcg_lrus, new);
         spin_unlock_irq(&nlru->lock);
 
-        call_rcu(&old->rcu, kvfree_rcu);
+        kvfree_rcu(old);
         return 0;
 }
<snip>

Now it becomes possible to use it like:

        ...
        void *p = kvmalloc(PAGE_SIZE, GFP_KERNEL);
        kvfree_rcu(p);
        ...

Also have a look at the example in the mm/list_lru.c diff.

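To make the difference from kfree_rcu() explicit (the point Ted raised
above), here is a hypothetical caller -- all names are made up, purely for
illustration -- that frees a kvmalloc()'ed pointer array which has no
rcu_head embedded anywhere:

<snip>
/* Hypothetical structure/field names, for illustration only. */
static int example_grow_bh_array(struct my_sb_info *sbi, int new_count)
{
        struct buffer_head **new, **old;

        new = kvmalloc(new_count * sizeof(*new), GFP_KERNEL);
        if (!new)
                return -ENOMEM;

        old = rcu_dereference_protected(sbi->s_bhs,
                        lockdep_is_held(&sbi->s_lock));
        /* ... copy the old entries over ... */
        rcu_assign_pointer(sbi->s_bhs, new);

        /* Freed after a grace period, whether it came from
         * kmalloc() or vmalloc() internally. */
        kvfree_rcu(old);
        return 0;
}
<snip>

Compared with kfree_rcu(), neither the array nor any wrapper object needs
to carry a struct rcu_head.
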
rcu_head %p\n", - __func__, head); - goto unlock_return; - } + if (ptr_to_free) { + skip_call_rcu = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr_to_free); + if (!skip_call_rcu) + goto unlock_return; + } else { + // Queue the object but don't yet schedule the batch. + if (debug_rcu_head_queue(head)) { + // Probable double kfree_rcu(), just leak. + WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n", + __func__, head); + goto unlock_return; + } - /* - * Under high memory pressure GFP_NOWAIT can fail, - * in that case the emergency path is maintained. - */ - if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) { - head->func = func; - head->next = krcp->head; - krcp->head = head; + /* + * Under high memory pressure GFP_NOWAIT can fail, + * in that case the emergency path is maintained. + */ + if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) { + head->func = func; + head->next = krcp->head; + krcp->head = head; + } } // Set timer to drain after KFREE_DRAIN_JIFFIES. @@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func) if (krcp->initialized) spin_unlock(&krcp->lock); local_irq_restore(flags); + + if (!skip_call_rcu) { + synchronize_rcu(); + kvfree(ptr_to_free); + } } EXPORT_SYMBOL_GPL(kfree_call_rcu); diff --git a/mm/list_lru.c b/mm/list_lru.c index 0f1f6b06b7f3..0efb849b4f1f 100644 --- a/mm/list_lru.c +++ b/mm/list_lru.c @@ -390,14 +390,6 @@ static void memcg_destroy_list_lru_node(struct list_lru_node *nlru) kvfree(memcg_lrus); } -static void kvfree_rcu(struct rcu_head *head) -{ - struct list_lru_memcg *mlru; - - mlru = container_of(head, struct list_lru_memcg, rcu); - kvfree(mlru); -} - static int memcg_update_list_lru_node(struct list_lru_node *nlru, int old_size, int new_size) { @@ -429,7 +421,7 @@ static int memcg_update_list_lru_node(struct list_lru_node *nlru, rcu_assign_pointer(nlru->memcg_lrus, new); spin_unlock_irq(&nlru->lock); - call_rcu(&old->rcu, kvfree_rcu); + kvfree_rcu(old); return 0; } <snip> now it becomes possible to use it like: ... void *p = kvmalloc(PAGE_SIZE); kvfree_rcu(p); ... also have a look at the example in the mm/list_lru.c diff. I can send out the RFC/PATCH that implements kvfree_rcu() API without need of "rcu_head" structure. Paul, Joel what are your thoughts? Thank you in advance! -- Vlad Rezki