Re: [PATCH 2/2] ceph: send cap releases more aggressively

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, 2019-01-15 at 16:03 +0800, Yan, Zheng wrote:
> When pending cap releases fill up one message, start a work to send
> cap release message. (old way is sending cap releases every 5 seconds)
> 
> Signed-off-by: "Yan, Zheng" <zyan@xxxxxxxxxx>
> ---
>  fs/ceph/caps.c       | 29 +++++++++-------------
>  fs/ceph/inode.c      |  2 +-
>  fs/ceph/mds_client.c | 58 +++++++++++++++++++++++++++++++++++++-------
>  fs/ceph/mds_client.h | 10 +++++---
>  fs/ceph/super.c      |  9 ++++++-
>  fs/ceph/super.h      |  6 +++--
>  6 files changed, 80 insertions(+), 34 deletions(-)
> 
> diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
> index ef57491157fc..0f69b97205d4 100644
> --- a/fs/ceph/caps.c
> +++ b/fs/ceph/caps.c
> @@ -1113,9 +1113,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
>  	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
>  		cap->queue_release = 1;
>  		if (removed) {
> -			list_add_tail(&cap->session_caps,
> -				      &session->s_cap_releases);
> -			session->s_num_cap_releases++;
> +			__ceph_queue_cap_release(session, cap);
>  			removed = 0;
>  		}
>  	} else {
> @@ -1277,7 +1275,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
>   * Queue cap releases when an inode is dropped from our cache.  Since
>   * inode is about to be destroyed, there is no need for i_ceph_lock.
>   */
> -void ceph_queue_caps_release(struct inode *inode)
> +void __ceph_remove_caps(struct inode *inode)
>  {
>  	struct ceph_inode_info *ci = ceph_inode(inode);
>  	struct rb_node *p;
> @@ -3918,12 +3916,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  			cap->seq = seq;
>  			cap->issue_seq = seq;
>  			spin_lock(&session->s_cap_lock);
> -			list_add_tail(&cap->session_caps,
> -					&session->s_cap_releases);
> -			session->s_num_cap_releases++;
> +			__ceph_queue_cap_release(session, cap);
>  			spin_unlock(&session->s_cap_lock);
>  		}
> -		goto flush_cap_releases;
> +		goto done;
>  	}
>  
>  	/* these will work even if we don't have a cap yet */
> @@ -3993,7 +3989,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  		       ceph_cap_op_name(op));
>  	}
>  
> -	goto done;
> +done:
> +	mutex_unlock(&session->s_mutex);
> +done_unlocked:
> +	iput(inode);
> +	ceph_put_string(extra_info.pool_ns);
> +	return;
>  
>  flush_cap_releases:
>  	/*
> @@ -4001,14 +4002,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
>  	 * along for the mds (who clearly thinks we still have this
>  	 * cap).
>  	 */
> -	ceph_send_cap_releases(mdsc, session);
> -
> -done:
> -	mutex_unlock(&session->s_mutex);
> -done_unlocked:
> -	iput(inode);
> -	ceph_put_string(extra_info.pool_ns);
> -	return;
> +	ceph_flush_cap_releases(mdsc, session);
> +	goto done;
>  
>  bad:
>  	pr_err("ceph_handle_caps: corrupt message\n");
> diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
> index e6012de58aae..f588b2d7b598 100644
> --- a/fs/ceph/inode.c
> +++ b/fs/ceph/inode.c
> @@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode)
>  
>  	ceph_fscache_unregister_inode_cookie(ci);
>  
> -	ceph_queue_caps_release(inode);
> +	__ceph_remove_caps(inode);
>  
>  	if (__ceph_has_any_quota(ci))
>  		ceph_adjust_quota_realms_count(inode, false);
> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
> index c9f1b1e8fa03..15df7108a68f 100644
> --- a/fs/ceph/mds_client.c
> +++ b/fs/ceph/mds_client.c
> @@ -57,6 +57,7 @@ struct ceph_reconnect_state {
>  
>  static void __wake_requests(struct ceph_mds_client *mdsc,
>  			    struct list_head *head);
> +static void ceph_cap_release_work(struct work_struct *work);
>  
>  static const struct ceph_connection_operations mds_con_ops;
>  
> @@ -634,6 +635,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
>  	s->s_cap_reconnect = 0;
>  	s->s_cap_iterator = NULL;
>  	INIT_LIST_HEAD(&s->s_cap_releases);
> +	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
> +
>  	INIT_LIST_HEAD(&s->s_cap_flushing);
>  
>  	mdsc->sessions[mds] = s;
> @@ -659,6 +662,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
>  	dout("__unregister_session mds%d %p\n", s->s_mds, s);
>  	BUG_ON(mdsc->sessions[s->s_mds] != s);
>  	mdsc->sessions[s->s_mds] = NULL;
> +	s->s_state = 0;
>  	ceph_con_close(&s->s_con);
>  	ceph_put_mds_session(s);
>  	atomic_dec(&mdsc->num_sessions);
> @@ -1321,13 +1325,10 @@ static int iterate_session_caps(struct ceph_mds_session *session,
>  			cap->session = NULL;
>  			list_del_init(&cap->session_caps);
>  			session->s_nr_caps--;
> -			if (cap->queue_release) {
> -				list_add_tail(&cap->session_caps,
> -					      &session->s_cap_releases);
> -				session->s_num_cap_releases++;
> -			} else {
> +			if (cap->queue_release)
> +				__ceph_queue_cap_release(session, cap);
> +			else
>  				old_cap = cap;  /* put_cap it w/o locks held */
> -			}
>  		}
>  		if (ret < 0)
>  			goto out;
> @@ -1762,7 +1763,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
>  		session->s_trim_caps = 0;
>  	}
>  
> -	ceph_send_cap_releases(mdsc, session);
> +	ceph_flush_cap_releases(mdsc, session);
>  	return 0;
>  }
>  
> @@ -1805,8 +1806,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
>  /*
>   * called under s_mutex
>   */
> -void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
> -			    struct ceph_mds_session *session)
> +static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
> +				   struct ceph_mds_session *session)
>  {
>  	struct ceph_msg *msg = NULL;
>  	struct ceph_mds_cap_release *head;
> @@ -1898,6 +1899,45 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
>  	spin_unlock(&session->s_cap_lock);
>  }
>  
> +static void ceph_cap_release_work(struct work_struct *work)
> +{
> +	struct ceph_mds_session *session =
> +		container_of(work, struct ceph_mds_session, s_cap_release_work);
> +
> +	mutex_lock(&session->s_mutex);
> +	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
> +	    session->s_state == CEPH_MDS_SESSION_HUNG)
> +		ceph_send_cap_releases(session->s_mdsc, session);
> +	mutex_unlock(&session->s_mutex);
> +	ceph_put_mds_session(session);
> +}
> +
> +void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
> +		             struct ceph_mds_session *session)
> +{
> +	get_session(session);
> +	if (queue_work(mdsc->fsc->cap_release_wq,
> +		       &session->s_cap_release_work)) {
> +		dout("cap release work queued\n");
> +	} else {
> +		ceph_put_mds_session(session);
> +		dout("failed to queue cap release work\n");
> +	}
> +}
> +
> +/*
> + * caller holds session->s_cap_lock
> + */
> +void __ceph_queue_cap_release(struct ceph_mds_session *session,
> +			      struct ceph_cap *cap)
> +{
> +	list_add_tail(&cap->session_caps, &session->s_cap_releases);
> +	session->s_num_cap_releases++;
> +
> +	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
> +		ceph_flush_cap_releases(session->s_mdsc, session);
> +}
> +
>  /*
>   * requests
>   */
> diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
> index 94fe2312c092..a6052fb79733 100644
> --- a/fs/ceph/mds_client.h
> +++ b/fs/ceph/mds_client.h
> @@ -172,12 +172,13 @@ struct ceph_mds_session {
>  	/* protected by s_cap_lock */
>  	spinlock_t        s_cap_lock;
>  	struct list_head  s_caps;     /* all caps issued by this session */
> +	struct ceph_cap  *s_cap_iterator;
>  	int               s_nr_caps, s_trim_caps;
>  	int               s_num_cap_releases;
>  	int		  s_cap_reconnect;
>  	int		  s_readonly;
>  	struct list_head  s_cap_releases; /* waiting cap_release messages */
> -	struct ceph_cap  *s_cap_iterator;
> +	struct work_struct s_cap_release_work;
>  
>  	/* protected by mutex */
>  	struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
> @@ -458,9 +459,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
>  	kref_put(&req->r_kref, ceph_mdsc_release_request);
>  }
>  
> -extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
> -				   struct ceph_mds_session *session);
> -
> +extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
> +				    struct ceph_cap *cap);
> +extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
> +				    struct ceph_mds_session *session);
>  extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
>  
>  extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
> diff --git a/fs/ceph/super.c b/fs/ceph/super.c
> index 93404e3c89db..0e85dbd9ac8d 100644
> --- a/fs/ceph/super.c
> +++ b/fs/ceph/super.c
> @@ -680,6 +680,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
>  	fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
>  	if (!fsc->trunc_wq)
>  		goto fail_pg_inv_wq;
> +	fsc->cap_release_wq = alloc_workqueue("ceph-cap-release", 0, 1);
> +	if (!fsc->cap_release_wq)
> +		goto fail_trunc_wq;
>  
>  	/* set up mempools */
>  	err = -ENOMEM;
> @@ -687,10 +690,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
>  	size = sizeof (struct page *) * (page_count ? page_count : 1);
>  	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
>  	if (!fsc->wb_pagevec_pool)
> -		goto fail_trunc_wq;
> +		goto fail_cap_release_wq;
>  
>  	return fsc;
>  
> +fail_cap_release_wq:
> +	destroy_workqueue(fsc->cap_release_wq);
>  fail_trunc_wq:
>  	destroy_workqueue(fsc->trunc_wq);
>  fail_pg_inv_wq:
> @@ -712,6 +717,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
>  	flush_workqueue(fsc->wb_wq);
>  	flush_workqueue(fsc->pg_inv_wq);
>  	flush_workqueue(fsc->trunc_wq);
> +	flush_workqueue(fsc->cap_release_wq);
>  }
>  
>  static void destroy_fs_client(struct ceph_fs_client *fsc)
> @@ -721,6 +727,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
>  	destroy_workqueue(fsc->wb_wq);
>  	destroy_workqueue(fsc->pg_inv_wq);
>  	destroy_workqueue(fsc->trunc_wq);
> +	destroy_workqueue(fsc->cap_release_wq);
>  
>  	mempool_destroy(fsc->wb_pagevec_pool);
>  
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index 631b46e824a8..3eab95c02cec 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -107,10 +107,12 @@ struct ceph_fs_client {
>  
>  	/* writeback */
>  	mempool_t *wb_pagevec_pool;
> +	atomic_long_t writeback_count;
> +
>  	struct workqueue_struct *wb_wq;
>  	struct workqueue_struct *pg_inv_wq;
>  	struct workqueue_struct *trunc_wq;
> -	atomic_long_t writeback_count;
> +	struct workqueue_struct *cap_release_wq;
>  
>  #ifdef CONFIG_DEBUG_FS
>  	struct dentry *debugfs_dentry_lru, *debugfs_caps;
> @@ -989,11 +991,11 @@ extern void ceph_add_cap(struct inode *inode,
>  			 unsigned cap, unsigned seq, u64 realmino, int flags,
>  			 struct ceph_cap **new_cap);
>  extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
> +extern void __ceph_remove_caps(struct inode* inode);
>  extern void ceph_put_cap(struct ceph_mds_client *mdsc,
>  			 struct ceph_cap *cap);
>  extern int ceph_is_any_caps(struct inode *inode);
>  
> -extern void ceph_queue_caps_release(struct inode *inode);
>  extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
>  extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
>  		      int datasync);

Yes. Sorely needed.

Reviewed-by: Jeff Layton <jlayton@xxxxxxxxxx>




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux