[PATCH 09/20] ceph: MDS client

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The MDS client is responsible for submitting requests to the MDS
cluster and parsing the response.  We decide which MDS to submit each
request to based on cached information about the current partition of
the directory hierarchy across the cluster.  A stateful session is
opened with each MDS before we submit requests to it, and a mutex is
used to control the ordering of messages within each session.

An MDS request may generate two responses.  The first indicates the
operation was a success and returns any result.  A second reply is
sent when the operation commits to the journal.  Note that locking
on the MDS ensures that the results of updates are visible only to
the updating client until the operation commits.

Requests are linked to the containing directory so that an fsync will
wait for them to commit.

If an MDS fails and/or recovers, we resubmit requests as needed.

Signed-off-by: Sage Weil <sage@xxxxxxxxxxxx>
---
 fs/ceph/mds_client.c | 2391 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/mds_client.h |  314 +++++++
 fs/ceph/mdsmap.c     |  118 +++
 fs/ceph/mdsmap.h     |   94 ++
 4 files changed, 2917 insertions(+), 0 deletions(-)
 create mode 100644 fs/ceph/mds_client.c
 create mode 100644 fs/ceph/mds_client.h
 create mode 100644 fs/ceph/mdsmap.c
 create mode 100644 fs/ceph/mdsmap.h

diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
new file mode 100644
index 0000000..426ecb1
--- /dev/null
+++ b/fs/ceph/mds_client.c
@@ -0,0 +1,2391 @@
+
+#include <linux/wait.h>
+#include <linux/sched.h>
+#include "mds_client.h"
+#include "mon_client.h"
+
+#include "ceph_debug.h"
+
+int ceph_debug_mdsc __read_mostly = -1;
+#define DOUT_VAR ceph_debug_mdsc
+#define DOUT_MASK DOUT_MASK_MDSC
+#include "super.h"
+#include "messenger.h"
+#include "decode.h"
+
+static void __wake_requests(struct ceph_mds_client *mdsc,
+			    struct list_head *head);
+
+/*
+ * address and send message to a given mds
+ */
+void ceph_send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
+		       int mds)
+{
+	msg->hdr.dst.addr = *ceph_mdsmap_get_addr(mdsc->mdsmap, mds);
+	msg->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MDS);
+	msg->hdr.dst.name.num = cpu_to_le32(mds);
+	ceph_msg_send(mdsc->client->msgr, msg, BASE_DELAY_INTERVAL);
+}
+
+
+/*
+ * mds reply parsing
+ */
+
+/*
+ * parse individual inode info
+ */
+static int parse_reply_info_in(void **p, void *end,
+			       struct ceph_mds_reply_info_in *info)
+{
+	int err = -EINVAL;
+
+	info->in = *p;
+	*p += sizeof(struct ceph_mds_reply_inode) +
+		sizeof(*info->in->fragtree.splits) *
+		le32_to_cpu(info->in->fragtree.nsplits);
+
+	ceph_decode_32_safe(p, end, info->symlink_len, bad);
+	ceph_decode_need(p, end, info->symlink_len, bad);
+	info->symlink = *p;
+	*p += info->symlink_len;
+
+	ceph_decode_32_safe(p, end, info->xattr_len, bad);
+	ceph_decode_need(p, end, info->xattr_len, bad);
+	info->xattr_data = *p;
+	*p += info->xattr_len;
+	return 0;
+bad:
+	return err;
+}
+
+/*
+ * parse a full metadata trace from the mds: inode, dirinfo, dentry, inode...
+ * sequence.
+ */
+static int parse_reply_info_trace(void **p, void *end,
+				  struct ceph_mds_reply_info_parsed *info)
+{
+	u16 numi, numd, snapdirpos;
+	int err;
+
+	ceph_decode_need(p, end, 3*sizeof(u16), bad);
+	ceph_decode_16(p, numi);
+	ceph_decode_16(p, numd);
+	ceph_decode_16(p, snapdirpos);
+	info->trace_numi = numi;
+	info->trace_numd = numd;
+	info->trace_snapdirpos = snapdirpos;
+	if (numi == 0) {
+		info->trace_in = NULL;
+		goto done;   /* hrm, this shouldn't actually happen, but.. */
+	}
+
+	/* alloc one big block of memory for all of these arrays */
+	info->trace_in = kmalloc(numi * (sizeof(*info->trace_in) +
+					 sizeof(*info->trace_dir) +
+					 sizeof(*info->trace_dname) +
+					 sizeof(*info->trace_dname_len) +
+					 sizeof(*info->trace_dlease)),
+				 GFP_NOFS);
+	if (info->trace_in == NULL) {
+		err = -ENOMEM;
+		goto out_bad;
+	}
+	info->trace_dir = (void *)(info->trace_in + numi);
+	info->trace_dname = (void *)(info->trace_dir + numd);
+	info->trace_dname_len = (void *)(info->trace_dname + numd);
+	info->trace_dlease = (void *)(info->trace_dname_len + numd);
+
+	/*
+	 * the trace starts at the deepest point, and works up toward
+	 * the root inode.
+	 */
+	if (numi == numd)
+		goto dentry;
+inode:
+	if (!numi)
+		goto done;
+	numi--;
+	err = parse_reply_info_in(p, end, &info->trace_in[numi]);
+	if (err < 0)
+		goto out_bad;
+
+dentry:
+	if (!numd)
+		goto done;
+	numd--;
+	ceph_decode_32_safe(p, end, info->trace_dname_len[numd], bad);
+	ceph_decode_need(p, end, info->trace_dname_len[numd], bad);
+	info->trace_dname[numd] = *p;
+	*p += info->trace_dname_len[numd];
+	info->trace_dlease[numd] = *p;
+	*p += sizeof(struct ceph_mds_reply_lease);
+
+	/* dir frag info */
+	if (unlikely(*p + sizeof(struct ceph_mds_reply_dirfrag) > end))
+		goto bad;
+	info->trace_dir[numd] = *p;
+	*p += sizeof(struct ceph_mds_reply_dirfrag) +
+		sizeof(u32)*le32_to_cpu(info->trace_dir[numd]->ndist);
+	if (unlikely(*p > end))
+		goto bad;
+	goto inode;
+
+done:
+	if (unlikely(*p != end))
+		goto bad;
+	return 0;
+
+bad:
+	err = -EINVAL;
+out_bad:
+	derr(1, "problem parsing trace %d\n", err);
+	return err;
+}
+
+/*
+ * parse readdir results
+ */
+static int parse_reply_info_dir(void **p, void *end,
+				struct ceph_mds_reply_info_parsed *info)
+{
+	u32 num, i = 0;
+	int err;
+
+	info->dir_dir = *p;
+	if (*p + sizeof(*info->dir_dir) > end)
+		goto bad;
+	*p += sizeof(*info->dir_dir) +
+		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
+	if (*p > end)
+		goto bad;
+
+	ceph_decode_32_safe(p, end, num, bad);
+	if (num == 0)
+		goto done;
+
+	/* alloc large array */
+	info->dir_nr = num;
+	info->dir_in = kmalloc(num * (sizeof(*info->dir_in) +
+				      sizeof(*info->dir_dname) +
+				      sizeof(*info->dir_dname_len) +
+				      sizeof(*info->dir_dlease)),
+			       GFP_NOFS);
+	if (info->dir_in == NULL) {
+		err = -ENOMEM;
+		goto out_bad;
+	}
+	info->dir_dname = (void *)(info->dir_in + num);
+	info->dir_dname_len = (void *)(info->dir_dname + num);
+	info->dir_dlease = (void *)(info->dir_dname_len + num);
+
+	while (num) {
+		/* dentry */
+		ceph_decode_32_safe(p, end, info->dir_dname_len[i], bad);
+		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
+		info->dir_dname[i] = *p;
+		*p += info->dir_dname_len[i];
+		dout(20, "parsed dir dname '%.*s'\n", info->dir_dname_len[i],
+		     info->dir_dname[i]);
+		info->dir_dlease[i] = *p;
+		*p += sizeof(struct ceph_mds_reply_lease);
+
+		/* inode */
+		err = parse_reply_info_in(p, end, &info->dir_in[i]);
+		if (err < 0)
+			goto out_bad;
+		i++;
+		num--;
+	}
+
+done:
+	return 0;
+
+bad:
+	err = -EINVAL;
+out_bad:
+	derr(1, "problem parsing dir contents %d\n", err);
+	return err;
+}
+
+/*
+ * parse entire mds reply
+ */
+static int parse_reply_info(struct ceph_msg *msg,
+			    struct ceph_mds_reply_info_parsed *info)
+{
+	void *p, *end;
+	u32 len;
+	int err;
+
+	info->head = msg->front.iov_base;
+	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
+	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
+
+	/* trace */
+	ceph_decode_32_safe(&p, end, len, bad);
+	if (len > 0) {
+		err = parse_reply_info_trace(&p, p+len, info);
+		if (err < 0)
+			goto out_bad;
+	}
+
+	/* dir content */
+	ceph_decode_32_safe(&p, end, len, bad);
+	if (len > 0) {
+		err = parse_reply_info_dir(&p, p+len, info);
+		if (err < 0)
+			goto out_bad;
+	}
+
+	/* snap blob */
+	ceph_decode_32_safe(&p, end, len, bad);
+	info->snapblob_len = len;
+	info->snapblob = p;
+	p += len;
+
+	if (p != end)
+		goto bad;
+	return 0;
+
+bad:
+	err = -EINVAL;
+out_bad:
+	derr(1, "parse_reply err %d\n", err);
+	return err;
+}
+
+static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
+{
+	kfree(info->trace_in);
+	kfree(info->dir_in);
+}
+
+
+/*
+ * sessions
+ */
+static const char *session_state_name(int s)
+{
+	switch (s) {
+	case CEPH_MDS_SESSION_NEW: return "new";
+	case CEPH_MDS_SESSION_OPENING: return "opening";
+	case CEPH_MDS_SESSION_OPEN: return "open";
+	case CEPH_MDS_SESSION_FLUSHING: return "flushing";
+	case CEPH_MDS_SESSION_CLOSING: return "closing";
+	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
+	default: return "???";
+	}
+}
+
+static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
+{
+	dout(30, "get_session %p %d -> %d\n", s,
+	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)+1);
+	atomic_inc(&s->s_ref);
+	return s;
+}
+
+void ceph_put_mds_session(struct ceph_mds_session *s)
+{
+	dout(30, "put_session %p %d -> %d\n", s,
+	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
+	if (atomic_dec_and_test(&s->s_ref))
+		kfree(s);
+}
+
+/*
+ * called under mdsc->mutex
+ */
+struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
+						   int mds)
+{
+	struct ceph_mds_session *session;
+
+	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
+		return NULL;
+	session = mdsc->sessions[mds];
+	dout(30, "lookup_mds_session %p %d -> %d\n", session,
+	     atomic_read(&session->s_ref), atomic_read(&session->s_ref)+1);
+	get_session(session);
+	return session;
+}
+
+
+/*
+ * create+register a new session for given mds.
+ * called under mdsc->mutex.
+ */
+static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
+						 int mds)
+{
+	struct ceph_mds_session *s;
+
+	s = kmalloc(sizeof(*s), GFP_NOFS);
+	s->s_mds = mds;
+	s->s_state = CEPH_MDS_SESSION_NEW;
+	s->s_ttl = 0;
+	s->s_seq = 0;
+	mutex_init(&s->s_mutex);
+	spin_lock_init(&s->s_cap_lock);
+	s->s_cap_gen = 0;
+	s->s_cap_ttl = 0;
+	s->s_renew_requested = 0;
+	INIT_LIST_HEAD(&s->s_caps);
+	INIT_LIST_HEAD(&s->s_rdcaps);
+	spin_lock_init(&s->s_rdcaps_lock);
+	s->s_nr_caps = 0;
+	atomic_set(&s->s_ref, 1);
+	INIT_LIST_HEAD(&s->s_waiting);
+	INIT_LIST_HEAD(&s->s_unsafe);
+
+	dout(10, "register_session mds%d\n", mds);
+	if (mds >= mdsc->max_sessions) {
+		int newmax = 1 << get_count_order(mds+1);
+		struct ceph_mds_session **sa;
+
+		dout(50, "register_session realloc to %d\n", newmax);
+		sa = kzalloc(newmax * sizeof(void *), GFP_NOFS);
+		if (sa == NULL)
+			return ERR_PTR(-ENOMEM);
+		if (mdsc->sessions) {
+			memcpy(sa, mdsc->sessions,
+			       mdsc->max_sessions * sizeof(void *));
+			kfree(mdsc->sessions);
+		}
+		mdsc->sessions = sa;
+		mdsc->max_sessions = newmax;
+	}
+	mdsc->sessions[mds] = s;
+	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
+	return s;
+}
+
+/*
+ * called under mdsc->mutex
+ */
+static void unregister_session(struct ceph_mds_client *mdsc, int mds)
+{
+	dout(10, "unregister_session mds%d %p\n", mds, mdsc->sessions[mds]);
+	ceph_put_mds_session(mdsc->sessions[mds]);
+	mdsc->sessions[mds] = NULL;
+}
+
+/* drop session refs in request */
+static void put_request_sessions(struct ceph_mds_request *req)
+{
+	if (req->r_session) {
+		ceph_put_mds_session(req->r_session);
+		req->r_session = NULL;
+	}
+	if (req->r_fwd_session) {
+		ceph_put_mds_session(req->r_fwd_session);
+		req->r_fwd_session = NULL;
+	}
+}
+
+void ceph_mdsc_put_request(struct ceph_mds_request *req)
+{
+	dout(10, "put_request %p %d -> %d\n", req,
+	     atomic_read(&req->r_ref), atomic_read(&req->r_ref)-1);
+	if (atomic_dec_and_test(&req->r_ref)) {
+		if (req->r_request)
+			ceph_msg_put(req->r_request);
+		if (req->r_reply) {
+			ceph_msg_put(req->r_reply);
+			destroy_reply_info(&req->r_reply_info);
+		}
+		if (req->r_last_inode)
+			iput(req->r_last_inode);
+		if (req->r_dentry)
+			dput(req->r_dentry);
+		if (req->r_old_dentry)
+			dput(req->r_old_dentry);
+		kfree(req->r_expected_cap);
+		put_request_sessions(req);
+		kfree(req);
+	}
+}
+
+/*
+ * lookup session, bump ref if found.
+ *
+ * called under mdsc->mutex.
+ */
+static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
+					     u64 tid)
+{
+	struct ceph_mds_request *req;
+	req = radix_tree_lookup(&mdsc->request_tree, tid);
+	if (req)
+		ceph_mdsc_get_request(req);
+	return req;
+}
+
+/*
+ * Register an in-flight request, and assign a tid in msg request header.
+ *
+ * Called under mdsc->mutex.
+ */
+static void __register_request(struct ceph_mds_client *mdsc,
+			       struct ceph_mds_request *req,
+			       struct inode *listener)
+{
+	req->r_tid = ++mdsc->last_tid;
+	dout(30, "__register_request %p tid %lld\n", req, req->r_tid);
+	ceph_mdsc_get_request(req);
+	radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);
+
+	if (listener) {
+		struct ceph_inode_info *ci = ceph_inode(listener);
+		
+		spin_lock(&ci->i_listener_lock);
+		req->r_listener = listener;
+		list_add_tail(&req->r_listener_item, &ci->i_listener_list);
+		spin_unlock(&ci->i_listener_lock);
+	}
+
+	ceph_sysfs_mds_req_init(mdsc, req);
+}
+
+static void __unregister_request(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_request *req)
+{
+	dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid);
+	radix_tree_delete(&mdsc->request_tree, req->r_tid);
+	ceph_mdsc_put_request(req);
+
+	if (req->r_listener) {
+		struct ceph_inode_info *ci = ceph_inode(req->r_listener);
+
+		spin_lock(&ci->i_listener_lock);
+		list_del_init(&req->r_listener_item);
+		spin_unlock(&ci->i_listener_lock);
+	}
+
+	ceph_sysfs_mds_req_cleanup(req);
+}
+
+static bool __have_session(struct ceph_mds_client *mdsc, int mds)
+{
+	if (mds >= mdsc->max_sessions)
+		return false;
+	return mdsc->sessions[mds];
+}
+
+/*
+ * Choose mds to send request to next.  If there is a hint set in
+ * the request (e.g., due to a prior forward hint from the mds), use
+ * that.
+ *
+ * Called under mdsc->mutex.
+ */
+static int __choose_mds(struct ceph_mds_client *mdsc,
+			struct ceph_mds_request *req)
+{
+	int mds = -1;
+	u32 hash = req->r_direct_hash;
+	bool is_hash = req->r_direct_is_hash;
+	struct dentry *dentry = req->r_dentry;
+	struct ceph_inode_info *ci;
+	int mode = req->r_direct_mode;
+
+	/*
+	 * is there a specific mds we should try?  ignore hint if we have
+	 * no session and the mds is not up (active or recovering).
+	 */
+	if (req->r_resend_mds >= 0 &&
+	    (__have_session(mdsc, req->r_resend_mds) ||
+	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
+		dout(20, "choose_mds using resend_mds mds%d\n",
+		     req->r_resend_mds);
+		return req->r_resend_mds;
+	}
+
+	if (mode == USE_CAP_MDS) {
+		mds = ceph_get_cap_mds(dentry->d_inode);
+		if (mds >= 0) {
+			dout(20, "choose_mds %p %llx.%llx mds%d (cap)\n",
+			     dentry->d_inode, ceph_vinop(dentry->d_inode), mds);
+			return mds;
+		}
+		derr(0, "choose_mds %p %llx.%llx has NO CAPS, using auth\n",
+		     dentry->d_inode, ceph_vinop(dentry->d_inode));
+		WARN_ON(1);
+		mode = USE_AUTH_MDS;
+	}
+
+	if (mode == USE_RANDOM_MDS)
+		goto random;
+
+	/*
+	 * try to find an appropriate mds to contact based on the
+	 * given dentry.  walk up the tree until we find delegation info
+	 * in the i_fragtree.
+	 *
+	 * if is_hash is true, direct request at the appropriate directory
+	 * fragment (as with a readdir on a fragmented directory).
+	 */
+	while (dentry) {
+		if (is_hash && dentry->d_inode &&
+		    S_ISDIR(dentry->d_inode->i_mode)) {
+			struct ceph_inode_frag frag;
+			int found;
+
+			ci = ceph_inode(dentry->d_inode);
+			ceph_choose_frag(ci, hash, &frag, &found);
+			if (found) {
+				if (mode == USE_ANY_MDS && frag.ndist > 0) {
+					u8 r;
+
+					/* choose a random replica */
+					get_random_bytes(&r, 1);
+					r %= frag.ndist;
+					mds = frag.dist[r];
+					dout(20, "choose_mds %p %llx.%llx "
+					     "frag %u mds%d (%d/%d)\n",
+					     dentry->d_inode,
+					     ceph_vinop(&ci->vfs_inode),
+					     frag.frag, frag.mds,
+					     (int)r, frag.ndist);
+					return mds;
+				}
+				/* since the more deeply nested item wasn't
+				 * known to be replicated, then we want to
+				 * look for the authoritative mds. */
+				mode = USE_AUTH_MDS;
+				if (frag.mds >= 0) {
+					/* choose auth mds */
+					mds = frag.mds;
+					dout(20, "choose_mds %p %llx.%llx "
+					     "frag %u mds%d (auth)\n",
+					     dentry->d_inode,
+					     ceph_vinop(&ci->vfs_inode),
+					     frag.frag, mds);
+					return mds;
+				}
+			}
+		}
+		if (IS_ROOT(dentry))
+			break;
+
+		/* move up the hierarchy, but direct request based on the hash
+		 * for the child's dentry name */
+		hash = dentry->d_name.hash;
+		is_hash = true;
+		dentry = dentry->d_parent;
+	}
+
+	/* ok, just pick one at random */
+random:
+	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
+	dout(20, "choose_mds chose random mds%d\n", mds);
+	return mds;
+}
+
+
+/*
+ * session messages
+ */
+static struct ceph_msg *create_session_msg(u32 op, u64 seq)
+{
+	struct ceph_msg *msg;
+	struct ceph_mds_session_head *h;
+
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
+	if (IS_ERR(msg)) {
+		derr("ENOMEM creating session msg\n");
+		return ERR_PTR(PTR_ERR(msg));
+	}
+	h = msg->front.iov_base;
+	h->op = cpu_to_le32(op);
+	h->seq = cpu_to_le64(seq);
+	return msg;
+}
+
+/*
+ * send session open request.
+ *
+ * called under mdsc->mutex
+ */
+static int __open_session(struct ceph_mds_client *mdsc,
+			  struct ceph_mds_session *session)
+{
+	struct ceph_msg *msg;
+	int mstate;
+	int mds = session->s_mds;
+	int err = 0;
+
+	/* wait for mds to go active? */
+	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
+	dout(10, "open_session to mds%d, state %d\n", mds, mstate);
+	session->s_state = CEPH_MDS_SESSION_OPENING;
+	session->s_renew_requested = jiffies;
+
+	/* send connect message */
+	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
+	if (IS_ERR(msg)) {
+		err = PTR_ERR(msg);
+		goto out;
+	}
+	ceph_send_msg_mds(mdsc, msg, mds);
+
+out:
+	return 0;
+}
+
+/*
+ * caller must hold session s_mutex
+ */
+static void remove_session_caps(struct ceph_mds_session *session)
+{
+	struct ceph_cap *cap;
+	struct ceph_inode_info *ci;
+
+	dout(10, "remove_session_caps on %p\n", session);
+	while (session->s_nr_caps > 0) {
+		cap = list_entry(session->s_caps.next, struct ceph_cap,
+				 session_caps);
+		ci = cap->ci;
+		dout(10, "removing cap %p, ci is %p, inode is %p\n",
+		     cap, ci, &ci->vfs_inode);
+		ceph_remove_cap(cap);
+	}
+	BUG_ON(session->s_nr_caps > 0);
+}
+
+/*
+ * caller must hold session s_mutex
+ */
+static void revoke_dentry_lease(struct dentry *dentry)
+{
+	struct ceph_dentry_info *di;
+
+	spin_lock(&dentry->d_lock);
+	di = ceph_dentry(dentry);
+	if (di) {
+		ceph_put_mds_session(di->lease_session);
+		kfree(di);
+		dentry->d_fsdata = NULL;
+	}
+	spin_unlock(&dentry->d_lock);
+}
+
+/*
+ * wake up any threads waiting on this session's caps
+ *
+ * caller must hold s_mutex.
+ */
+static void wake_up_session_caps(struct ceph_mds_session *session)
+{
+	struct list_head *p;
+	struct ceph_cap *cap;
+
+	dout(10, "wake_up_session_caps %p mds%d\n", session, session->s_mds);
+	list_for_each(p, &session->s_caps) {
+		cap = list_entry(p, struct ceph_cap, session_caps);
+		wake_up(&cap->ci->i_cap_wq);
+	}
+}
+
+/*
+ * Send periodic message to MDS renewing all currently held caps.  The
+ * ack will reset the expiration for all caps from this session.
+ *
+ * caller holds s_mutex
+ */
+static int send_renew_caps(struct ceph_mds_client *mdsc,
+			   struct ceph_mds_session *session)
+{
+	struct ceph_msg *msg;
+
+	if (time_after_eq(jiffies, session->s_cap_ttl) &&
+	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
+		dout(1, "mds%d session caps stale\n", session->s_mds);
+
+	/* do not try to renew caps until a recovering mds has reconnected
+	 * with its clients. */
+	if (ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds) <
+	    CEPH_MDS_STATE_RECONNECT) {
+		dout(10, "send_renew_caps ignoring mds%d\n", session->s_mds);
+		return 0;
+	}
+
+	dout(10, "send_renew_caps to mds%d\n", session->s_mds);
+	session->s_renew_requested = jiffies;
+	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 0);
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
+	ceph_send_msg_mds(mdsc, msg, session->s_mds);
+	return 0;
+}
+
+/*
+ * Note new cap ttl, and any transition from stale -> not stale (fresh?).
+ */
+static void renewed_caps(struct ceph_mds_client *mdsc,
+		  struct ceph_mds_session *session, int is_renew)
+{
+	int was_stale;
+	int wake = 0;
+
+	spin_lock(&session->s_cap_lock);
+	was_stale = is_renew && (session->s_cap_ttl == 0 ||
+				 time_after_eq(jiffies, session->s_cap_ttl));
+
+	session->s_cap_ttl = session->s_renew_requested +
+		mdsc->mdsmap->m_session_timeout*HZ;
+
+	if (was_stale) {
+		if (time_before(jiffies, session->s_cap_ttl)) {
+			dout(1, "mds%d caps renewed\n", session->s_mds);
+			wake = 1;
+		} else {
+			dout(1, "mds%d caps still stale\n", session->s_mds);
+		}
+	}
+	dout(10, "renewed_caps mds%d ttl now %lu, was %s, now %s\n",
+	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
+	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
+	spin_unlock(&session->s_cap_lock);
+
+	if (wake)
+		wake_up_session_caps(session);
+}
+
+
+
+static int request_close_session(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session)
+{
+	struct ceph_msg *msg;
+	int err = 0;
+
+	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
+				 session->s_seq);
+	if (IS_ERR(msg))
+		err = PTR_ERR(msg);
+	else
+		ceph_send_msg_mds(mdsc, msg, session->s_mds);
+	return err;
+}
+
+/*
+ * check all caps on a session, without allowing release to
+ * be delayed.
+ */
+static void check_all_caps(struct ceph_mds_client *mdsc,
+			 struct ceph_mds_session *session)
+{
+	struct list_head *p, *n;
+
+	list_for_each_safe(p, n, &session->s_caps) {
+		struct ceph_cap *cap =
+			list_entry(p, struct ceph_cap, session_caps);
+		struct inode *inode = &cap->ci->vfs_inode;
+
+		igrab(inode);
+		mutex_unlock(&session->s_mutex);
+		ceph_check_caps(ceph_inode(inode), 1, 0, NULL);
+		mutex_lock(&session->s_mutex);
+		iput(inode);
+	}
+}
+
+/*
+ * Called with s_mutex held.
+ */
+static int __close_session(struct ceph_mds_client *mdsc,
+			 struct ceph_mds_session *session)
+{
+	int mds = session->s_mds;
+	int err = 0;
+
+	dout(10, "close_session mds%d state=%s\n", mds,
+	     session_state_name(session->s_state));
+	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
+		return 0;
+
+	check_all_caps(mdsc, session);
+
+	if (list_empty(&session->s_caps)) {
+		session->s_state = CEPH_MDS_SESSION_CLOSING;
+		err = request_close_session(mdsc, session);
+	} else {
+		session->s_state = CEPH_MDS_SESSION_FLUSHING;
+	}
+	return err;
+}
+
+/*
+ * Called when the last cap for a session has been flushed or
+ * exported.
+ */
+void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
+				struct ceph_mds_session *session)
+{
+	dout(10, "flushed_all_caps for mds%d state %s\n", session->s_mds,
+	     session_state_name(session->s_state));
+	if (session->s_state == CEPH_MDS_SESSION_FLUSHING) {
+		session->s_state = CEPH_MDS_SESSION_CLOSING;
+		request_close_session(mdsc, session);
+	}
+}
+
+
+/*
+ * Create an mds request.
+ */
+struct ceph_mds_request *
+ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
+			 struct dentry *dentry, struct dentry *old_dentry,
+			 const char *path1, const char *path2, int mode)
+{
+	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
+
+	if (!req)
+		return ERR_PTR(-ENOMEM);
+	req->r_started = jiffies;
+	req->r_resend_mds = -1;
+	INIT_LIST_HEAD(&req->r_listener_item);
+	req->r_fmode = -1;
+	atomic_set(&req->r_ref, 1);  /* one for request_tree, one for caller */
+	INIT_LIST_HEAD(&req->r_wait);
+	init_completion(&req->r_completion);
+	init_completion(&req->r_safe_completion);
+	INIT_LIST_HEAD(&req->r_unsafe_item);
+
+	req->r_op = op;
+	if (dentry)
+		req->r_dentry = dget(dentry);
+	if (old_dentry)
+		req->r_old_dentry = dget(old_dentry);
+	req->r_path1 = path1;
+	req->r_path2 = path2;
+	req->r_direct_mode = mode;
+	return req;
+}
+
+/*
+ * return oldest (lowest) tid in request tree, 0 if none.
+ *
+ * called under mdsc->mutex.
+ */
+static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
+{
+	struct ceph_mds_request *first;
+	if (radix_tree_gang_lookup(&mdsc->request_tree,
+				   (void **)&first, 0, 1) <= 0)
+		return 0;
+	return first->r_tid;
+}
+
+/*
+ * build a dentry's path.  allocate on heap; caller must kfree.  based
+ * on build_path_from_dentry in fs/cifs/dir.c.
+ *
+ * encode hidden .snap dirs as a double /, i.e.
+ *   foo/.snap/bar -> foo//bar
+ */
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, int mds)
+{
+	struct dentry *temp;
+	char *path;
+	int len, pos;
+
+	if (dentry == NULL)
+		return ERR_PTR(-EINVAL);
+
+retry:
+	len = 0;
+	for (temp = dentry; !IS_ROOT(temp);) {
+		struct inode *inode = temp->d_inode;
+		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
+			len++;  /* slash only */
+		else
+			len += 1 + temp->d_name.len;
+		temp = temp->d_parent;
+		if (temp == NULL) {
+			derr(1, "corrupt dentry %p\n", dentry);
+			return ERR_PTR(-EINVAL);
+		}
+	}
+	if (len)
+		len--;  /* no leading '/' */
+
+	path = kmalloc(len+1, GFP_NOFS);
+	if (path == NULL)
+		return ERR_PTR(-ENOMEM);
+	pos = len;
+	path[pos] = 0;	/* trailing null */
+	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
+		if (temp->d_inode &&
+		    ceph_snap(temp->d_inode) == CEPH_SNAPDIR) {
+			dout(50, "build_path_dentry path+%d: %p SNAPDIR\n",
+			     pos, temp);
+		} else {
+			pos -= temp->d_name.len;
+			if (pos < 0)
+				break;
+			strncpy(path + pos, temp->d_name.name,
+				temp->d_name.len);
+			dout(50, "build_path_dentry path+%d: %p '%.*s'\n",
+			     pos, temp, temp->d_name.len, path + pos);
+		}
+		if (pos)
+			path[--pos] = '/';
+		temp = temp->d_parent;
+		if (temp == NULL) {
+			derr(1, "corrupt dentry\n");
+			kfree(path);
+			return ERR_PTR(-EINVAL);
+		}
+	}
+	if (pos != 0) {
+		derr(1, "did not end path lookup where expected, "
+		     "namelen is %d, pos is %d\n", len, pos);
+		/* presumably this is only possible if racing with a
+		   rename of one of the parent directories (we can not
+		   lock the dentries above us to prevent this, but
+		   retrying should be harmless) */
+		kfree(path);
+		goto retry;
+	}
+
+	*base = ceph_ino(temp->d_inode);
+	*plen = len;
+	dout(10, "build_path_dentry on %p %d built %llx '%.*s'\n",
+	     dentry, atomic_read(&dentry->d_count), *base, len, path);
+	return path;
+}
+
+
+/*
+ * called under mdsc->mutex
+ */
+static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
+					       struct ceph_mds_request *req,
+					       int mds)
+{
+	struct ceph_msg *msg;
+	struct ceph_mds_request_head *head;
+	char *path1 = (char *)req->r_path1;
+	char *path2 = (char *)req->r_path2;
+	u64 ino1 = 1, ino2 = 0;
+	int pathlen1 = 0, pathlen2 = 0;
+	int pathlen;
+	void *p, *end;
+	u32 fhlen = 0;
+
+	if (req->r_op == CEPH_MDS_OP_FINDINODE) {
+		fhlen = *(int *)req->r_path2;
+		path2 = NULL;
+		pathlen = sizeof(u32) + fhlen*sizeof(struct ceph_inopath_item);
+	} else {
+		if (path1)
+			pathlen1 = strlen(path1);
+		else if (req->r_dentry)
+			path1 = ceph_mdsc_build_path(req->r_dentry, &pathlen1, &ino1,
+					   mds);
+		if (path2)
+			pathlen2 = strlen(path2);
+		else if (req->r_old_dentry)
+			path2 = ceph_mdsc_build_path(req->r_old_dentry, &pathlen2, &ino2,
+					   mds);
+		pathlen = pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
+	}
+
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, sizeof(*head) + pathlen,
+			   0, 0, NULL);
+	if (IS_ERR(msg))
+		goto out;
+
+	head = msg->front.iov_base;
+	p = msg->front.iov_base + sizeof(*head);
+	end = msg->front.iov_base + msg->front.iov_len;
+
+	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
+	head->num_fwd = 0;
+	head->mds_wants_replica_in_dirino = 0;
+	head->op = cpu_to_le32(req->r_op);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 29)
+	head->caller_uid = cpu_to_le32(current_fsuid());
+	head->caller_gid = cpu_to_le32(current_fsgid());
+#else
+	head->caller_uid = cpu_to_le32(current->fsuid);
+	head->caller_gid = cpu_to_le32(current->fsgid);
+#endif
+	head->args = req->r_args;
+
+	if (req->r_op == CEPH_MDS_OP_FINDINODE) {
+		ceph_encode_32(&p, fhlen);
+		memcpy(p, path1, fhlen * sizeof(struct ceph_inopath_item));
+		p += fhlen * sizeof(struct ceph_inopath_item);
+	} else {
+		ceph_encode_filepath(&p, end, ino1, path1);
+		ceph_encode_filepath(&p, end, ino2, path2);
+		if (path1)
+			dout(10, "create_request_message path1 %llx/%s\n",
+			     ino1, path1);
+		if (path2)
+			dout(10, "create_request_message path2 %llx/%s\n",
+			     ino2, path2);
+		if (req->r_dentry)
+			kfree(path1);
+		if (req->r_old_dentry)
+			kfree(path2);
+	}
+
+ 	BUG_ON(p != end);
+
+out:
+	return msg;
+}
+
+/*
+ * called under mdsc->mutex if error, under no mutex if
+ * success.
+ */
+static void complete_request(struct ceph_mds_client *mdsc,
+			     struct ceph_mds_request *req)
+{
+	if (req->r_callback)
+		req->r_callback(mdsc, req);
+	else
+		complete(&req->r_completion);
+}
+
+/*
+ * called under mdsc->mutex
+ */
+static int __prepare_send_request(struct ceph_mds_client *mdsc,
+				  struct ceph_mds_request *req,
+				  int mds)
+{
+	struct ceph_mds_request_head *rhead;
+	struct ceph_msg *msg;
+
+	req->r_attempts++;
+	dout(10, "prepare_send_request %p tid %lld %s (attempt %d)\n", req,
+	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
+
+	if (req->r_request) {
+		ceph_msg_put(req->r_request);
+		req->r_request = NULL;
+	}
+	msg = create_request_message(mdsc, req, mds);
+	if (IS_ERR(msg)) {
+		req->r_reply = ERR_PTR(PTR_ERR(msg));
+		complete_request(mdsc, req);
+		return -PTR_ERR(msg);
+	}
+	req->r_request = msg;
+
+	rhead = msg->front.iov_base;
+	rhead->tid = cpu_to_le64(req->r_tid);
+	if (req->r_got_safe)
+		rhead->retry_attempt = cpu_to_le32(CEPH_MDS_REQUEST_REPLAY);
+	else
+		rhead->retry_attempt = cpu_to_le32(req->r_attempts - 1);
+	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
+	rhead->num_fwd = cpu_to_le32(req->r_num_fwd);
+
+	if (req->r_last_inode)
+		rhead->ino = cpu_to_le64(ceph_ino(req->r_last_inode));
+	else
+		rhead->ino = 0;
+	return 0;
+}
+
+/*
+ * send request, or put it on the appropriate wait list.
+ */
+static int __do_request(struct ceph_mds_client *mdsc,
+			struct ceph_mds_request *req)
+{
+	struct ceph_mds_session *session = NULL;
+	int mds = -1;
+	int err = -EAGAIN;
+
+	if (req->r_reply)
+		goto out;
+
+	if (req->r_timeout &&
+	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
+		dout(10, "do_request timed out\n");
+		err = -EIO;
+		goto finish;
+	}
+
+	mds = __choose_mds(mdsc, req);
+	if (mds < 0 ||
+	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+		dout(30, "do_request no mds or not active, waiting for map\n");
+		list_add(&req->r_wait, &mdsc->waiting_for_map);
+		ceph_monc_request_mdsmap(&mdsc->client->monc,
+					 mdsc->mdsmap->m_epoch+1);
+		goto out;
+	}
+
+	/* get, open session */
+	session = __ceph_lookup_mds_session(mdsc, mds);
+	if (!session)
+		session = register_session(mdsc, mds);
+	dout(30, "do_request mds%d session %p state %s\n", mds, session,
+	     session_state_name(session->s_state));
+	if (session->s_state != CEPH_MDS_SESSION_OPEN) {
+		if (session->s_state == CEPH_MDS_SESSION_NEW ||
+		    session->s_state == CEPH_MDS_SESSION_CLOSING)
+			__open_session(mdsc, session);
+		list_add(&req->r_wait, &session->s_waiting);
+		ceph_monc_request_mdsmap(&mdsc->client->monc,
+					 mdsc->mdsmap->m_epoch+1);
+		goto out_session;
+	}
+
+	/* send request */
+	req->r_session = get_session(session);
+	req->r_resend_mds = -1;   /* forget any previous mds hint */
+
+	if (req->r_request_started == 0)   /* note request start time */
+		req->r_request_started = jiffies;
+
+	err = __prepare_send_request(mdsc, req, mds);
+	if (!err) {
+		ceph_msg_get(req->r_request);
+		ceph_send_msg_mds(mdsc, req->r_request, mds);
+	}
+
+out_session:
+	ceph_put_mds_session(session);
+out:
+	return err;
+
+finish:
+	req->r_reply = ERR_PTR(err);
+	complete_request(mdsc, req);
+	goto out;
+}
+
+static void __wake_requests(struct ceph_mds_client *mdsc,
+			    struct list_head *head)
+{
+	struct list_head *p, *n;
+
+	list_for_each_safe(p, n, head) {
+		struct ceph_mds_request *req =
+			list_entry(p, struct ceph_mds_request, r_wait);
+		list_del_init(&req->r_wait);
+		__do_request(mdsc, req);
+	}
+}
+
+/*
+ * Wake up threads with requests pending for @mds, so that they can
+ * resubmit their requests to a possibly different mds.  If @all is set,
+ * wake up if their requests has been forwarded to @mds, too.
+ */
+static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+{
+	struct ceph_mds_request *reqs[10];
+	u64 nexttid = 0;
+	int i, got;
+
+	dout(20, "kick_requests mds%d\n", mds);
+	while (nexttid < mdsc->last_tid) {
+		got = radix_tree_gang_lookup(&mdsc->request_tree,
+					     (void **)&reqs, nexttid, 10);
+		if (got == 0)
+			break;
+		nexttid = reqs[got-1]->r_tid + 1;
+		for (i = 0; i < got; i++) {
+			if (reqs[i]->r_got_unsafe)
+				continue;
+			if (((reqs[i]->r_session &&
+			      reqs[i]->r_session->s_mds == mds) ||
+			     (all && reqs[i]->r_fwd_session &&
+			      reqs[i]->r_fwd_session->s_mds == mds))) {
+				dout(10, " kicking tid %llu\n", reqs[i]->r_tid);
+				put_request_sessions(reqs[i]);
+				__do_request(mdsc, reqs[i]);
+			}
+		}
+	}
+}
+
+void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+			      struct ceph_mds_request *req)
+{
+	dout(30, "submit_request on %p\n", req);
+	mutex_lock(&mdsc->mutex);
+	__register_request(mdsc, req, NULL);
+	__do_request(mdsc, req);
+	mutex_unlock(&mdsc->mutex);
+}
+
+/*
+ * Synchrously perform an mds request.  Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+			 struct inode *listener,
+			 struct ceph_mds_request *req)
+{
+	int err;
+
+	dout(30, "do_request on %p\n", req);
+
+	mutex_lock(&mdsc->mutex);
+	__register_request(mdsc, req, listener);
+	__do_request(mdsc, req);
+
+	if (!req->r_reply) {
+		mutex_unlock(&mdsc->mutex);
+		if (req->r_timeout) {
+			err = wait_for_completion_timeout(&req->r_completion,
+							  req->r_timeout);
+			if (err > 0)
+				err = 0;
+			else if (err == 0)
+				req->r_reply = ERR_PTR(-EIO);
+		} else {
+			wait_for_completion(&req->r_completion);
+		}
+		mutex_lock(&mdsc->mutex);
+	}
+
+	if (IS_ERR(req->r_reply)) {
+		err = PTR_ERR(req->r_reply);
+		req->r_reply = NULL;
+
+		/* clean up */
+		__unregister_request(mdsc, req);
+		if (!list_empty(&req->r_unsafe_item))
+		    list_del_init(&req->r_unsafe_item);
+		complete(&req->r_safe_completion);
+	} else {
+		err = le32_to_cpu(req->r_reply_info.head->result);
+	}
+	mutex_unlock(&mdsc->mutex);
+
+	dout(30, "do_request %p done, result %d\n", req, err);
+	return err;
+}
+
+/*
+ * Handle mds reply.
+ *
+ * We take the session mutex and parse and process the reply immediately.
+ * This preserves the logical ordering of replies, capabilities, etc., sent
+ * by the MDS as they are applied to our local cache.
+ */
+void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+	struct ceph_mds_request *req;
+	struct ceph_mds_reply_head *head = msg->front.iov_base;
+	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
+	u64 tid;
+	int err, result;
+	int mds;
+
+	if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+		return;
+	if (msg->front.iov_len < sizeof(*head)) {
+		derr(1, "handle_reply got corrupt (short) reply\n");
+		return;
+	}
+
+	/* get request, session */
+	tid = le64_to_cpu(head->tid);
+	mutex_lock(&mdsc->mutex);
+	req = __lookup_request(mdsc, tid);
+	if (!req) {
+		dout(1, "handle_reply on unknown tid %llu\n", tid);
+		mutex_unlock(&mdsc->mutex);
+		return;
+	}
+	dout(10, "handle_reply %p expected_cap=%p\n", req, req->r_expected_cap);
+	mds = le32_to_cpu(msg->hdr.src.name.num);
+
+	/* dup? */
+	if ((req->r_got_unsafe && !head->safe) ||
+	    (req->r_got_safe && head->safe)) {
+		dout(0, "got a dup %s reply on %llu from mds%d\n",
+		     head->safe ? "safe":"unsafe", tid, mds);
+		mutex_unlock(&mdsc->mutex);
+		ceph_mdsc_put_request(req);
+		return;
+	}
+
+	if (head->safe) {
+		req->r_got_safe = true;
+		__unregister_request(mdsc, req);
+		complete(&req->r_safe_completion);
+
+		if (req->r_got_unsafe) {
+			/*
+			 * We already handled the unsafe response, now do the
+			 * cleanup.  No need to examine the response; the MDS
+			 * doesn't include any result info in the safe
+			 * response.  And even if it did, there is nothing
+			 * useful we could do with a revised return value.
+			 */
+			dout(10, "got safe reply %llu, mds%d\n", tid, mds);
+			BUG_ON(req->r_session == NULL);
+			list_del_init(&req->r_unsafe_item);
+			ceph_mdsc_put_request(req);
+
+			/* last unsafe request during umount? */
+			if (mdsc->stopping && !__get_oldest_tid(mdsc))
+				complete(&mdsc->safe_umount_waiters);
+			mutex_unlock(&mdsc->mutex);
+			return;
+		}
+	}
+
+	if (req->r_session && req->r_session->s_mds != mds) {
+		ceph_put_mds_session(req->r_session);
+		req->r_session = __ceph_lookup_mds_session(mdsc, mds);
+	}
+	if (req->r_session == NULL) {
+		derr(1, "got reply on %llu, but no session for mds%d\n",
+		     tid, mds);
+		mutex_unlock(&mdsc->mutex);
+		ceph_mdsc_put_request(req);
+		return;
+	}
+	BUG_ON(req->r_reply);
+
+	if (!head->safe) {
+		req->r_got_unsafe = true;
+		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
+	}
+
+	mutex_unlock(&mdsc->mutex);
+
+	mutex_lock(&req->r_session->s_mutex);
+
+	/* parse */
+	rinfo = &req->r_reply_info;
+	err = parse_reply_info(msg, rinfo);
+	if (err < 0) {
+		derr(0, "handle_reply got corrupt reply\n");
+		goto done;
+	}
+	result = le32_to_cpu(rinfo->head->result);
+	dout(10, "handle_reply tid %lld result %d\n", tid, result);
+
+	/* snap trace */
+	if (rinfo->snapblob_len) {
+		down_write(&mdsc->snap_rwsem);
+		ceph_update_snap_trace(mdsc, rinfo->snapblob,
+			       rinfo->snapblob + rinfo->snapblob_len,
+			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+		downgrade_write(&mdsc->snap_rwsem);
+	} else {
+		down_read(&mdsc->snap_rwsem);
+	}
+
+	/* insert trace into our cache */
+	err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
+	if (err)
+		goto done;
+	if (result == 0) {
+		/* readdir result? */
+		if (rinfo->dir_nr)
+			ceph_readdir_prepopulate(req, req->r_session);
+	}
+
+done:
+	up_read(&mdsc->snap_rwsem);
+
+	if (err) {
+		req->r_err = err;
+	} else {
+		req->r_reply = msg;
+		ceph_msg_get(msg);
+	}
+
+	mutex_unlock(&req->r_session->s_mutex);
+
+	/* kick calling process */
+	complete_request(mdsc, req);
+	ceph_mdsc_put_request(req);
+
+	return;
+}
+
+
+
+/*
+ * handle mds notification that our request has been forwarded.
+ */
+void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
+			      struct ceph_msg *msg)
+{
+	struct ceph_mds_request *req;
+	u64 tid;
+	u32 next_mds;
+	u32 fwd_seq;
+	u8 must_resend;
+	int err = -EINVAL;
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front.iov_len;
+	int from_mds;
+
+	if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+		goto bad;
+	from_mds = le32_to_cpu(msg->hdr.src.name.num);
+
+	ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
+	ceph_decode_64(&p, tid);
+	ceph_decode_32(&p, next_mds);
+	ceph_decode_32(&p, fwd_seq);
+	ceph_decode_8(&p, must_resend);
+
+	mutex_lock(&mdsc->mutex);
+	req = __lookup_request(mdsc, tid);
+	if (!req) {
+		dout(10, "forward %llu dne\n", tid);
+		goto out;  /* dup reply? */
+	}
+
+	if (fwd_seq <= req->r_num_fwd) {
+		dout(10, "forward %llu to mds%d - old seq %d <= %d\n",
+		     tid, next_mds, req->r_num_fwd, fwd_seq);
+	} else if (!must_resend &&
+		   __have_session(mdsc, next_mds) &&
+		   mdsc->sessions[next_mds]->s_state == CEPH_MDS_SESSION_OPEN) {
+		/* yes.  adjust our sessions, but that's all; the old mds
+		 * forwarded our message for us. */
+		dout(10, "forward %llu to mds%d (mds%d fwded)\n", tid, next_mds,
+		     from_mds);
+		req->r_num_fwd = fwd_seq;
+		put_request_sessions(req);
+		req->r_session = __ceph_lookup_mds_session(mdsc, next_mds);
+		req->r_fwd_session = __ceph_lookup_mds_session(mdsc, from_mds);
+	} else {
+		/* no, resend. */
+		/* forward race not possible; mds would drop */
+		dout(10, "forward %llu to mds%d (we resend)\n", tid, next_mds);
+		req->r_num_fwd = fwd_seq;
+		req->r_resend_mds = next_mds;
+		put_request_sessions(req);
+		__do_request(mdsc, req);
+	}
+	ceph_mdsc_put_request(req);
+out:
+	mutex_unlock(&mdsc->mutex);
+	return;
+
+bad:
+	derr(0, "problem decoding message, err=%d\n", err);
+}
+
+/*
+ * handle a mds session control message
+ */
+void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
+			      struct ceph_msg *msg)
+{
+	u32 op;
+	u64 seq;
+	struct ceph_mds_session *session = NULL;
+	int mds;
+	struct ceph_mds_session_head *h = msg->front.iov_base;
+	int wake = 0;
+
+	if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+		return;
+	mds = le32_to_cpu(msg->hdr.src.name.num);
+
+	/* decode */
+	if (msg->front.iov_len != sizeof(*h))
+		goto bad;
+	op = le32_to_cpu(h->op);
+	seq = le64_to_cpu(h->seq);
+
+	mutex_lock(&mdsc->mutex);
+	session = __ceph_lookup_mds_session(mdsc, mds);
+	if (session && mdsc->mdsmap)
+		/* FIXME: this ttl calculation is generous */
+		session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
+	mutex_unlock(&mdsc->mutex);
+
+	if (!session) {
+		if (op != CEPH_SESSION_OPEN) {
+			dout(10, "handle_session no session for mds%d\n", mds);
+			return;
+		}
+		dout(10, "handle_session creating session for mds%d\n", mds);
+		session = register_session(mdsc, mds);
+	}
+
+	mutex_lock(&session->s_mutex);
+
+	dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
+	     mds, ceph_session_op_name(op), session,
+	     session_state_name(session->s_state), seq);
+	switch (op) {
+	case CEPH_SESSION_OPEN:
+		session->s_state = CEPH_MDS_SESSION_OPEN;
+		renewed_caps(mdsc, session, 0);
+		wake = 1;
+		if (mdsc->stopping)
+			__close_session(mdsc, session);
+		break;
+
+	case CEPH_SESSION_RENEWCAPS:
+		renewed_caps(mdsc, session, 1);
+		break;
+
+	case CEPH_SESSION_CLOSE:
+		unregister_session(mdsc, mds);
+		remove_session_caps(session);
+		wake = 1; /* for good measure */
+		complete(&mdsc->session_close_waiters);
+		kick_requests(mdsc, mds, 0);      /* cur only */
+		break;
+
+	case CEPH_SESSION_STALE:
+		dout(1, "mds%d caps went stale, renewing\n", session->s_mds);
+		spin_lock(&session->s_cap_lock);
+		session->s_cap_gen++;
+		session->s_cap_ttl = 0;
+		spin_unlock(&session->s_cap_lock);
+		send_renew_caps(mdsc, session);
+		break;
+
+	default:
+		derr(0, "bad session op %d from mds%d\n", op, mds);
+		WARN_ON(1);
+	}
+
+	mutex_unlock(&session->s_mutex);
+	if (wake) {
+		mutex_lock(&mdsc->mutex);
+		__wake_requests(mdsc, &session->s_waiting);
+		mutex_unlock(&mdsc->mutex);
+	}
+	ceph_put_mds_session(session);
+	return;
+
+bad:
+	derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds,
+	     (int)msg->front.iov_len, (int)sizeof(*h));
+	return;
+}
+
+
+/*
+ * called under session->mutex.
+ */
+static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
+{
+	struct list_head *p, *n;
+	struct ceph_mds_request *req;
+	int err;
+
+	dout(10, "replay_unsafe_requests mds%d\n", session->s_mds);
+
+	mutex_lock(&mdsc->mutex);
+	list_for_each_safe(p, n, &session->s_unsafe) {
+		req = list_entry(p, struct ceph_mds_request, r_unsafe_item);
+		err = __prepare_send_request(mdsc, req, session->s_mds);
+		if (!err) {
+			ceph_msg_get(req->r_request);
+			ceph_send_msg_mds(mdsc, req->r_request, session->s_mds);
+		}
+	}
+	mutex_unlock(&mdsc->mutex);
+}
+
+/*
+ * If an MDS fails and recovers, it needs to reconnect with clients in order
+ * to reestablish shared state.  This includes all caps issued through this
+ * session _and_ the snap_realm hierarchy.  Because it's not clear which
+ * snap realms the mds cares about, we send everything we know about.. that
+ * ensures we'll then get any new info the recovering MDS might have.
+ *
+ * This is a relatively heavyweight operation, but it's rare.
+ *
+ * called with mdsc->mutex held.
+ */
+static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
+{
+	struct ceph_mds_session *session;
+	struct ceph_msg *reply;
+	int newlen, len = 4 + 1;
+	void *p, *end;
+	struct list_head *cp;
+	struct ceph_cap *cap;
+	char *path;
+	int pathlen, err;
+	u64 pathbase;
+	struct dentry *dentry;
+	struct ceph_inode_info *ci;
+	int num_caps, num_realms = 0;
+	int got;
+	u64 next_snap_ino = 0;
+	__le32 *pnum_caps, *pnum_realms;
+
+	dout(1, "reconnect to recovering mds%d\n", mds);
+
+	/* find session */
+	session = __ceph_lookup_mds_session(mdsc, mds);
+	mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
+
+	if (session) {
+		mutex_lock(&session->s_mutex);
+
+		session->s_state = CEPH_MDS_SESSION_RECONNECTING;
+		session->s_seq = 0;
+
+		/* replay unsafe requests */
+		replay_unsafe_requests(mdsc, session);
+
+		/* estimate needed space */
+		len += session->s_nr_caps *
+			sizeof(struct ceph_mds_cap_reconnect);
+		len += session->s_nr_caps * (100); /* guess! */
+		dout(40, "estimating i need %d bytes for %d caps\n",
+		     len, session->s_nr_caps);
+	} else {
+		dout(20, "no session for mds%d, will send short reconnect\n",
+		     mds);
+	}
+
+	down_read(&mdsc->snap_rwsem);
+
+retry:
+	/* build reply */
+	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
+	if (IS_ERR(reply)) {
+		err = PTR_ERR(reply);
+		derr(0, "ENOMEM trying to send mds reconnect to mds%d\n", mds);
+		goto out;
+	}
+	p = reply->front.iov_base;
+	end = p + len;
+
+	if (!session) {
+		ceph_encode_8(&p, 1); /* session was closed */
+		ceph_encode_32(&p, 0);
+		goto send;
+	}
+	dout(10, "session %p state %s\n", session,
+	     session_state_name(session->s_state));
+
+	/* traverse this session's caps */
+	ceph_encode_8(&p, 0);
+	pnum_caps = p;
+	ceph_encode_32(&p, session->s_nr_caps);
+	num_caps = 0;
+	list_for_each(cp, &session->s_caps) {
+		struct inode *inode;
+		struct ceph_mds_cap_reconnect *rec;
+
+		cap = list_entry(cp, struct ceph_cap, session_caps);
+		ci = cap->ci;
+		inode = &ci->vfs_inode;
+
+		/* skip+drop expireable caps.  this is racy, but harmless. */
+		if ((cap->issued & ~CEPH_CAP_EXPIREABLE) == 0) {
+			dout(10, " skipping %p ino %llx.%llx cap %p %s\n",
+			     inode, ceph_vinop(inode), cap,
+			     ceph_cap_string(cap->issued));
+			continue;
+		}
+
+		dout(10, " adding %p ino %llx.%llx cap %p %s\n",
+		     inode, ceph_vinop(inode), cap,
+		     ceph_cap_string(cap->issued));
+		ceph_decode_need(&p, end, sizeof(u64), needmore);
+		ceph_encode_64(&p, ceph_ino(inode));
+
+		dentry = d_find_alias(inode);
+		if (dentry) {
+			path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, -1);
+			if (IS_ERR(path)) {
+				err = PTR_ERR(path);
+				BUG_ON(err);
+			}
+		} else {
+			path = NULL;
+			pathlen = 0;
+		}
+		ceph_decode_need(&p, end, pathlen+4, needmore);
+		ceph_encode_string(&p, end, path, pathlen);
+
+		ceph_decode_need(&p, end, sizeof(*rec), needmore);
+		rec = p;
+		p += sizeof(*rec);
+		BUG_ON(p > end);
+		spin_lock(&inode->i_lock);
+		cap->seq = 0;  /* reset cap seq */
+		rec->pathbase = cpu_to_le64(pathbase);
+		rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+		rec->issued = cpu_to_le32(cap->issued);
+		rec->size = cpu_to_le64(inode->i_size);
+		ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
+		ceph_encode_timespec(&rec->atime, &inode->i_atime);
+		rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+		spin_unlock(&inode->i_lock);
+
+		kfree(path);
+		dput(dentry);
+		num_caps++;
+	}
+	*pnum_caps = cpu_to_le32(num_caps);
+
+	/*
+	 * snaprealms.  we provide mds with the ino, seq (version), and
+	 * parent for all of our realms.  If the mds has any newer info,
+	 * it will tell us.
+	 */
+	next_snap_ino = 0;
+	/* save some space for the snaprealm count */
+	pnum_realms = p;
+	ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
+	p += sizeof(*pnum_realms);
+	num_realms = 0;
+	while (1) {
+		struct ceph_snap_realm *realm;
+		struct ceph_mds_snaprealm_reconnect *sr_rec;
+		got = radix_tree_gang_lookup(&mdsc->snap_realms,
+					     (void **)&realm, next_snap_ino, 1);
+		if (!got)
+			break;
+
+		dout(10, " adding snap realm %llx seq %lld parent %llx\n",
+		     realm->ino, realm->seq, realm->parent_ino);
+		ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
+		sr_rec = p;
+		sr_rec->ino = cpu_to_le64(realm->ino);
+		sr_rec->seq = cpu_to_le64(realm->seq);
+		sr_rec->parent = cpu_to_le64(realm->parent_ino);
+		p += sizeof(*sr_rec);
+		num_realms++;
+		next_snap_ino = realm->ino + 1;
+	}
+	*pnum_realms = cpu_to_le32(num_realms);
+
+send:
+	reply->front.iov_len = p - reply->front.iov_base;
+	reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
+	dout(10, "final len was %u (guessed %d)\n",
+	     (unsigned)reply->front.iov_len, len);
+	ceph_send_msg_mds(mdsc, reply, mds);
+
+	if (session) {
+		session->s_state = CEPH_MDS_SESSION_OPEN;
+		__wake_requests(mdsc, &session->s_waiting);
+	}
+
+out:
+	up_read(&mdsc->snap_rwsem);
+	if (session) {
+		mutex_unlock(&session->s_mutex);
+		ceph_put_mds_session(session);
+	}
+	mutex_lock(&mdsc->mutex);
+	return;
+
+needmore:
+	/*
+	 * we need a larger buffer.  this doesn't very accurately
+	 * factor in snap realms, but it's safe.
+	 */
+	num_caps += num_realms;
+	newlen = (len * (session->s_nr_caps+3)) / (num_caps + 1);
+	dout(30, "i guessed %d, and did %d of %d caps, retrying with %d\n",
+	     len, num_caps, session->s_nr_caps, newlen);
+	len = newlen;
+	ceph_msg_put(reply);
+	goto retry;
+}
+
+
+/*
+ * if the client is unresponsive for long enough, the mds will kill
+ * the session entirely.
+ */
+void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds)
+{
+	derr(1, "mds%d gave us the boot.  IMPLEMENT RECONNECT.\n", mds);
+}
+
+
+
+/*
+ * compare old and new mdsmaps, kicking requests
+ * and closing out old connections as necessary
+ *
+ * called under mdsc->mutex.
+ */
+static void check_new_map(struct ceph_mds_client *mdsc,
+			  struct ceph_mdsmap *newmap,
+			  struct ceph_mdsmap *oldmap)
+{
+	int i;
+	int oldstate, newstate;
+	struct ceph_mds_session *s;
+
+	dout(20, "check_new_map new %u old %u\n",
+	     newmap->m_epoch, oldmap->m_epoch);
+
+	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
+		if (mdsc->sessions[i] == NULL)
+			continue;
+		s = mdsc->sessions[i];
+		oldstate = ceph_mdsmap_get_state(oldmap, i);
+		newstate = ceph_mdsmap_get_state(newmap, i);
+
+		dout(20, "check_new_map mds%d state %d -> %d (session %s)\n",
+		     i, oldstate, newstate, session_state_name(s->s_state));
+		if (newstate < oldstate) {
+			/* if the state moved backwards, that means
+			 * the old mds failed and/or a new mds is
+			 * recovering in its place. */
+			/* notify messenger to close out old messages,
+			 * socket. */
+			ceph_messenger_mark_down(mdsc->client->msgr,
+						 &oldmap->m_addr[i]);
+
+			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
+				/* the session never opened, just close it
+				 * out now */
+				__wake_requests(mdsc, &s->s_waiting);
+				unregister_session(mdsc, i);
+			}
+
+			/* kick any requests waiting on the recovering mds */
+			kick_requests(mdsc, i, 1);
+			continue;
+		}
+
+		/*
+		 * kick requests on any mds that has gone active.
+		 *
+		 * kick requests on cur or forwarder: we may have sent
+		 * the request to mds1, mds1 told us it forwarded it
+		 * to mds2, but then we learn mds1 failed and can't be
+		 * sure it successfully forwarded our request before
+		 * it died.
+		 */
+		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
+		    newstate >= CEPH_MDS_STATE_ACTIVE)
+			kick_requests(mdsc, i, 1);
+	}
+}
+
+
+
+/*
+ * leases
+ */
+
+void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+	struct super_block *sb = mdsc->client->sb;
+	struct inode *inode;
+	struct ceph_mds_session *session;
+	struct ceph_inode_info *ci;
+	struct dentry *parent, *dentry;
+	struct ceph_dentry_info *di;
+	int mds;
+	struct ceph_mds_lease *h = msg->front.iov_base;
+	struct ceph_vino vino;
+	int mask;
+	struct qstr dname;
+
+	if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+		return;
+	mds = le32_to_cpu(msg->hdr.src.name.num);
+	dout(10, "handle_lease from mds%d\n", mds);
+
+	/* decode */
+	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
+		goto bad;
+	vino.ino = le64_to_cpu(h->ino);
+	vino.snap = CEPH_NOSNAP;
+	mask = le16_to_cpu(h->mask);
+	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
+	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
+	if (dname.len != le32_to_cpu(*(__le32 *)(h+1)))
+		goto bad;
+
+	/* find session */
+	mutex_lock(&mdsc->mutex);
+	session = __ceph_lookup_mds_session(mdsc, mds);
+	mutex_unlock(&mdsc->mutex);
+	if (!session) {
+		derr(0, "WTF, got lease but no session for mds%d\n", mds);
+		return;
+	}
+
+	mutex_lock(&session->s_mutex);
+	session->s_seq++;
+
+	/* lookup inode */
+	inode = ceph_find_inode(sb, vino);
+	dout(20, "handle_lease action is %d, mask %d, ino %llx %p\n", h->action,
+	     mask, vino.ino, inode);
+	if (inode == NULL) {
+		dout(10, "handle_lease no inode %llx\n", vino.ino);
+		goto release;
+	}
+
+	BUG_ON(h->action != CEPH_MDS_LEASE_REVOKE);  /* for now */
+
+	/* inode */
+	ci = ceph_inode(inode);
+
+	/* dentry */
+	if (mask & CEPH_LOCK_DN) {
+		parent = d_find_alias(inode);
+		if (!parent) {
+			dout(10, "no parent dentry on inode %p\n", inode);
+			WARN_ON(1);
+			goto release;  /* hrm... */
+		}
+		dname.hash = full_name_hash(dname.name, dname.len);
+		dentry = d_lookup(parent, &dname);
+		dput(parent);
+		if (!dentry)
+			goto release;
+		di = ceph_dentry(dentry);
+		if (di && di->lease_session == session) {
+			h->seq = cpu_to_le32(di->lease_seq);
+			revoke_dentry_lease(dentry);
+		}
+		dput(dentry);
+	}
+
+release:
+	iput(inode);
+	/* let's just reuse the same message */
+	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
+	ceph_msg_get(msg);
+	ceph_send_msg_mds(mdsc, msg, mds);
+	mutex_unlock(&session->s_mutex);
+	ceph_put_mds_session(session);
+	return;
+
+bad:
+	dout(0, "corrupt lease message\n");
+}
+
+
+/*
+ * Preemptively release a lease we expect to invalidate anyway.
+ * Pass @inode always, @dentry is optional.
+ */
+void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
+			     struct dentry *dentry, int mask)
+{
+	struct ceph_msg *msg;
+	struct ceph_mds_lease *lease;
+	struct ceph_dentry_info *di;
+	int origmask = mask;
+	int mds = -1;
+	int len = sizeof(*lease) + sizeof(u32);
+	int dnamelen = 0;
+
+	BUG_ON(inode == NULL);
+	BUG_ON(dentry == NULL);
+
+	/* is dentry lease valid? */
+	if (mask & CEPH_LOCK_DN) {
+		spin_lock(&dentry->d_lock);
+		di = ceph_dentry(dentry);
+		if (di &&
+		    di->lease_session->s_mds >= 0 &&
+		    di->lease_gen == di->lease_session->s_cap_gen &&
+		    time_before(jiffies, dentry->d_time)) {
+			/* we do have a lease on this dentry; note mds */
+			mds = di->lease_session->s_mds;
+			dnamelen = dentry->d_name.len;
+			len += dentry->d_name.len;
+		} else {
+			mask &= ~CEPH_LOCK_DN;  /* no lease; clear DN bit */
+		}
+		spin_unlock(&dentry->d_lock);
+	} else {
+		mask &= ~CEPH_LOCK_DN;  /* no lease; clear DN bit */
+	}
+
+	if (mask == 0) {
+		dout(10, "lease_release inode %p dentry %p -- "
+		     "no lease on %d\n",
+		     inode, dentry, origmask);
+		return;  /* nothing to drop */
+	}
+	BUG_ON(mds < 0);
+
+	dout(10, "lease_release inode %p dentry %p %d mask %d to mds%d\n",
+	     inode, dentry, dnamelen, mask, mds);
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
+	if (IS_ERR(msg))
+		return;
+	lease = msg->front.iov_base;
+	lease->action = CEPH_MDS_LEASE_RELEASE;
+	lease->mask = cpu_to_le16(mask);
+	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
+	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
+	*(__le32 *)((void *)lease + sizeof(*lease)) = cpu_to_le32(dnamelen);
+	if (dentry)
+		memcpy((void *)lease + sizeof(*lease) + 4, dentry->d_name.name,
+		       dnamelen);
+	ceph_send_msg_mds(mdsc, msg, mds);
+}
+
+
+/*
+ * delayed work -- periodically trim expired leases, renew caps with mds
+ */
+static void schedule_delayed(struct ceph_mds_client *mdsc)
+{
+	int delay = 5;
+	unsigned hz = round_jiffies_relative(HZ * delay);
+	schedule_delayed_work(&mdsc->delayed_work, hz);
+}
+
+static void delayed_work(struct work_struct *work)
+{
+	int i;
+	struct ceph_mds_client *mdsc =
+		container_of(work, struct ceph_mds_client, delayed_work.work);
+	int renew_interval;
+	int renew_caps;
+	u32 want_map = 0;
+
+	dout(30, "delayed_work\n");
+	ceph_check_delayed_caps(mdsc);
+
+	mutex_lock(&mdsc->mutex);
+	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
+	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
+				   mdsc->last_renew_caps);
+	if (renew_caps)
+		mdsc->last_renew_caps = jiffies;
+
+	for (i = 0; i < mdsc->max_sessions; i++) {
+		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
+		if (s == NULL)
+			continue;
+		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
+			dout(10, "resending session close request for mds%d\n",
+			     s->s_mds);
+			request_close_session(mdsc, s);
+			ceph_put_mds_session(s);
+			continue;
+		}
+		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
+			derr(1, "mds%d session probably timed out, "
+			     "requesting mds map\n", s->s_mds);
+			want_map = mdsc->mdsmap->m_epoch;
+		}
+		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
+			/* this mds is failed or recovering, just wait */
+			ceph_put_mds_session(s);
+			continue;
+		}
+		mutex_unlock(&mdsc->mutex);
+
+		mutex_lock(&s->s_mutex);
+		if (renew_caps)
+			send_renew_caps(mdsc, s);
+		ceph_trim_session_rdcaps(s);
+		mutex_unlock(&s->s_mutex);
+		ceph_put_mds_session(s);
+
+		mutex_lock(&mdsc->mutex);
+	}
+	mutex_unlock(&mdsc->mutex);
+
+	if (want_map)
+		ceph_monc_request_mdsmap(&mdsc->client->monc, want_map);
+
+	schedule_delayed(mdsc);
+}
+
+
+void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
+{
+	mdsc->client = client;
+	mutex_init(&mdsc->mutex);
+	mdsc->mdsmap = NULL;            /* none yet */
+	init_completion(&mdsc->safe_umount_waiters);
+	init_completion(&mdsc->session_close_waiters);
+	INIT_LIST_HEAD(&mdsc->waiting_for_map);
+	mdsc->sessions = NULL;
+	mdsc->max_sessions = 0;
+	mdsc->stopping = 0;
+	init_rwsem(&mdsc->snap_rwsem);
+	INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
+	INIT_LIST_HEAD(&mdsc->snap_empty);
+	spin_lock_init(&mdsc->snap_empty_lock);	
+	mdsc->last_tid = 0;
+	INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
+	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
+	mdsc->last_renew_caps = jiffies;
+	INIT_LIST_HEAD(&mdsc->cap_delay_list);
+	spin_lock_init(&mdsc->cap_delay_lock);
+	INIT_LIST_HEAD(&mdsc->snap_flush_list);
+	spin_lock_init(&mdsc->snap_flush_lock);
+}
+
+/*
+ * drop all leases (and dentry refs) in preparation for umount
+ */
+static void drop_leases(struct ceph_mds_client *mdsc)
+{
+	int i;
+
+	dout(10, "drop_leases\n");
+	mutex_lock(&mdsc->mutex);
+	for (i = 0; i < mdsc->max_sessions; i++) {
+		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
+		if (!s)
+			continue;
+		mutex_unlock(&mdsc->mutex);
+		mutex_lock(&s->s_mutex);
+		mutex_unlock(&s->s_mutex);
+		ceph_put_mds_session(s);
+		mutex_lock(&mdsc->mutex);
+	}
+	mutex_unlock(&mdsc->mutex);
+}
+
+/*
+ * Wait for safe replies on open mds requests.  If we time out, drop
+ * all requests from the tree to avoid dangling dentry refs.
+ */
+static void wait_requests(struct ceph_mds_client *mdsc)
+{
+	struct ceph_mds_request *req;
+
+	mutex_lock(&mdsc->mutex);
+	if (__get_oldest_tid(mdsc)) {
+		mutex_unlock(&mdsc->mutex);
+		dout(10, "wait_requests waiting for requests\n");
+		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
+					    CEPH_MOUNT_TIMEOUT);
+		mutex_lock(&mdsc->mutex);
+
+		/* tear down remaining requests */
+		while (radix_tree_gang_lookup(&mdsc->request_tree,
+					      (void **)&req, 0, 1)) {
+			dout(10, "wait_requests timed out on tid %llu\n",
+			     req->r_tid);
+			radix_tree_delete(&mdsc->request_tree, req->r_tid);
+			ceph_mdsc_put_request(req);
+		}
+	}
+	mutex_unlock(&mdsc->mutex);
+	dout(10, "wait_requests done\n");
+}
+
+/*
+ * called before mount is ro, and before dentries are torn down.
+ * (hmm, does this still race with new lookups?)
+ */
+void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
+{
+	dout(10, "pre_umount\n");
+	mdsc->stopping = 1;
+
+	/*
+	 * clean out the delayed cap list; we will flush everything
+	 * explicitly below.
+	 */
+	spin_lock(&mdsc->cap_delay_lock);
+	while (!list_empty(&mdsc->cap_delay_list)) {
+		struct ceph_inode_info *ci;
+		ci = list_first_entry(&mdsc->cap_delay_list,
+				      struct ceph_inode_info,
+				      i_cap_delay_list);
+		list_del_init(&ci->i_cap_delay_list);
+		spin_unlock(&mdsc->cap_delay_lock);
+		iput(&ci->vfs_inode);
+		spin_lock(&mdsc->cap_delay_lock);
+	}
+	spin_unlock(&mdsc->cap_delay_lock);
+
+	drop_leases(mdsc);
+	ceph_check_delayed_caps(mdsc);
+	wait_requests(mdsc);
+}
+
+/*
+ * called after sb is ro.
+ */
+void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
+{
+	struct ceph_mds_session *session;
+	int i;
+	int n;
+	unsigned long started, timeout = CEPH_MOUNT_TIMEOUT;
+	struct ceph_client *client = mdsc->client;
+
+	dout(10, "close_sessions\n");
+
+	mutex_lock(&mdsc->mutex);
+
+	/* close sessions, caps.
+	 *
+	 * WARNING the session close timeout (and forced unmount in
+	 * general) is somewhat broken.. we'll leaved inodes pinned
+	 * and other nastyness.
+	 */
+	started = jiffies;
+	while (time_before(jiffies, started + timeout)) {
+		dout(10, "closing sessions\n");
+		n = 0;
+		for (i = 0; i < mdsc->max_sessions; i++) {
+			session = __ceph_lookup_mds_session(mdsc, i);
+			if (!session)
+				continue;
+			mutex_unlock(&mdsc->mutex);
+			mutex_lock(&session->s_mutex);
+			__close_session(mdsc, session);
+			mutex_unlock(&session->s_mutex);
+			ceph_put_mds_session(session);
+			mutex_lock(&mdsc->mutex);
+			n++;
+		}
+		if (n == 0)
+			break;
+
+		if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
+			break;
+
+		dout(10, "waiting for sessions to close\n");
+		mutex_unlock(&mdsc->mutex);
+		wait_for_completion_timeout(&mdsc->session_close_waiters,
+					    timeout);
+		mutex_lock(&mdsc->mutex);
+	}
+
+	/* tear down remaining sessions */
+	for (i = 0; i < mdsc->max_sessions; i++) {
+		if (mdsc->sessions[i]) {
+			session = get_session(mdsc->sessions[i]);
+			unregister_session(mdsc, i);
+			mutex_unlock(&mdsc->mutex);
+			mutex_lock(&session->s_mutex);
+			remove_session_caps(session);
+			mutex_unlock(&session->s_mutex);
+			ceph_put_mds_session(session);
+			mutex_lock(&mdsc->mutex);			
+		}
+	}
+
+
+	WARN_ON(!list_empty(&mdsc->cap_delay_list));
+
+	mutex_unlock(&mdsc->mutex);
+
+	ceph_cleanup_empty_realms(mdsc);
+
+	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
+
+	dout(10, "stopped\n");
+}
+
+void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
+{
+	dout(10, "stop\n");
+	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
+	if (mdsc->mdsmap)
+		ceph_mdsmap_destroy(mdsc->mdsmap);
+	kfree(mdsc->sessions);
+}
+
+
+/*
+ * handle mds map update.
+ */
+void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+	u32 epoch;
+	u32 maplen;
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front.iov_len;
+	struct ceph_mdsmap *newmap, *oldmap;
+	ceph_fsid_t fsid;
+	int err = -EINVAL;
+	int from;
+	__le64 major, minor;
+
+	if (le32_to_cpu(msg->hdr.src.name.type) == CEPH_ENTITY_TYPE_MDS)
+		from = le32_to_cpu(msg->hdr.src.name.num);
+	else
+		from = -1;
+
+	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
+	ceph_decode_64_le(&p, major);
+	__ceph_fsid_set_major(&fsid, major);
+	ceph_decode_64_le(&p, minor);
+	__ceph_fsid_set_minor(&fsid, minor);
+	if (ceph_fsid_compare(&fsid, &mdsc->client->monc.monmap->fsid)) {
+		derr(0, "got mdsmap with wrong fsid\n");
+		return;
+	}
+	ceph_decode_32(&p, epoch);
+	ceph_decode_32(&p, maplen);
+	dout(2, "handle_map epoch %u len %d\n", epoch, (int)maplen);
+
+	/* do we need it? */
+	ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
+	mutex_lock(&mdsc->mutex);
+	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
+		dout(2, "handle_map epoch %u <= our %u\n",
+		     epoch, mdsc->mdsmap->m_epoch);
+		mutex_unlock(&mdsc->mutex);
+		return;
+	}
+
+	newmap = ceph_mdsmap_decode(&p, end);
+	if (IS_ERR(newmap)) {
+		err = PTR_ERR(newmap);
+		goto bad_unlock;
+	}
+
+	/* swap into place */
+	if (mdsc->mdsmap) {
+		oldmap = mdsc->mdsmap;
+		mdsc->mdsmap = newmap;
+		check_new_map(mdsc, newmap, oldmap);
+		ceph_mdsmap_destroy(oldmap);
+
+		/* reconnect?  a recovering mds will send us an mdsmap,
+		 * indicating their state is RECONNECTING, if it wants us
+		 * to reconnect. */
+		if (from >= 0 && from < newmap->m_max_mds &&
+		    ceph_mdsmap_get_state(newmap, from) ==
+		    CEPH_MDS_STATE_RECONNECT)
+			send_mds_reconnect(mdsc, from);
+	} else {
+		mdsc->mdsmap = newmap;  /* first mds map */
+	}
+
+	__wake_requests(mdsc, &mdsc->waiting_for_map);
+
+	mutex_unlock(&mdsc->mutex);
+	schedule_delayed(mdsc);
+	return;
+
+bad_unlock:
+	mutex_unlock(&mdsc->mutex);
+bad:
+	derr(1, "problem with mdsmap %d\n", err);
+	return;
+}
+
+
+/* eof */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
new file mode 100644
index 0000000..8b4c274
--- /dev/null
+++ b/fs/ceph/mds_client.h
@@ -0,0 +1,314 @@
+#ifndef _FS_CEPH_MDS_CLIENT_H
+#define _FS_CEPH_MDS_CLIENT_H
+
+#include <linux/completion.h>
+#include <linux/list.h>
+#include <linux/mutex.h>
+#include <linux/radix-tree.h>
+#include <linux/spinlock.h>
+
+#include "types.h"
+#include "messenger.h"
+#include "mdsmap.h"
+
+/*
+ * A cluster of MDS (metadata server) daemons is responsible for
+ * managing the file system namespace (the directory hierarchy and
+ * inodes) and for coordinating shared access to storage.  Metadata is
+ * partitioning hierarchically across a number of servers, and that
+ * partition varies over time as the cluster adjusts the distribution
+ * in order to balance load.
+ *
+ * The MDS client is primarily responsible to managing synchronous
+ * metadata requests for operations like open, unlink, and so forth.
+ * If there is a MDS failure, we find out about it when we (possibly
+ * request and) receive a new MDS map, and can resubmit affected
+ * requests.
+ *
+ * For the most part, though, we take advantage of a lossless
+ * communications channel to the MDS, and do not need to worry about
+ * timing out or resubmitting requests.
+ *
+ * We maintain a stateful "session" with each MDS we interact with.
+ * Within each session, we sent periodic heartbeat messages to ensure
+ * any capabilities or leases we have been issues remain valid.  If
+ * the session times out and goes stale, our leases and capabilities
+ * are no longer valid.
+ */
+
+/*
+ * Some lock dependencies:
+ *
+ * session->s_mutex
+ *         mdsc->mutex
+ *
+ *         mdsc->snap_rwsem
+ *
+ *         inode->i_lock
+ *                 mdsc->snap_flush_lock
+ *                 mdsc->cap_delay_lock
+ *
+ *
+ */
+
+struct ceph_client;
+struct ceph_cap;
+
+/*
+ * parsed info about a single inode.  pointers are into the encoded
+ * on-wire structures within the mds reply message payload.
+ */
+struct ceph_mds_reply_info_in {
+	struct ceph_mds_reply_inode *in;
+	u32 symlink_len;
+	char *symlink;
+	u32 xattr_len;
+	char *xattr_data;
+};
+
+/*
+ * parsed info about an mds reply, including a "trace" from
+ * the referenced inode, through its parents up to the root
+ * directory, and directory contents (for readdir results).
+ */
+struct ceph_mds_reply_info_parsed {
+	struct ceph_mds_reply_head    *head;
+
+	int trace_numi, trace_numd, trace_snapdirpos;
+	struct ceph_mds_reply_info_in *trace_in;
+	struct ceph_mds_reply_dirfrag **trace_dir;
+	char                          **trace_dname;
+	u32                           *trace_dname_len;
+	struct ceph_mds_reply_lease   **trace_dlease;
+
+	struct ceph_mds_reply_dirfrag *dir_dir;
+	int                           dir_nr;
+	char                          **dir_dname;
+	u32                           *dir_dname_len;
+	struct ceph_mds_reply_lease   **dir_dlease;
+	struct ceph_mds_reply_info_in *dir_in;
+
+	/* encoded blob describing snapshot contexts for certain
+	   operations (e.g., open) */
+	void *snapblob;
+	int snapblob_len;
+};
+
+/*
+ * state associated with each MDS<->client session
+ */
+enum {
+	CEPH_MDS_SESSION_NEW = 1,
+	CEPH_MDS_SESSION_OPENING = 2,
+	CEPH_MDS_SESSION_OPEN = 3,
+	CEPH_MDS_SESSION_FLUSHING = 4,
+	CEPH_MDS_SESSION_CLOSING = 5,
+	CEPH_MDS_SESSION_RECONNECTING = 6
+};
+
+struct ceph_mds_session {
+	int               s_mds;
+	int               s_state;
+	unsigned long     s_ttl;      /* time until mds kills us */
+	u64               s_seq;      /* incoming msg seq # */
+	struct mutex      s_mutex;    /* serialize session messages */
+	spinlock_t        s_cap_lock; /* protects s_cap_gen, s_cap_ttl */
+	u32               s_cap_gen;  /* inc each time we get mds stale msg */
+	unsigned long     s_cap_ttl;  /* when session caps expire */
+	unsigned long     s_renew_requested; /* last time we sent a renew req */
+	struct list_head  s_caps;     /* all caps issued by this session */
+	struct list_head  s_rdcaps;   /* just the readonly caps */
+	spinlock_t        s_rdcaps_lock;
+	int               s_nr_caps;
+	atomic_t          s_ref;
+	struct list_head  s_waiting;  /* waiting requests */
+	struct list_head  s_unsafe;   /* unsafe requests */
+};
+
+/*
+ * modes of choosing which MDS to send a request to
+ */
+enum {
+	USE_ANY_MDS,
+	USE_RANDOM_MDS,
+	USE_CAP_MDS,    /* prefer mds we hold caps from */
+	USE_AUTH_MDS,   /* prefer authoritative mds for this metadata item */
+};
+
+struct ceph_mds_request;
+struct ceph_mds_client;
+
+typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
+					     struct ceph_mds_request *req);
+
+struct ceph_mds_request_attr {
+	struct attribute attr;
+	ssize_t (*show)(struct ceph_mds_request *, struct ceph_mds_request_attr *,
+			char *);
+	ssize_t (*store)(struct ceph_mds_request *, struct ceph_mds_request_attr *,
+			 const char *, size_t);
+};
+
+/*
+ * an in-flight mds request
+ */
+struct ceph_mds_request {
+	u64 r_tid;                   /* transaction id */
+
+	struct kobject	  kobj;
+	struct ceph_mds_request_attr k_mds, k_op;
+
+	int r_op;
+	struct dentry *r_dentry;
+	struct dentry *r_old_dentry; /* rename from or link from */
+	const char *r_path1, *r_path2;
+	union ceph_mds_request_args r_args;
+
+	struct ceph_msg  *r_request;  /* original request */
+	struct ceph_msg  *r_reply;
+	struct ceph_mds_reply_info_parsed r_reply_info;
+	int r_err;
+	unsigned long r_timeout;  /* optional.  jiffies */
+
+	unsigned long r_started;  /* start time to measure timeout against */
+	unsigned long r_request_started; /* start time for mds request only,
+					    used to measure lease durations */
+
+	/* for choosing which mds to send this request to */
+	int r_direct_mode;
+	u32 r_direct_hash;      /* choose dir frag based on this dentry hash */
+	bool r_direct_is_hash;  /* true if r_direct_hash is valid */
+
+	struct inode	*r_listener;
+	struct list_head r_listener_item;
+
+	/* references to the trailing dentry and inode from parsing the
+	 * mds response.  also used to feed a VFS-provided dentry into
+	 * the reply handler */
+	struct inode     *r_last_inode;
+	struct ceph_cap  *r_expected_cap; /* preallocate cap if we expect one */
+	int               r_fmode;        /* file mode, if expecting cap */
+	struct ceph_mds_session *r_session;
+	struct ceph_mds_session *r_fwd_session;  /* forwarded from */
+	struct inode     *r_locked_dir; /* dir (if any) i_mutex locked by vfs */
+
+	int               r_attempts;   /* resend attempts */
+	int               r_num_fwd;    /* number of forward attempts */
+	int               r_resend_mds; /* mds to resend to next, if any*/
+
+	atomic_t          r_ref;
+	struct list_head  r_wait;
+	struct completion r_completion;
+	struct completion r_safe_completion;
+	ceph_mds_request_callback_t r_callback;
+	struct list_head  r_unsafe_item;  /* per-session unsafe list item */
+	bool		  r_got_unsafe, r_got_safe;
+};
+
+/*
+ * mds client state
+ */
+struct ceph_mds_client {
+	struct ceph_client      *client;
+	struct mutex            mutex;         /* all nested structures */
+
+	struct ceph_mdsmap      *mdsmap;
+	struct completion       safe_umount_waiters, session_close_waiters;
+	struct list_head        waiting_for_map;
+
+	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
+	int                     max_sessions;  /* len of s_mds_sessions */
+	int                     stopping;      /* true if shutting down */
+
+	/*
+	 * snap_rwsem will cover cap linkage into snaprealms, and
+	 * realm snap contexts.  (later, we can do per-realm snap
+	 * contexts locks..)  the empty list contains realms with no
+	 * references (implying they contain no inodes with caps) that
+	 * should be destroyed.
+	 */
+	struct rw_semaphore     snap_rwsem;
+	struct radix_tree_root  snap_realms;
+	struct list_head        snap_empty;
+	spinlock_t              snap_empty_lock;  /* protect snap_empty */
+
+	u64                    last_tid;      /* most recent mds request */
+	struct radix_tree_root request_tree;  /* pending mds requests */
+	struct delayed_work    delayed_work;  /* delayed work */
+	unsigned long    last_renew_caps;  /* last time we renewed our caps */
+	struct list_head cap_delay_list;   /* caps with delayed release */
+	spinlock_t       cap_delay_lock;   /* protects cap_delay_list */
+	struct list_head snap_flush_list;  /* cap_snaps ready to flush */
+	spinlock_t       snap_flush_lock;
+
+	struct kobject		kobj;
+};
+
+extern const char *ceph_mds_op_name(int op);
+
+extern struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
+
+inline static struct ceph_mds_session *
+ceph_get_mds_session(struct ceph_mds_session *s)
+{
+	atomic_inc(&s->s_ref);
+	return s;
+}
+
+/*
+ * requests
+ */
+static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
+{
+	atomic_inc(&req->r_ref);
+}
+
+extern void ceph_put_mds_session(struct ceph_mds_session *s);
+
+extern void ceph_send_msg_mds(struct ceph_mds_client *mdsc,
+			      struct ceph_msg *msg, int mds);
+
+extern void ceph_mdsc_init(struct ceph_mds_client *mdsc,
+			   struct ceph_client *client);
+extern void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc);
+extern void ceph_mdsc_stop(struct ceph_mds_client *mdsc);
+
+extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
+				 struct ceph_msg *msg);
+extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
+				     struct ceph_msg *msg);
+extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc,
+				   struct ceph_msg *msg);
+extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
+				     struct ceph_msg *msg);
+
+extern void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc,
+				   struct ceph_msg *msg);
+
+extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
+				    struct inode *inode,
+				    struct dentry *dn, int mask);
+
+extern struct ceph_mds_request *
+ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
+			 struct dentry *dentry, struct dentry *old_dentry,
+			 const char *path1, const char *path2,
+			 int mode);
+extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+				     struct ceph_mds_request *req);
+extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+				struct inode *listener,
+				struct ceph_mds_request *req);
+extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
+
+extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
+
+extern void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds);
+
+extern void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
+				       struct ceph_mds_session *session);
+extern struct ceph_mds_request *ceph_mdsc_get_listener_req(struct inode *inode,
+						    u64 tid);
+extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, int mds);
+
+#endif
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
new file mode 100644
index 0000000..87d3870
--- /dev/null
+++ b/fs/ceph/mdsmap.c
@@ -0,0 +1,118 @@
+#include <linux/bug.h>
+#include <linux/err.h>
+#include <linux/random.h>
+#include <linux/slab.h>
+#include <linux/types.h>
+
+#include "mdsmap.h"
+#include "messenger.h"
+#include "decode.h"
+
+#include "ceph_debug.h"
+
+int ceph_debug_mdsmap __read_mostly = -1;
+#define DOUT_MASK DOUT_MASK_MDSMAP
+#define DOUT_VAR ceph_debug_mdsmap
+#include "super.h"
+
+
+/*
+ * choose a random mds that is "up" (i.e. has a state > 0), or -1.
+ */
+int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
+{
+	int n = 0;
+	int i;
+	char r;
+
+	/* count */
+	for (i = 0; i < m->m_max_mds; i++)
+		if (m->m_state[i] > 0)
+			n++;
+	if (n == 0)
+		return -1;
+
+	/* pick */
+	get_random_bytes(&r, 1);
+	n = r % n;
+	i = 0;
+	for (i = 0; n > 0; i++, n--)
+		while (m->m_state[i] <= 0)
+			i++;
+
+	return i;
+}
+
+/*
+ * Ignore any fields we don't care about in the MDS map (there are quite
+ * a few of them).
+ */
+struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
+{
+	struct ceph_mdsmap *m;
+	int i, n;
+	int err = -EINVAL;
+
+	m = kzalloc(sizeof(*m), GFP_NOFS);
+	if (m == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	ceph_decode_need(p, end, 8*sizeof(u32), bad);
+	ceph_decode_32(p, m->m_epoch);
+	ceph_decode_32(p, m->m_client_epoch);
+	ceph_decode_32(p, m->m_last_failure);
+	ceph_decode_32(p, m->m_root);
+	ceph_decode_32(p, m->m_session_timeout);
+	ceph_decode_32(p, m->m_session_autoclose);
+	ceph_decode_32(p, m->m_max_mds);
+
+	m->m_addr = kzalloc(m->m_max_mds*sizeof(*m->m_addr), GFP_NOFS);
+	m->m_state = kzalloc(m->m_max_mds*sizeof(*m->m_state), GFP_NOFS);
+	if (m->m_addr == NULL || m->m_state == NULL)
+		goto badmem;
+
+	/* pick out active nodes from mds_info (state > 0) */
+	ceph_decode_32(p, n);
+	ceph_decode_need(p, end,
+			 n * (3*sizeof(u32) + sizeof(u64) +
+			      2*sizeof(*m->m_addr) +
+			      sizeof(struct ceph_timespec)),
+			 bad);
+	for (i = 0; i < n; i++) {
+		s32 mds, inc, state;
+		u64 state_seq;
+		struct ceph_entity_addr addr;
+
+		*p += sizeof(addr);  /* skip addr key */
+		ceph_decode_32(p, mds);
+		ceph_decode_32(p, inc);
+		ceph_decode_32(p, state);
+		ceph_decode_64(p, state_seq);
+		ceph_decode_copy(p, &addr, sizeof(addr));
+		dout(10, "mdsmap_decode %d/%d mds%d.%d %u.%u.%u.%u:%u state %d\n",
+		     i+1, n, mds, inc, IPQUADPORT(addr.ipaddr), state);
+		if (mds >= 0 && mds < m->m_max_mds && state > 0) {
+			m->m_state[mds] = state;
+			m->m_addr[mds] = addr;
+		}
+		*p += sizeof(struct ceph_timespec);
+	}
+
+	/* ok, we don't care about the rest. */
+	dout(30, "mdsmap_decode success epoch %u\n", m->m_epoch);
+	return m;
+
+badmem:
+	err = -ENOMEM;
+bad:
+	derr(0, "corrupt mdsmap");
+	ceph_mdsmap_destroy(m);
+	return ERR_PTR(-EINVAL);
+}
+
+void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
+{
+	kfree(m->m_addr);
+	kfree(m->m_state);
+	kfree(m);
+}
diff --git a/fs/ceph/mdsmap.h b/fs/ceph/mdsmap.h
new file mode 100644
index 0000000..b50e298
--- /dev/null
+++ b/fs/ceph/mdsmap.h
@@ -0,0 +1,94 @@
+#ifndef _FS_CEPH_MDSMAP_H
+#define _FS_CEPH_MDSMAP_H
+
+#include "types.h"
+
+/*
+ * mds map
+ *
+ * fields limited to those the client cares about
+ */
+struct ceph_mdsmap {
+	u32 m_epoch, m_client_epoch, m_last_failure;
+	u32 m_root;
+	u32 m_session_timeout;          /* seconds */
+	u32 m_session_autoclose;        /* seconds */
+	u32 m_max_mds;                  /* size of m_addr, m_state arrays */
+	struct ceph_entity_addr *m_addr;  /* mds addrs */
+	s32 *m_state;                   /* states */
+};
+
+static inline struct ceph_entity_addr *
+ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
+{
+	if (w >= m->m_max_mds)
+		return NULL;
+	return &m->m_addr[w];
+}
+
+static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
+{
+	BUG_ON(w < 0);
+	if (w >= m->m_max_mds)
+		return CEPH_MDS_STATE_DNE;
+	return m->m_state[w];
+}
+
+static inline char *ceph_mdsmap_state_str(int state)
+{
+	switch (state) {
+	case CEPH_MDS_STATE_DNE:
+		return "dne";
+		break;
+	case CEPH_MDS_STATE_STOPPED:
+		return "stopped";
+		break;
+	case CEPH_MDS_STATE_DESTROYING:
+		return "destroying";
+		break;
+	case CEPH_MDS_STATE_FAILED:
+		return "failed";
+		break;
+	case CEPH_MDS_STATE_BOOT:
+		return "boot";
+		break;
+	case CEPH_MDS_STATE_STANDBY:
+		return "standby";
+		break;
+	case CEPH_MDS_STATE_CREATING:
+		return "creating";
+		break;
+	case CEPH_MDS_STATE_STARTING:
+		return "starting";
+		break;
+	case CEPH_MDS_STATE_STANDBY_REPLAY:
+		return "standby replay";
+		break;
+	case CEPH_MDS_STATE_REPLAY:
+		return "replay";
+		break;
+	case CEPH_MDS_STATE_RESOLVE:
+		return "resolve";
+		break;
+	case CEPH_MDS_STATE_RECONNECT:
+		return "reconnect";
+		break;
+	case CEPH_MDS_STATE_REJOIN:
+		return "rejoin";
+		break;
+	case CEPH_MDS_STATE_ACTIVE:
+		return "active";
+		break;
+	case CEPH_MDS_STATE_STOPPING:
+		return "stopping";
+		break;
+	}
+
+	return "unknown";
+}
+
+extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
+extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end);
+extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
+
+#endif
-- 
1.5.6.5

--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux