Re: [PATCH RFC] ext4: fix potential race between online resizing and write operations

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Mon, Feb 17, 2020 at 02:33:14PM -0500, Theodore Y. Ts'o wrote:
> On Mon, Feb 17, 2020 at 05:08:27PM +0100, Uladzislau Rezki wrote:
> > Hello, Joel, Paul, Ted. 
> > 
> > > 
> > > Good point!
> > > 
> > > Now that kfree_rcu() is on its way to being less of a hack deeply
> > > entangled into the bowels of RCU, this might be fairly easy to implement.
> > > It might well be simply a matter of a function pointer and a kvfree_rcu()
> > > API.  Adding Uladzislau Rezki and Joel Fernandez on CC for their thoughts.
> > > 
> > I think it makes sense. For example i see there is a similar demand in
> > the mm/list_lru.c too. As for implementation, it will not be hard, i think. 
> > 
> > The easiest way is to inject kvfree() support directly into existing kfree_call_rcu()
> > logic(probably will need to rename that function), i.e. to free vmalloc() allocations
> > only in "emergency path" by just calling kvfree(). So that function in its turn will
> > figure out if it is _vmalloc_ address or not and trigger corresponding "free" path.
> 
> The other difference between ext4_kvfree_array_rcu() and kfree_rcu()
> is that kfree_rcu() is a magic macro which frees a structure, which
> has to contain a struct rcu_head.  In this case, I'm freeing a pointer
> to set of structures, or in another case, a set of buffer_heads, which
> do *not* have an rcu_head structure.
> 
We can implement kvfree_rcu() that will deal with pointer only, it is not
an issue. I mean without embedding rcu_head to the structure or whatever
else.

I tried to implement it with less number of changes to make it more clear
and readable. Please have a look:

<snip>
diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
index 2678a37c3169..9e75c15b53f9 100644
--- a/include/linux/rcupdate.h
+++ b/include/linux/rcupdate.h
@@ -805,7 +805,7 @@ static inline notrace void rcu_read_unlock_sched_notrace(void)
 #define __kfree_rcu(head, offset) \
        do { \  
                BUILD_BUG_ON(!__is_kfree_rcu_offset(offset)); \
-               kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset)); \
+               kfree_call_rcu(head, (rcu_callback_t)(unsigned long)(offset), NULL); \
        } while (0)

 /**
@@ -842,6 +842,14 @@ do {                                                                       \
                __kfree_rcu(&((___p)->rhf), offsetof(typeof(*(ptr)), rhf)); \
 } while (0)

+#define kvfree_rcu(ptr)                                                \
+do {                                                                   \
+       typeof (ptr) ___p = (ptr);                                      \
+                                                                       \
+       if (___p)                                                       \
+               kfree_call_rcu(NULL, (rcu_callback_t)(unsigned long)(0), ___p); \
+} while (0)
+
 /*
  * Place this after a lock-acquisition primitive to guarantee that
  * an UNLOCK+LOCK pair acts as a full barrier.  This guarantee applies
diff --git a/include/linux/rcutiny.h b/include/linux/rcutiny.h
index 045c28b71f4f..a12ecc1645fa 100644
--- a/include/linux/rcutiny.h
+++ b/include/linux/rcutiny.h
@@ -34,7 +34,7 @@ static inline void synchronize_rcu_expedited(void)
        synchronize_rcu();
 }

-static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr)
 {
        call_rcu(head, func);
 }
diff --git a/include/linux/rcutree.h b/include/linux/rcutree.h
index 45f3f66bb04d..1e445b566c01 100644
--- a/include/linux/rcutree.h
+++ b/include/linux/rcutree.h
@@ -33,7 +33,7 @@ static inline void rcu_virt_note_context_switch(int cpu)
 }
 
 void synchronize_rcu_expedited(void);
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr);
 
 void rcu_barrier(void);
 bool rcu_eqs_special_set(int cpu);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 4a885af2ff73..5f22f369cb98 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2746,6 +2746,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
  * kfree_rcu_bulk_data structure becomes exactly one page.
  */
 #define KFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 3)
