When pending cap releases fill up one message, start a work to send cap release message. (old way is sending cap releases every 5 seconds) Signed-off-by: "Yan, Zheng" <zyan@xxxxxxxxxx> --- fs/ceph/caps.c | 29 +++++++++------------- fs/ceph/inode.c | 2 +- fs/ceph/mds_client.c | 58 +++++++++++++++++++++++++++++++++++++------- fs/ceph/mds_client.h | 10 +++++--- fs/ceph/super.c | 9 ++++++- fs/ceph/super.h | 6 +++-- 6 files changed, 80 insertions(+), 34 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index ef57491157fc..0f69b97205d4 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1113,9 +1113,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { cap->queue_release = 1; if (removed) { - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; + __ceph_queue_cap_release(session, cap); removed = 0; } } else { @@ -1277,7 +1275,7 @@ static int send_cap_msg(struct cap_msg_args *arg) * Queue cap releases when an inode is dropped from our cache. Since * inode is about to be destroyed, there is no need for i_ceph_lock. */ -void ceph_queue_caps_release(struct inode *inode) +void __ceph_remove_caps(struct inode *inode) { struct ceph_inode_info *ci = ceph_inode(inode); struct rb_node *p; @@ -3918,12 +3916,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, cap->seq = seq; cap->issue_seq = seq; spin_lock(&session->s_cap_lock); - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; + __ceph_queue_cap_release(session, cap); spin_unlock(&session->s_cap_lock); } - goto flush_cap_releases; + goto done; } /* these will work even if we don't have a cap yet */ @@ -3993,7 +3989,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_cap_op_name(op)); } - goto done; +done: + mutex_unlock(&session->s_mutex); +done_unlocked: + iput(inode); + ceph_put_string(extra_info.pool_ns); + return; flush_cap_releases: /* @@ -4001,14 +4002,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, * along for the mds (who clearly thinks we still have this * cap). */ - ceph_send_cap_releases(mdsc, session); - -done: - mutex_unlock(&session->s_mutex); -done_unlocked: - iput(inode); - ceph_put_string(extra_info.pool_ns); - return; + ceph_flush_cap_releases(mdsc, session); + goto done; bad: pr_err("ceph_handle_caps: corrupt message\n"); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index e6012de58aae..f588b2d7b598 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode) ceph_fscache_unregister_inode_cookie(ci); - ceph_queue_caps_release(inode); + __ceph_remove_caps(inode); if (__ceph_has_any_quota(ci)) ceph_adjust_quota_realms_count(inode, false); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index c9f1b1e8fa03..15df7108a68f 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -57,6 +57,7 @@ struct ceph_reconnect_state { static void __wake_requests(struct ceph_mds_client *mdsc, struct list_head *head); +static void ceph_cap_release_work(struct work_struct *work); static const struct ceph_connection_operations mds_con_ops; @@ -634,6 +635,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); + INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); + INIT_LIST_HEAD(&s->s_cap_flushing); mdsc->sessions[mds] = s; @@ -659,6 +662,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc, dout("__unregister_session mds%d %p\n", s->s_mds, s); BUG_ON(mdsc->sessions[s->s_mds] != s); mdsc->sessions[s->s_mds] = NULL; + s->s_state = 0; ceph_con_close(&s->s_con); ceph_put_mds_session(s); atomic_dec(&mdsc->num_sessions); @@ -1321,13 +1325,10 @@ static int iterate_session_caps(struct ceph_mds_session *session, cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - if (cap->queue_release) { - list_add_tail(&cap->session_caps, - &session->s_cap_releases); - session->s_num_cap_releases++; - } else { + if (cap->queue_release) + __ceph_queue_cap_release(session, cap); + else old_cap = cap; /* put_cap it w/o locks held */ - } } if (ret < 0) goto out; @@ -1762,7 +1763,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc, session->s_trim_caps = 0; } - ceph_send_cap_releases(mdsc, session); + ceph_flush_cap_releases(mdsc, session); return 0; } @@ -1805,8 +1806,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, /* * called under s_mutex */ -void ceph_send_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) { struct ceph_msg *msg = NULL; struct ceph_mds_cap_release *head; @@ -1898,6 +1899,45 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc, spin_unlock(&session->s_cap_lock); } +static void ceph_cap_release_work(struct work_struct *work) +{ + struct ceph_mds_session *session = + container_of(work, struct ceph_mds_session, s_cap_release_work); + + mutex_lock(&session->s_mutex); + if (session->s_state == CEPH_MDS_SESSION_OPEN || + session->s_state == CEPH_MDS_SESSION_HUNG) + ceph_send_cap_releases(session->s_mdsc, session); + mutex_unlock(&session->s_mutex); + ceph_put_mds_session(session); +} + +void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + get_session(session); + if (queue_work(mdsc->fsc->cap_release_wq, + &session->s_cap_release_work)) { + dout("cap release work queued\n"); + } else { + ceph_put_mds_session(session); + dout("failed to queue cap release work\n"); + } +} + +/* + * caller holds session->s_cap_lock + */ +void __ceph_queue_cap_release(struct ceph_mds_session *session, + struct ceph_cap *cap) +{ + list_add_tail(&cap->session_caps, &session->s_cap_releases); + session->s_num_cap_releases++; + + if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) + ceph_flush_cap_releases(session->s_mdsc, session); +} + /* * requests */ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 94fe2312c092..a6052fb79733 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -172,12 +172,13 @@ struct ceph_mds_session { /* protected by s_cap_lock */ spinlock_t s_cap_lock; struct list_head s_caps; /* all caps issued by this session */ + struct ceph_cap *s_cap_iterator; int s_nr_caps, s_trim_caps; int s_num_cap_releases; int s_cap_reconnect; int s_readonly; struct list_head s_cap_releases; /* waiting cap_release messages */ - struct ceph_cap *s_cap_iterator; + struct work_struct s_cap_release_work; /* protected by mutex */ struct list_head s_cap_flushing; /* inodes w/ flushing caps */ @@ -458,9 +459,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req) kref_put(&req->r_kref, ceph_mdsc_release_request); } -extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session); - +extern void __ceph_queue_cap_release(struct ceph_mds_session *session, + struct ceph_cap *cap); +extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc); extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 93404e3c89db..0e85dbd9ac8d 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -680,6 +680,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1); if (!fsc->trunc_wq) goto fail_pg_inv_wq; + fsc->cap_release_wq = alloc_workqueue("ceph-cap-release", 0, 1); + if (!fsc->cap_release_wq) + goto fail_trunc_wq; /* set up mempools */ err = -ENOMEM; @@ -687,10 +690,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, size = sizeof (struct page *) * (page_count ? page_count : 1); fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size); if (!fsc->wb_pagevec_pool) - goto fail_trunc_wq; + goto fail_cap_release_wq; return fsc; +fail_cap_release_wq: + destroy_workqueue(fsc->cap_release_wq); fail_trunc_wq: destroy_workqueue(fsc->trunc_wq); fail_pg_inv_wq: @@ -712,6 +717,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc) flush_workqueue(fsc->wb_wq); flush_workqueue(fsc->pg_inv_wq); flush_workqueue(fsc->trunc_wq); + flush_workqueue(fsc->cap_release_wq); } static void destroy_fs_client(struct ceph_fs_client *fsc) @@ -721,6 +727,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) destroy_workqueue(fsc->wb_wq); destroy_workqueue(fsc->pg_inv_wq); destroy_workqueue(fsc->trunc_wq); + destroy_workqueue(fsc->cap_release_wq); mempool_destroy(fsc->wb_pagevec_pool); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 631b46e824a8..3eab95c02cec 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -107,10 +107,12 @@ struct ceph_fs_client { /* writeback */ mempool_t *wb_pagevec_pool; + atomic_long_t writeback_count; + struct workqueue_struct *wb_wq; struct workqueue_struct *pg_inv_wq; struct workqueue_struct *trunc_wq; - atomic_long_t writeback_count; + struct workqueue_struct *cap_release_wq; #ifdef CONFIG_DEBUG_FS struct dentry *debugfs_dentry_lru, *debugfs_caps; @@ -989,11 +991,11 @@ extern void ceph_add_cap(struct inode *inode, unsigned cap, unsigned seq, u64 realmino, int flags, struct ceph_cap **new_cap); extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release); +extern void __ceph_remove_caps(struct inode* inode); extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); extern int ceph_is_any_caps(struct inode *inode); -extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); extern int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync); -- 2.17.2