On Tue, 2019-01-15 at 16:03 +0800, Yan, Zheng wrote: > When pending cap releases fill up one message, start a work to send > cap release message. (old way is sending cap releases every 5 seconds) > > Signed-off-by: "Yan, Zheng" <zyan@xxxxxxxxxx> > --- > fs/ceph/caps.c | 29 +++++++++------------- > fs/ceph/inode.c | 2 +- > fs/ceph/mds_client.c | 58 +++++++++++++++++++++++++++++++++++++------- > fs/ceph/mds_client.h | 10 +++++--- > fs/ceph/super.c | 9 ++++++- > fs/ceph/super.h | 6 +++-- > 6 files changed, 80 insertions(+), 34 deletions(-) > > diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c > index ef57491157fc..0f69b97205d4 100644 > --- a/fs/ceph/caps.c > +++ b/fs/ceph/caps.c > @@ -1113,9 +1113,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) > (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { > cap->queue_release = 1; > if (removed) { > - list_add_tail(&cap->session_caps, > - &session->s_cap_releases); > - session->s_num_cap_releases++; > + __ceph_queue_cap_release(session, cap); > removed = 0; > } > } else { > @@ -1277,7 +1275,7 @@ static int send_cap_msg(struct cap_msg_args *arg) > * Queue cap releases when an inode is dropped from our cache. Since > * inode is about to be destroyed, there is no need for i_ceph_lock. > */ > -void ceph_queue_caps_release(struct inode *inode) > +void __ceph_remove_caps(struct inode *inode) > { > struct ceph_inode_info *ci = ceph_inode(inode); > struct rb_node *p; > @@ -3918,12 +3916,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, > cap->seq = seq; > cap->issue_seq = seq; > spin_lock(&session->s_cap_lock); > - list_add_tail(&cap->session_caps, > - &session->s_cap_releases); > - session->s_num_cap_releases++; > + __ceph_queue_cap_release(session, cap); > spin_unlock(&session->s_cap_lock); > } > - goto flush_cap_releases; > + goto done; > } > > /* these will work even if we don't have a cap yet */ > @@ -3993,7 +3989,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, > ceph_cap_op_name(op)); > } > > - goto done; > +done: > + mutex_unlock(&session->s_mutex); > +done_unlocked: > + iput(inode); > + ceph_put_string(extra_info.pool_ns); > + return; > > flush_cap_releases: > /* > @@ -4001,14 +4002,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, > * along for the mds (who clearly thinks we still have this > * cap). > */ > - ceph_send_cap_releases(mdsc, session); > - > -done: > - mutex_unlock(&session->s_mutex); > -done_unlocked: > - iput(inode); > - ceph_put_string(extra_info.pool_ns); > - return; > + ceph_flush_cap_releases(mdsc, session); > + goto done; > > bad: > pr_err("ceph_handle_caps: corrupt message\n"); > diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c > index e6012de58aae..f588b2d7b598 100644 > --- a/fs/ceph/inode.c > +++ b/fs/ceph/inode.c > @@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode) > > ceph_fscache_unregister_inode_cookie(ci); > > - ceph_queue_caps_release(inode); > + __ceph_remove_caps(inode); > > if (__ceph_has_any_quota(ci)) > ceph_adjust_quota_realms_count(inode, false); > diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c > index c9f1b1e8fa03..15df7108a68f 100644 > --- a/fs/ceph/mds_client.c > +++ b/fs/ceph/mds_client.c > @@ -57,6 +57,7 @@ struct ceph_reconnect_state { > > static void __wake_requests(struct ceph_mds_client *mdsc, > struct list_head *head); > +static void ceph_cap_release_work(struct work_struct *work); > > static const struct ceph_connection_operations mds_con_ops; > > @@ -634,6 +635,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, > s->s_cap_reconnect = 0; > s->s_cap_iterator = NULL; > INIT_LIST_HEAD(&s->s_cap_releases); > + INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); > + > INIT_LIST_HEAD(&s->s_cap_flushing); > > mdsc->sessions[mds] = s; > @@ -659,6 +662,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc, > dout("__unregister_session mds%d %p\n", s->s_mds, s); > BUG_ON(mdsc->sessions[s->s_mds] != s); > mdsc->sessions[s->s_mds] = NULL; > + s->s_state = 0; > ceph_con_close(&s->s_con); > ceph_put_mds_session(s); > atomic_dec(&mdsc->num_sessions); > @@ -1321,13 +1325,10 @@ static int iterate_session_caps(struct ceph_mds_session *session, > cap->session = NULL; > list_del_init(&cap->session_caps); > session->s_nr_caps--; > - if (cap->queue_release) { > - list_add_tail(&cap->session_caps, > - &session->s_cap_releases); > - session->s_num_cap_releases++; > - } else { > + if (cap->queue_release) > + __ceph_queue_cap_release(session, cap); > + else > old_cap = cap; /* put_cap it w/o locks held */ > - } > } > if (ret < 0) > goto out; > @@ -1762,7 +1763,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc, > session->s_trim_caps = 0; > } > > - ceph_send_cap_releases(mdsc, session); > + ceph_flush_cap_releases(mdsc, session); > return 0; > } > > @@ -1805,8 +1806,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, > /* > * called under s_mutex > */ > -void ceph_send_cap_releases(struct ceph_mds_client *mdsc, > - struct ceph_mds_session *session) > +static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, > + struct ceph_mds_session *session) > { > struct ceph_msg *msg = NULL; > struct ceph_mds_cap_release *head; > @@ -1898,6 +1899,45 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc, > spin_unlock(&session->s_cap_lock); > } > > +static void ceph_cap_release_work(struct work_struct *work) > +{ > + struct ceph_mds_session *session = > + container_of(work, struct ceph_mds_session, s_cap_release_work); > + > + mutex_lock(&session->s_mutex); > + if (session->s_state == CEPH_MDS_SESSION_OPEN || > + session->s_state == CEPH_MDS_SESSION_HUNG) > + ceph_send_cap_releases(session->s_mdsc, session); > + mutex_unlock(&session->s_mutex); > + ceph_put_mds_session(session); > +} > + > +void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, > + struct ceph_mds_session *session) > +{ > + get_session(session); > + if (queue_work(mdsc->fsc->cap_release_wq, > + &session->s_cap_release_work)) { > + dout("cap release work queued\n"); > + } else { > + ceph_put_mds_session(session); > + dout("failed to queue cap release work\n"); > + } > +} > + > +/* > + * caller holds session->s_cap_lock > + */ > +void __ceph_queue_cap_release(struct ceph_mds_session *session, > + struct ceph_cap *cap) > +{ > + list_add_tail(&cap->session_caps, &session->s_cap_releases); > + session->s_num_cap_releases++; > + > + if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) > + ceph_flush_cap_releases(session->s_mdsc, session); > +} > + > /* > * requests > */ > diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h > index 94fe2312c092..a6052fb79733 100644 > --- a/fs/ceph/mds_client.h > +++ b/fs/ceph/mds_client.h > @@ -172,12 +172,13 @@ struct ceph_mds_session { > /* protected by s_cap_lock */ > spinlock_t s_cap_lock; > struct list_head s_caps; /* all caps issued by this session */ > + struct ceph_cap *s_cap_iterator; > int s_nr_caps, s_trim_caps; > int s_num_cap_releases; > int s_cap_reconnect; > int s_readonly; > struct list_head s_cap_releases; /* waiting cap_release messages */ > - struct ceph_cap *s_cap_iterator; > + struct work_struct s_cap_release_work; > > /* protected by mutex */ > struct list_head s_cap_flushing; /* inodes w/ flushing caps */ > @@ -458,9 +459,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req) > kref_put(&req->r_kref, ceph_mdsc_release_request); > } > > -extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, > - struct ceph_mds_session *session); > - > +extern void __ceph_queue_cap_release(struct ceph_mds_session *session, > + struct ceph_cap *cap); > +extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, > + struct ceph_mds_session *session); > extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); > > extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, > diff --git a/fs/ceph/super.c b/fs/ceph/super.c > index 93404e3c89db..0e85dbd9ac8d 100644 > --- a/fs/ceph/super.c > +++ b/fs/ceph/super.c > @@ -680,6 +680,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, > fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1); > if (!fsc->trunc_wq) > goto fail_pg_inv_wq; > + fsc->cap_release_wq = alloc_workqueue("ceph-cap-release", 0, 1); > + if (!fsc->cap_release_wq) > + goto fail_trunc_wq; > > /* set up mempools */ > err = -ENOMEM; > @@ -687,10 +690,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, > size = sizeof (struct page *) * (page_count ? page_count : 1); > fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size); > if (!fsc->wb_pagevec_pool) > - goto fail_trunc_wq; > + goto fail_cap_release_wq; > > return fsc; > > +fail_cap_release_wq: > + destroy_workqueue(fsc->cap_release_wq); > fail_trunc_wq: > destroy_workqueue(fsc->trunc_wq); > fail_pg_inv_wq: > @@ -712,6 +717,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc) > flush_workqueue(fsc->wb_wq); > flush_workqueue(fsc->pg_inv_wq); > flush_workqueue(fsc->trunc_wq); > + flush_workqueue(fsc->cap_release_wq); > } > > static void destroy_fs_client(struct ceph_fs_client *fsc) > @@ -721,6 +727,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) > destroy_workqueue(fsc->wb_wq); > destroy_workqueue(fsc->pg_inv_wq); > destroy_workqueue(fsc->trunc_wq); > + destroy_workqueue(fsc->cap_release_wq); > > mempool_destroy(fsc->wb_pagevec_pool); > > diff --git a/fs/ceph/super.h b/fs/ceph/super.h > index 631b46e824a8..3eab95c02cec 100644 > --- a/fs/ceph/super.h > +++ b/fs/ceph/super.h > @@ -107,10 +107,12 @@ struct ceph_fs_client { > > /* writeback */ > mempool_t *wb_pagevec_pool; > + atomic_long_t writeback_count; > + > struct workqueue_struct *wb_wq; > struct workqueue_struct *pg_inv_wq; > struct workqueue_struct *trunc_wq; > - atomic_long_t writeback_count; > + struct workqueue_struct *cap_release_wq; > > #ifdef CONFIG_DEBUG_FS > struct dentry *debugfs_dentry_lru, *debugfs_caps; > @@ -989,11 +991,11 @@ extern void ceph_add_cap(struct inode *inode, > unsigned cap, unsigned seq, u64 realmino, int flags, > struct ceph_cap **new_cap); > extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); > +extern void __ceph_remove_caps(struct inode* inode); > extern void ceph_put_cap(struct ceph_mds_client *mdsc, > struct ceph_cap *cap); > extern int ceph_is_any_caps(struct inode *inode); > > -extern void ceph_queue_caps_release(struct inode *inode); > extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); > extern int ceph_fsync(struct file *file, loff_t start, loff_t end, > int datasync); Yes. Sorely needed. Reviewed-by: Jeff Layton <jlayton@xxxxxxxxxx>