+#define KVFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 2)
 
 /**
  * struct kfree_rcu_bulk_data - single block to store kfree_rcu() pointers
@@ -2761,6 +2762,12 @@ struct kfree_rcu_bulk_data {
        struct rcu_head *head_free_debug;
 };
+struct kvfree_rcu_bulk_data {
+       unsigned long nr_records;
+       void *records[KVFREE_BULK_MAX_ENTR];
+       struct kvfree_rcu_bulk_data *next;
+};
+
 /**
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
@@ -2773,6 +2780,7 @@ struct kfree_rcu_cpu_work {
        struct rcu_work rcu_work;
        struct rcu_head *head_free;
        struct kfree_rcu_bulk_data *bhead_free;
+       struct kvfree_rcu_bulk_data *bvhead_free;
        struct kfree_rcu_cpu *krcp;
 };
 
@@ -2796,6 +2804,10 @@ struct kfree_rcu_cpu {
        struct rcu_head *head;
        struct kfree_rcu_bulk_data *bhead;
        struct kfree_rcu_bulk_data *bcached;
+       struct kvfree_rcu_bulk_data *bvhead;
+       struct kvfree_rcu_bulk_data *bvcached;
+
+       /* rename to "free_rcu_cpu_work" */
        struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
        spinlock_t lock;
        struct delayed_work monitor_work;
@@ -2823,20 +2835,30 @@ static void kfree_rcu_work(struct work_struct *work)
        unsigned long flags;
        struct rcu_head *head, *next;
        struct kfree_rcu_bulk_data *bhead, *bnext;
+       struct kvfree_rcu_bulk_data *bvhead, *bvnext;
        struct kfree_rcu_cpu *krcp;
        struct kfree_rcu_cpu_work *krwp;
+       int i;
        krwp = container_of(to_rcu_work(work),
                            struct kfree_rcu_cpu_work, rcu_work);
+
        krcp = krwp->krcp;
        spin_lock_irqsave(&krcp->lock, flags);
+       /* Channel 1. */
        head = krwp->head_free;
        krwp->head_free = NULL;
+
+       /* Channel 2. */
        bhead = krwp->bhead_free;
        krwp->bhead_free = NULL;
+
+       /* Channel 3. */
+       bvhead = krwp->bvhead_free;
+       krwp->bvhead_free = NULL;
        spin_unlock_irqrestore(&krcp->lock, flags);
 
-       /* "bhead" is now private, so traverse locklessly. */
+       /* kmalloc()/kfree_rcu() channel. */
        for (; bhead; bhead = bnext) {
                bnext = bhead->next;
 
@@ -2855,6 +2877,21 @@ static void kfree_rcu_work(struct work_struct *work)
                cond_resched_tasks_rcu_qs();
        }
 
+       /* kvmalloc()/kvfree_rcu() channel. */
+       for (; bvhead; bvhead = bvnext) {
+               bvnext = bvhead->next;
+
+               rcu_lock_acquire(&rcu_callback_map);
+               for (i = 0; i < bvhead->nr_records; i++)
+                       kvfree(bvhead->records[i]);
+               rcu_lock_release(&rcu_callback_map);
+
+               if (cmpxchg(&krcp->bvcached, NULL, bvhead))
+                       free_page((unsigned long) bvhead);
+
+               cond_resched_tasks_rcu_qs();
+       }
+
        /*
         * Emergency case only. It can happen under low memory
         * condition when an allocation gets failed, so the "bulk"
@@ -2901,7 +2938,8 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
                 * return false to tell caller to retry.
                 */
                if ((krcp->bhead && !krwp->bhead_free) ||
-                               (krcp->head && !krwp->head_free)) {
+                               (krcp->head && !krwp->head_free) ||
+                               (krcp->bvhead && !krwp->bvhead_free)) {
                        /* Channel 1. */
                        if (!krwp->bhead_free) {
                                krwp->bhead_free = krcp->bhead;
@@ -2914,11 +2952,17 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
                                krcp->head = NULL;
                        }
 
+                       /* Channel 3. */
+                       if (!krwp->bvhead_free) {
+                               krwp->bvhead_free = krcp->bvhead;
+                               krcp->bvhead = NULL;
+                       }
+
                        /*
-                        * One work is per one batch, so there are two "free channels",
-                        * "bhead_free" and "head_free" the batch can handle. It can be
-                        * that the work is in the pending state when two channels have
-                        * been detached following each other, one by one.
+                        * One work is per one batch, so there are three "free channels",
+                        * "bhead_free", "head_free" and "bvhead_free" the batch can handle.
+                        * It can be that the work is in the pending state when two channels
+                        * have been detached following each other, one by one.
                         */
                        queue_rcu_work(system_wq, &krwp->rcu_work);
                        queued = true;
@@ -3010,6 +3054,42 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
        return true;
 }
 
