Re: [PATCH v3 1/2] ceph: add ceph_async_put_cap_refs to avoid double lock and deadlock

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 2020/5/27 10:16, Yan, Zheng wrote:
On 5/27/20 9:42 AM, xiubli@xxxxxxxxxx wrote:
From: Xiubo Li <xiubli@xxxxxxxxxx>

In the ceph_check_caps() it may call the session->s_mutex lock/unlock
and in ceph_flush_snaps() it may call the mdsc->mutex lock/unlock. And
both of them called from ceph_mdsc_put_request(), which may under the
session or mdsc mutex lock/unlock pair already, we will hit the dead
lock or double lock issue.

There have some deadlock cases, like:
handle_forward()

MDS should never forward an async request.

Okay.



...
mutex_lock(&mdsc->mutex)
...
ceph_mdsc_put_request()
   --> ceph_mdsc_release_request()
     --> ceph_put_cap_request()
       --> ceph_put_cap_refs()
         --> ceph_check_caps()
...
mutex_unlock(&mdsc->mutex)

In normal case, async dirop caps should be put by ceph_async_foo_cb()
This only happens when session gets closed and cleaning up async requests. calling ceph_put_cap_refs_no_check_caps() here should work.

Checked it again, yeah, you are right. In normal case, the ceph_mdsc_release_dir_caps() in ceph_mdsc_release_request() shall never be called.


And also there maybe has some double session lock cases, like:

send_mds_reconnect()
...
mutex_lock(&session->s_mutex);
...
   --> replay_unsafe_requests()
     --> ceph_mdsc_release_dir_caps()
       --> ceph_put_cap_refs()
         --> ceph_check_caps()
...
mutex_unlock(&session->s_mutex);

calling ceph_put_cap_refs_no_check_caps() here should work too.



This patch will just delays to call them in a queue work to avoid
the dead lock and double lock issues.


The issue happens only in a few rare cases. I don't think it's worth to call all ceph_check_caps() async.

Will drop the async check caps stuff.

Thanks
BRs
Xiubo



Regards
Yan, Zheng

URL: https://tracker.ceph.com/issues/45635
Signed-off-by: Xiubo Li <xiubli@xxxxxxxxxx>
---
  fs/ceph/caps.c       | 89 ++++++++++++++++++++++++++++++++++++++++++++--------
  fs/ceph/inode.c      |  3 ++
  fs/ceph/mds_client.c |  2 +-
  fs/ceph/super.h      |  5 +++
  4 files changed, 85 insertions(+), 14 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 27c2e60..996bedb 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -9,6 +9,7 @@
  #include <linux/wait.h>
  #include <linux/writeback.h>
  #include <linux/iversion.h>
+#include <linux/bits.h>
    #include "super.h"
  #include "mds_client.h"
@@ -3007,19 +3008,15 @@ static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
      return 0;
  }
  -/*
- * Release cap refs.
- *
- * If we released the last ref on any given cap, call ceph_check_caps
- * to release (or schedule a release).
- *
- * If we are releasing a WR cap (from a sync write), finalize any affected
- * cap_snap, and wake up any waiters.
- */
-void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+#define CAP_REF_LAST_BIT       0
+#define CAP_REF_FLUSHSNAPS_BIT 1
+#define CAP_REF_WAKE_BIT       2
+
+static int __ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
  {
      struct inode *inode = &ci->vfs_inode;
      int last = 0, put = 0, flushsnaps = 0, wake = 0;
+    unsigned long flags = 0;
        spin_lock(&ci->i_ceph_lock);
      if (had & CEPH_CAP_PIN)
@@ -3073,13 +3070,79 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
           last ? " last" : "", put ? " put" : "");
        if (last)
-        ceph_check_caps(ci, 0, NULL);
+        set_bit(CAP_REF_LAST_BIT, &flags);
      else if (flushsnaps)
-        ceph_flush_snaps(ci, NULL);
+        set_bit(CAP_REF_FLUSHSNAPS_BIT, &flags);
      if (wake)
