[PATCH 2/2] ceph: send cap releases more aggressively

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



When pending cap releases fill up one message, start a work to send
cap release message. (old way is sending cap releases every 5 seconds)

Signed-off-by: "Yan, Zheng" <zyan@xxxxxxxxxx>
---
 fs/ceph/caps.c       | 29 +++++++++-------------
 fs/ceph/inode.c      |  2 +-
 fs/ceph/mds_client.c | 58 +++++++++++++++++++++++++++++++++++++-------
 fs/ceph/mds_client.h | 10 +++++---
 fs/ceph/super.c      |  9 ++++++-
 fs/ceph/super.h      |  6 +++--
 6 files changed, 80 insertions(+), 34 deletions(-)

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ef57491157fc..0f69b97205d4 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1113,9 +1113,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
 		cap->queue_release = 1;
 		if (removed) {
-			list_add_tail(&cap->session_caps,
-				      &session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			removed = 0;
 		}
 	} else {
@@ -1277,7 +1275,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
  * Queue cap releases when an inode is dropped from our cache.  Since
  * inode is about to be destroyed, there is no need for i_ceph_lock.
  */
-void ceph_queue_caps_release(struct inode *inode)
+void __ceph_remove_caps(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct rb_node *p;
@@ -3918,12 +3916,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 			cap->seq = seq;
 			cap->issue_seq = seq;
 			spin_lock(&session->s_cap_lock);
-			list_add_tail(&cap->session_caps,
-					&session->s_cap_releases);
-			session->s_num_cap_releases++;
+			__ceph_queue_cap_release(session, cap);
 			spin_unlock(&session->s_cap_lock);
 		}
-		goto flush_cap_releases;
+		goto done;
 	}
 
 	/* these will work even if we don't have a cap yet */
@@ -3993,7 +3989,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		       ceph_cap_op_name(op));
 	}
 
-	goto done;
+done:
+	mutex_unlock(&session->s_mutex);
+done_unlocked:
+	iput(inode);
+	ceph_put_string(extra_info.pool_ns);
+	return;
 
 flush_cap_releases:
 	/*
@@ -4001,14 +4002,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	 * along for the mds (who clearly thinks we still have this
 	 * cap).
 	 */
-	ceph_send_cap_releases(mdsc, session);
-
-done:
-	mutex_unlock(&session->s_mutex);
-done_unlocked:
-	iput(inode);
-	ceph_put_string(extra_info.pool_ns);
-	return;
+	ceph_flush_cap_releases(mdsc, session);
+	goto done;
 
 bad:
 	pr_err("ceph_handle_caps: corrupt message\n");
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e6012de58aae..f588b2d7b598 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode)
 
 	ceph_fscache_unregister_inode_cookie(ci);
 
-	ceph_queue_caps_release(inode);
+	__ceph_remove_caps(inode);
 
 	if (__ceph_has_any_quota(ci))
 		ceph_adjust_quota_realms_count(inode, false);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c9f1b1e8fa03..15df7108a68f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -57,6 +57,7 @@ struct ceph_reconnect_state {
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head);
+static void ceph_cap_release_work(struct work_struct *work);
 
 static const struct ceph_connection_operations mds_con_ops;
 
@@ -634,6 +635,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
+	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
+
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 
 	mdsc->sessions[mds] = s;
@@ -659,6 +662,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
 	BUG_ON(mdsc->sessions[s->s_mds] != s);
 	mdsc->sessions[s->s_mds] = NULL;
+	s->s_state = 0;
 	ceph_con_close(&s->s_con);
 	ceph_put_mds_session(s);
 	atomic_dec(&mdsc->num_sessions);
@@ -1321,13 +1325,10 @@ static int iterate_session_caps(struct ceph_mds_session *session,
 			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
-			if (cap->queue_release) {
-				list_add_tail(&cap->session_caps,
-					      &session->s_cap_releases);
-				session->s_num_cap_releases++;
-			} else {
+			if (cap->queue_release)
+				__ceph_queue_cap_release(session, cap);
+			else
 				old_cap = cap;  /* put_cap it w/o locks held */
-			}
 		}
 		if (ret < 0)
 			goto out;
@@ -1762,7 +1763,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
 		session->s_trim_caps = 0;
 	}
 
-	ceph_send_cap_releases(mdsc, session);
+	ceph_flush_cap_releases(mdsc, session);
 	return 0;
 }
 
@@ -1805,8 +1806,8 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
 /*
  * called under s_mutex
  */
-void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-			    struct ceph_mds_session *session)
+static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg = NULL;
 	struct ceph_mds_cap_release *head;
@@ -1898,6 +1899,45 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
 	spin_unlock(&session->s_cap_lock);
 }
 