+static inline bool
+kvfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp, void *ptr)
+{
+       struct kvfree_rcu_bulk_data *bnode;
+
+       if (unlikely(!krcp->initialized))
+               return false;
+
+       lockdep_assert_held(&krcp->lock);
+
+       if (!krcp->bvhead ||
+                       krcp->bvhead->nr_records == KVFREE_BULK_MAX_ENTR) {
+               bnode = xchg(&krcp->bvcached, NULL);
+               if (!bnode) {
+                       WARN_ON_ONCE(sizeof(struct kvfree_rcu_bulk_data) > PAGE_SIZE);
+
+                       bnode = (struct kvfree_rcu_bulk_data *)
+                               __get_free_page(GFP_NOWAIT | __GFP_NOWARN);
+               }
+
+               if (unlikely(!bnode))
+                       return false;
+
+               /* Initialize the new block. */
+               bnode->nr_records = 0;
+               bnode->next = krcp->bvhead;
+
+               /* Attach it to the bvhead. */
+               krcp->bvhead = bnode;
+       }
+
+       /* Finally insert. */
+       krcp->bvhead->records[krcp->bvhead->nr_records++] = ptr;
+       return true;
+}
+
 /*
  * Queue a request for lazy invocation of kfree_bulk()/kfree() after a grace
  * period. Please note there are two paths are maintained, one is the main one
@@ -3022,32 +3102,39 @@ kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
  * be free'd in workqueue context. This allows us to: batch requests together to
  * reduce the number of grace periods during heavy kfree_rcu() load.
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func, void *ptr_to_free)
 {
        unsigned long flags;
        struct kfree_rcu_cpu *krcp;
+       bool skip_call_rcu = true;
 
        local_irq_save(flags);  // For safely calling this_cpu_ptr().
        krcp = this_cpu_ptr(&krc);
        if (krcp->initialized)
                spin_lock(&krcp->lock);
 
-       // Queue the object but don't yet schedule the batch.
-       if (debug_rcu_head_queue(head)) {
-               // Probable double kfree_rcu(), just leak.
-               WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
-                         __func__, head);
-               goto unlock_return;
-       }
+       if (ptr_to_free) {
+               skip_call_rcu = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr_to_free);
+               if (!skip_call_rcu)
+                       goto unlock_return;
+       } else {
+               // Queue the object but don't yet schedule the batch.
+               if (debug_rcu_head_queue(head)) {
+                       // Probable double kfree_rcu(), just leak.
+                       WARN_ONCE(1, "%s(): Double-freed call. rcu_head %p\n",
+                               __func__, head);
+                       goto unlock_return;
+               }
 
-       /*
-        * Under high memory pressure GFP_NOWAIT can fail,
-        * in that case the emergency path is maintained.
-        */
-       if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
-               head->func = func;
-               head->next = krcp->head;
-               krcp->head = head;
+               /*
+                * Under high memory pressure GFP_NOWAIT can fail,
+                * in that case the emergency path is maintained.
+                */
+               if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+                       head->func = func;
+                       head->next = krcp->head;
+                       krcp->head = head;
+               }
        }
 
        // Set timer to drain after KFREE_DRAIN_JIFFIES.
@@ -3061,6 +3148,11 @@ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
        if (krcp->initialized)
                spin_unlock(&krcp->lock);
        local_irq_restore(flags);
+
+       if (!skip_call_rcu) {
+               synchronize_rcu();
+               kvfree(ptr_to_free);
+       }
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);

diff --git a/mm/list_lru.c b/mm/list_lru.c
index 0f1f6b06b7f3..0efb849b4f1f 100644
--- a/mm/list_lru.c
+++ b/mm/list_lru.c
@@ -390,14 +390,6 @@ static void memcg_destroy_list_lru_node(struct list_lru_node *nlru)
        kvfree(memcg_lrus);
 }

-static void kvfree_rcu(struct rcu_head *head)
-{
-       struct list_lru_memcg *mlru;
-
-       mlru = container_of(head, struct list_lru_memcg, rcu);
-       kvfree(mlru);
-}
-
 static int memcg_update_list_lru_node(struct list_lru_node *nlru,
                                      int old_size, int new_size)
 {
@@ -429,7 +421,7 @@ static int memcg_update_list_lru_node(struct list_lru_node *nlru,
        rcu_assign_pointer(nlru->memcg_lrus, new);
        spin_unlock_irq(&nlru->lock);

-       call_rcu(&old->rcu, kvfree_rcu);
+       kvfree_rcu(old);
        return 0;
 }
<snip>

now it becomes possible to use it like: 
	...
	void *p = kvmalloc(PAGE_SIZE);
	kvfree_rcu(p);
	...
also have a look at the example in the mm/list_lru.c diff.

I can send out the RFC/PATCH that implements kvfree_rcu() API without need
of "rcu_head" structure. 

Paul, Joel what are your thoughts?

Thank you in advance!

--
Vlad Rezki



[Index of Archives]     [Reiser Filesystem Development]     [Ceph FS]     [Kernel Newbies]     [Security]     [Netfilter]     [Bugtraq]     [Linux FS]     [Yosemite National Park]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux RAID]     [Samba]     [Device Mapper]     [Linux Media]

  Powered by Linux