-        wake_up_all(&ci->i_cap_wq);
+        set_bit(CAP_REF_WAKE_BIT, &flags);
      while (put-- > 0)
          iput(inode);
+
+    return flags;
+}
+
+/*
+ * This is the bottow half of __ceph_put_cap_refs(), which
+ * may take the mdsc->mutex and session->s_mutex and this
+ * will be called in a queue work to void dead/double lock
+ * issues if called from ceph_mdsc_put_request().
+ */
+static void __ceph_put_cap_refs_bh(struct ceph_inode_info *ci,
+                   unsigned long flags)
+{
+    if (test_bit(CAP_REF_LAST_BIT, &flags))
+        ceph_check_caps(ci, 0, NULL);
+    else if (test_bit(CAP_REF_FLUSHSNAPS_BIT, &flags))
+        ceph_flush_snaps(ci, NULL);
+    if (test_bit(CAP_REF_WAKE_BIT, &flags))
+        wake_up_all(&ci->i_cap_wq);
+}
+
+/*
+ * Release cap refs.
+ *
+ * If we released the last ref on any given cap, call ceph_check_caps
+ * to release (or schedule a release).
+ *
+ * If we are releasing a WR cap (from a sync write), finalize any affected
+ * cap_snap, and wake up any waiters.
+ */
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+    unsigned long flags;
+
+    flags = __ceph_put_cap_refs(ci, had);
+    __ceph_put_cap_refs_bh(ci, flags);
+}
+
+void ceph_async_put_cap_refs_work(struct work_struct *work)
+{
+    struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
+                          put_cap_refs_work);
+    unsigned long flags;
+
+    spin_lock(&ci->i_ceph_lock);
+    flags = xchg(&ci->pending_put_flags, 0);
+    spin_unlock(&ci->i_ceph_lock);
+    if (!flags)
+        return;
+
+    __ceph_put_cap_refs_bh(ci, flags);
+}
+
+void ceph_async_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+    struct inode *inode = &ci->vfs_inode;
+    unsigned long flags;
+
+    flags = __ceph_put_cap_refs(ci, had);
+
+    spin_lock(&ci->i_ceph_lock);
+    ci->pending_put_flags |= flags;
+    spin_unlock(&ci->i_ceph_lock);
+
+    queue_work(ceph_inode_to_client(inode)->inode_wq,
+           &ci->put_cap_refs_work);
  }
    /*
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 357c937..e0ea508 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -517,6 +517,9 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
      INIT_LIST_HEAD(&ci->i_snap_realm_item);
      INIT_LIST_HEAD(&ci->i_snap_flush_item);
  +    INIT_WORK(&ci->put_cap_refs_work, ceph_async_put_cap_refs_work);
+    ci->pending_put_flags = 0;
+
      INIT_WORK(&ci->i_work, ceph_inode_work);
      ci->i_work_mask = 0;
      memset(&ci->i_btime, '\0', sizeof(ci->i_btime));
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 0e0ab01..12506b7 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -3398,7 +3398,7 @@ void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
      dcaps = xchg(&req->r_dir_caps, 0);
      if (dcaps) {
          dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
-        ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
+        ceph_async_put_cap_refs(ceph_inode(req->r_parent), dcaps);
      }
  }
  diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 226f19c..ece94fc 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -421,6 +421,9 @@ struct ceph_inode_info {
      struct timespec64 i_btime;
      struct timespec64 i_snap_btime;
  +    struct work_struct put_cap_refs_work;
+    unsigned long pending_put_flags;
+
      struct work_struct i_work;
      unsigned long  i_work_mask;
  @@ -1095,6 +1098,8 @@ extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int caps,
                  bool snap_rwsem_locked);
  extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
  extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_async_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_async_put_cap_refs_work(struct work_struct *work);
  extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                         struct ceph_snap_context *snapc);
  extern void ceph_flush_snaps(struct ceph_inode_info *ci,






[Index of Archives]     [CEPH Users]     [Ceph Large]     [Ceph Dev]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux