On Thu, 2019-05-23 at 16:13 +0800, Yan, Zheng wrote:
> iput_final() may wait for readahead pages. The wait can cause a
> deadlock. For example:
> 
> Workqueue: ceph-msgr ceph_con_workfn [libceph]
> Call Trace:
>  schedule+0x36/0x80
>  io_schedule+0x16/0x40
>  __lock_page+0x101/0x140
>  truncate_inode_pages_range+0x556/0x9f0
>  truncate_inode_pages_final+0x4d/0x60
>  evict+0x182/0x1a0
>  iput+0x1d2/0x220
>  iterate_session_caps+0x82/0x230 [ceph]
>  dispatch+0x678/0xa80 [ceph]
>  ceph_con_workfn+0x95b/0x1560 [libceph]
>  process_one_work+0x14d/0x410
>  worker_thread+0x4b/0x460
>  kthread+0x105/0x140
>  ret_from_fork+0x22/0x40
> 
> Workqueue: ceph-msgr ceph_con_workfn [libceph]
> Call Trace:
>  __schedule+0x3d6/0x8b0
>  schedule+0x36/0x80
>  schedule_preempt_disabled+0xe/0x10
>  mutex_lock+0x2f/0x40
>  ceph_check_caps+0x505/0xa80 [ceph]
>  ceph_put_wrbuffer_cap_refs+0x1e5/0x2c0 [ceph]
>  writepages_finish+0x2d3/0x410 [ceph]
>  __complete_request+0x26/0x60 [libceph]
>  handle_reply+0x6c8/0xa10 [libceph]
>  dispatch+0x29a/0xbb0 [libceph]
>  ceph_con_workfn+0x95b/0x1560 [libceph]
>  process_one_work+0x14d/0x410
>  worker_thread+0x4b/0x460
>  kthread+0x105/0x140
>  ret_from_fork+0x22/0x40
> 
> In the above example, truncate_inode_pages_range() waits for readahead
> pages while holding s_mutex. ceph_check_caps() waits for s_mutex and
> blocks the OSD dispatch thread, so later OSD replies (for readahead)
> can't be handled.
> 
> ceph_check_caps() may also take snap_rwsem for read, so a similar
> deadlock can happen if iput_final() is called while holding snap_rwsem.
> 
> In general, it's not safe to call iput_final() inside MDS/OSD dispatch
> threads or while holding any mutex.
> 
> The fix is to introduce ceph_async_iput(), which calls iput_final()
> from a workqueue.
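
To spell out the cycle for anyone skimming the traces: the first thread
is an MDS dispatch worker that drops the last inode reference while
holding s_mutex, so iput_final() ends up in truncate_inode_pages_range()
waiting on a page that is still locked for readahead. The second thread
is the OSD dispatch worker that would eventually unlock that page, but
it's blocked on the same s_mutex while handling an earlier reply, so the
readahead replies queued behind it are never dispatched. Schematically
(not actual code):

        /* MDS dispatch thread */
        mutex_lock(&session->s_mutex);
        iput(inode);    /* last ref: iput_final() truncates pages and
                         * blocks in lock_page() on an in-flight
                         * readahead page */

        /* OSD dispatch thread -- the only one that can complete that
         * readahead -- is stuck handling an earlier reply: */
        ceph_check_caps(ci, ...);       /* waits on session->s_mutex */
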
> Signed-off-by: "Yan, Zheng" <zyan@xxxxxxxxxx>
> ---
>  fs/ceph/caps.c       | 12 ++++++++----
>  fs/ceph/inode.c      | 31 +++++++++++++++++++++++++++----
>  fs/ceph/mds_client.c | 28 ++++++++++++++++++----------
>  fs/ceph/quota.c      |  9 ++++++---
>  fs/ceph/snap.c       | 16 +++++++++++-----
>  fs/ceph/super.h      |  2 +-
>  6 files changed, 71 insertions(+), 27 deletions(-)
> 
> diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
> index 079d0df9650c..0176241eaea7 100644
> --- a/fs/ceph/caps.c
> +++ b/fs/ceph/caps.c
> @@ -2992,8 +2992,10 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
>  	}
>  	if (complete_capsnap)
>  		wake_up_all(&ci->i_cap_wq);
> -	while (put-- > 0)
> -		iput(inode);
> +	while (put-- > 0) {
> +		/* avoid calling iput_final() in osd dispatch threads */
> +		ceph_async_iput(inode);
> +	}
>  }
> 
>  /*
> @@ -3964,8 +3966,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  done:
>  	mutex_unlock(&session->s_mutex);
>  done_unlocked:
> -	iput(inode);
>  	ceph_put_string(extra_info.pool_ns);
> +	/* avoid calling iput_final() in mds dispatch threads */
> +	ceph_async_iput(inode);
>  	return;
> 
>  flush_cap_releases:
> @@ -4011,7 +4014,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
>  		if (inode) {
>  			dout("check_delayed_caps on %p\n", inode);
>  			ceph_check_caps(ci, flags, NULL);
> -			iput(inode);
> +			/* avoid calling iput_final() in tick thread */
> +			ceph_async_iput(inode);
>  		}
>  	}
>  	spin_unlock(&mdsc->cap_delay_lock);
> diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
> index d9ff349821f0..8cfece240ffe 100644
> --- a/fs/ceph/inode.c
> +++ b/fs/ceph/inode.c
> @@ -1480,7 +1480,8 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
>  			pr_err("fill_inode badness on %p got %d\n", in, rc);
>  			err = rc;
>  		}
> -		iput(in);
> +		/* avoid calling iput_final() in mds dispatch threads */
> +		ceph_async_iput(in);
>  	}
> 
>  	return err;
> @@ -1678,8 +1679,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
>  					 &req->r_caps_reservation);
>  			if (ret < 0) {
>  				pr_err("fill_inode badness on %p\n", in);
> -				if (d_really_is_negative(dn))
> -					iput(in);
> +				if (d_really_is_negative(dn)) {
> +					/* avoid calling iput_final() in mds
> +					 * dispatch threads */
> +					ceph_async_iput(in);
> +				}
>  				d_drop(dn);
>  				err = ret;
>  				goto next_item;
> @@ -1689,7 +1693,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
>  			if (ceph_security_xattr_deadlock(in)) {
>  				dout(" skip splicing dn %p to inode %p"
>  				     " (security xattr deadlock)\n", dn, in);
> -				iput(in);
> +				ceph_async_iput(in);
>  				skipped++;
>  				goto next_item;
>  			}
> @@ -1740,6 +1744,25 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
>  	return ret;
>  }
> 
> +/*
> + * Put reference to inode, but avoid calling iput_final() in current thread.
> + * iput_final() may wait for readahead pages. The wait can cause deadlock in
> + * some contexts.
> + */
> +void ceph_async_iput(struct inode *inode)
> +{
> +	if (!inode)
> +		return;
> +	for (;;) {
> +		if (atomic_add_unless(&inode->i_count, -1, 1))
> +			break;
> +		if (queue_work(ceph_inode_to_client(inode)->inode_wq,
> +			       &ceph_inode(inode)->i_work))
> +			break;
> +		/* queue work failed, i_count must be at least 2 */
> +	}
> +}
> +

Clever!
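
The retry loop is worth a note for readers who stop here:
atomic_add_unless() drops a reference only when i_count is not 1, so the
final reference is never put in the caller's context. If queue_work()
returns false, an earlier ceph_async_iput() already queued the work and
its deferred reference is still outstanding, so i_count is at least 2 at
that instant; on the retry we either drop our reference directly or, if
the pending work ran in the meantime, queue_work() now succeeds. Either
way the loop terminates.

For completeness, here is a minimal sketch of how I read the worker side
pairing with this (hypothetical code -- the actual i_work handler comes
from earlier in the series and may fold in other deferred inode work):

        static void example_inode_work(struct work_struct *work)
        {
                struct ceph_inode_info *ci =
                        container_of(work, struct ceph_inode_info, i_work);

                /* ... run any other deferred inode work first ... */

                /*
                 * Drop the reference that ceph_async_iput() deferred.
                 * This runs on inode_wq, where blocking in iput_final()
                 * (e.g. on locked readahead pages) is safe.
                 */
                iput(&ci->vfs_inode);
        }
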
"auth " : "", cap); > spin_unlock(&ci->i_ceph_lock); > out: > - iput(inode); > + /* avoid calling iput_final() while holding mdsc->mutex or > + * in mds dispatch threads */ > + ceph_async_iput(inode); > return mds; > > random: > @@ -1312,7 +1316,9 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, > spin_unlock(&session->s_cap_lock); > > if (last_inode) { > - iput(last_inode); > + /* avoid calling iput_final() while holding > + * s_mutex or in mds dispatch threads */ > + ceph_async_iput(last_inode); > last_inode = NULL; > } > if (old_cap) { > @@ -1345,7 +1351,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, > session->s_cap_iterator = NULL; > spin_unlock(&session->s_cap_lock); > > - iput(last_inode); > + ceph_async_iput(last_inode); > if (old_cap) > ceph_put_cap(session->s_mdsc, old_cap); > > @@ -1481,7 +1487,8 @@ static void remove_session_caps(struct ceph_mds_session *session) > spin_unlock(&session->s_cap_lock); > > inode = ceph_find_inode(sb, vino); > - iput(inode); > + /* avoid calling iput_final() while holding s_mutex */ > + ceph_async_iput(inode); > > spin_lock(&session->s_cap_lock); > } > @@ -3923,8 +3930,9 @@ static void handle_lease(struct ceph_mds_client *mdsc, > ceph_con_send(&session->s_con, msg); > > out: > - iput(inode); > mutex_unlock(&session->s_mutex); > + /* avoid calling iput_final() in mds dispatch threads */ > + ceph_async_iput(inode); > return; > > bad: > diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c > index c4522212872c..d629fc857450 100644 > --- a/fs/ceph/quota.c > +++ b/fs/ceph/quota.c > @@ -74,7 +74,8 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc, > le64_to_cpu(h->max_files)); > spin_unlock(&ci->i_ceph_lock); > > - iput(inode); > + /* avoid calling iput_final() in dispatch thread */ > + ceph_async_iput(inode); > } > > static struct ceph_quotarealm_inode * > @@ -235,7 +236,8 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc, > > ci = ceph_inode(in); > has_quota = __ceph_has_any_quota(ci); > - iput(in); > + /* avoid calling iput_final() while holding mdsc->snap_rwsem */ > + ceph_async_iput(in); > > next = realm->parent; > if (has_quota || !next) > @@ -372,7 +374,8 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op, > pr_warn("Invalid quota check op (%d)\n", op); > exceeded = true; /* Just break the loop */ > } > - iput(in); > + /* avoid calling iput_final() while holding mdsc->snap_rwsem */ > + ceph_async_iput(in); > > next = realm->parent; > if (exceeded || !next) > diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c > index b26e12cd8ec3..72c6c022f02b 100644 > --- a/fs/ceph/snap.c > +++ b/fs/ceph/snap.c > @@ -648,13 +648,15 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) > if (!inode) > continue; > spin_unlock(&realm->inodes_with_caps_lock); > - iput(lastinode); > + /* avoid calling iput_final() while holding > + * mdsc->snap_rwsem or in mds dispatch threads */ > + ceph_async_iput(lastinode); > lastinode = inode; > ceph_queue_cap_snap(ci); > spin_lock(&realm->inodes_with_caps_lock); > } > spin_unlock(&realm->inodes_with_caps_lock); > - iput(lastinode); > + ceph_async_iput(lastinode); > > dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino); > } > @@ -806,7 +808,9 @@ static void flush_snaps(struct ceph_mds_client *mdsc) > ihold(inode); > spin_unlock(&mdsc->snap_flush_lock); > ceph_flush_snaps(ci, &session); > - iput(inode); > + /* avoid calling iput_final() while holding > + * session->s_mutex or in mds dispatch threads */ > + 
ceph_async_iput(inode); > spin_lock(&mdsc->snap_flush_lock); > } > spin_unlock(&mdsc->snap_flush_lock); > @@ -950,12 +954,14 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, > ceph_get_snap_realm(mdsc, realm); > ceph_put_snap_realm(mdsc, oldrealm); > > - iput(inode); > + /* avoid calling iput_final() while holding > + * mdsc->snap_rwsem or mds in dispatch threads */ > + ceph_async_iput(inode); > continue; > > skip_inode: > spin_unlock(&ci->i_ceph_lock); > - iput(inode); > + ceph_async_iput(inode); > } > > /* we may have taken some of the old realm's children. */ > diff --git a/fs/ceph/super.h b/fs/ceph/super.h > index 234610ce4155..11aeb540b0cf 100644 > --- a/fs/ceph/super.h > +++ b/fs/ceph/super.h > @@ -904,9 +904,9 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask); > extern bool ceph_inode_set_size(struct inode *inode, loff_t size); > extern void __ceph_do_pending_vmtruncate(struct inode *inode); > extern void ceph_queue_vmtruncate(struct inode *inode); > - > extern void ceph_queue_invalidate(struct inode *inode); > extern void ceph_queue_writeback(struct inode *inode); > +extern void ceph_async_iput(struct inode *inode); > > extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page, > int mask, bool force); I think this looks reasonable. Reviewed-by: Jeff Layton <jlayton@xxxxxxxxxx>
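
One small addendum for future call sites: the reordering in
ceph_handle_caps() and handle_lease() suggests the preferred pattern is
to release s_mutex first and only then drop the reference.
ceph_async_iput() defers any final put to the workqueue either way, but
keeping the put outside the locked section is cheap insurance. A
hypothetical new call site would look like:

        mutex_unlock(&session->s_mutex);
        /* never calls iput_final() in this thread */
        ceph_async_iput(inode);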