+static void ceph_cap_release_work(struct work_struct *work)
+{
+	struct ceph_mds_session *session =
+		container_of(work, struct ceph_mds_session, s_cap_release_work);
+
+	mutex_lock(&session->s_mutex);
+	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
+	    session->s_state == CEPH_MDS_SESSION_HUNG)
+		ceph_send_cap_releases(session->s_mdsc, session);
+	mutex_unlock(&session->s_mutex);
+	ceph_put_mds_session(session);
+}
+
+void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+		             struct ceph_mds_session *session)
+{
+	get_session(session);
+	if (queue_work(mdsc->fsc->cap_release_wq,
+		       &session->s_cap_release_work)) {
+		dout("cap release work queued\n");
+	} else {
+		ceph_put_mds_session(session);
+		dout("failed to queue cap release work\n");
+	}
+}
+
+/*
+ * caller holds session->s_cap_lock
+ */
+void __ceph_queue_cap_release(struct ceph_mds_session *session,
+			      struct ceph_cap *cap)
+{
+	list_add_tail(&cap->session_caps, &session->s_cap_releases);
+	session->s_num_cap_releases++;
+
+	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
+		ceph_flush_cap_releases(session->s_mdsc, session);
+}
+
 /*
  * requests
  */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 94fe2312c092..a6052fb79733 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -172,12 +172,13 @@ struct ceph_mds_session {
 	/* protected by s_cap_lock */
 	spinlock_t        s_cap_lock;
 	struct list_head  s_caps;     /* all caps issued by this session */
+	struct ceph_cap  *s_cap_iterator;
 	int               s_nr_caps, s_trim_caps;
 	int               s_num_cap_releases;
 	int		  s_cap_reconnect;
 	int		  s_readonly;
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
-	struct ceph_cap  *s_cap_iterator;
+	struct work_struct s_cap_release_work;
 
 	/* protected by mutex */
 	struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
@@ -458,9 +459,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
 	kref_put(&req->r_kref, ceph_mdsc_release_request);
 }
 
-extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
-				   struct ceph_mds_session *session);
-
+extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
+				    struct ceph_cap *cap);
+extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
+				    struct ceph_mds_session *session);
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 93404e3c89db..0e85dbd9ac8d 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -680,6 +680,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
 	if (!fsc->trunc_wq)
 		goto fail_pg_inv_wq;
+	fsc->cap_release_wq = alloc_workqueue("ceph-cap-release", 0, 1);
+	if (!fsc->cap_release_wq)
+		goto fail_trunc_wq;
 
 	/* set up mempools */
 	err = -ENOMEM;
@@ -687,10 +690,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	size = sizeof (struct page *) * (page_count ? page_count : 1);
 	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
 	if (!fsc->wb_pagevec_pool)
-		goto fail_trunc_wq;
+		goto fail_cap_release_wq;
 
 	return fsc;
 
+fail_cap_release_wq:
+	destroy_workqueue(fsc->cap_release_wq);
 fail_trunc_wq:
 	destroy_workqueue(fsc->trunc_wq);
 fail_pg_inv_wq:
@@ -712,6 +717,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
 	flush_workqueue(fsc->wb_wq);
 	flush_workqueue(fsc->pg_inv_wq);
 	flush_workqueue(fsc->trunc_wq);
+	flush_workqueue(fsc->cap_release_wq);
 }
 
 static void destroy_fs_client(struct ceph_fs_client *fsc)
@@ -721,6 +727,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
 	destroy_workqueue(fsc->wb_wq);
 	destroy_workqueue(fsc->pg_inv_wq);
 	destroy_workqueue(fsc->trunc_wq);
+	destroy_workqueue(fsc->cap_release_wq);
 
 	mempool_destroy(fsc->wb_pagevec_pool);
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 631b46e824a8..3eab95c02cec 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -107,10 +107,12 @@ struct ceph_fs_client {
 
 	/* writeback */
 	mempool_t *wb_pagevec_pool;
+	atomic_long_t writeback_count;
+
 	struct workqueue_struct *wb_wq;
 	struct workqueue_struct *pg_inv_wq;
 	struct workqueue_struct *trunc_wq;
-	atomic_long_t writeback_count;
+	struct workqueue_struct *cap_release_wq;
 
 #ifdef CONFIG_DEBUG_FS
 	struct dentry *debugfs_dentry_lru, *debugfs_caps;
@@ -989,11 +991,11 @@ extern void ceph_add_cap(struct inode *inode,
 			 unsigned cap, unsigned seq, u64 realmino, int flags,
 			 struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
+extern void __ceph_remove_caps(struct inode* inode);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
 			 struct ceph_cap *cap);
 extern int ceph_is_any_caps(struct inode *inode);
 
-extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
 		      int datasync);
-- 
2.17.2




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux