[PATCH 12/19] ceph: monitor client

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The monitor cluster is responsible for managing cluster membership
and state.  The monitor client handles what minimal interaction
the Ceph client has with it.

Signed-off-by: Sage Weil <sage@xxxxxxxxxxxx>
---
 fs/ceph/mon_client.c |  385 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/mon_client.h |  100 +++++++++++++
 2 files changed, 485 insertions(+), 0 deletions(-)
 create mode 100644 fs/ceph/mon_client.c
 create mode 100644 fs/ceph/mon_client.h

diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
new file mode 100644
index 0000000..ea4a5b2
--- /dev/null
+++ b/fs/ceph/mon_client.c
@@ -0,0 +1,385 @@
+
+#include <linux/types.h>
+#include <linux/random.h>
+#include <linux/sched.h>
+#include "mon_client.h"
+
+#include "ceph_debug.h"
+
+int ceph_debug_mon = -1;
+#define DOUT_MASK DOUT_MASK_MON
+#define DOUT_VAR ceph_debug_mon
+#define DOUT_PREFIX "mon: "
+#include "super.h"
+#include "decode.h"
+
+/*
+ * Decode a monmap blob (e.g., during mount).
+ */
+struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
+{
+	struct ceph_monmap *m;
+	int i, err = -EINVAL;
+
+	dout(30, "monmap_decode %p %p len %d\n", p, end, (int)(end-p));
+
+	/* The encoded and decoded sizes match. */
+	m = kmalloc(end-p, GFP_NOFS);
+	if (m == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	ceph_decode_need(&p, end, 2*sizeof(u32) + 2*sizeof(u64), bad);
+	ceph_decode_64_le(&p, m->fsid.major);
+	ceph_decode_64_le(&p, m->fsid.minor);
+	ceph_decode_32(&p, m->epoch);
+	ceph_decode_32(&p, m->num_mon);
+	ceph_decode_need(&p, end, m->num_mon*sizeof(m->mon_inst[0]), bad);
+	ceph_decode_copy(&p, m->mon_inst, m->num_mon*sizeof(m->mon_inst[0]));
+	if (p != end)
+		goto bad;
+
+	dout(30, "monmap_decode epoch %d, num_mon %d\n", m->epoch,
+	     m->num_mon);
+	for (i = 0; i < m->num_mon; i++)
+		dout(30, "monmap_decode  mon%d is %u.%u.%u.%u:%u\n", i,
+		     IPQUADPORT(m->mon_inst[i].addr.ipaddr));
+	return m;
+
+bad:
+	dout(30, "monmap_decode failed with %d\n", err);
+	kfree(m);
+	return ERR_PTR(err);
+}
+
+/*
+ * return true if *addr is included in the monmap.
+ */
+int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
+{
+	int i;
+
+	for (i = 0; i < m->num_mon; i++)
+		if (ceph_entity_addr_equal(addr, &m->mon_inst[i].addr))
+			return 1;
+	return 0;
+}
+
+/*
+ * Choose a monitor.  If @notmon >= 0, choose a different monitor than
+ * last time.
+ */
+static int pick_mon(struct ceph_mon_client *monc, int notmon)
+{
+	char r;
+
+	if (notmon < 0 && monc->last_mon >= 0)
+		return monc->last_mon;
+	get_random_bytes(&r, 1);
+	monc->last_mon = r % monc->monmap->num_mon;
+	return monc->last_mon;
+}
+
+/*
+ * Delay work with exponential backoff.
+ */
+static void delayed_work(struct delayed_work *dwork, unsigned long *delay)
+{
+	schedule_delayed_work(dwork, *delay);
+	if (*delay < MAX_DELAY_INTERVAL)
+		*delay *= 2;
+	else
+		*delay = MAX_DELAY_INTERVAL;
+}
+
+
+/*
+ * mds map
+ */
+static void do_request_mdsmap(struct work_struct *work)
+{
+	struct ceph_msg *msg;
+	struct ceph_mds_getmap *h;
+	struct ceph_mon_client *monc =
+		container_of(work, struct ceph_mon_client,
+			     mds_delayed_work.work);
+	int mon = pick_mon(monc, -1);
+
+	dout(5, "request_mdsmap from mon%d want %u\n", mon, monc->want_mdsmap);
+	msg = ceph_msg_new(CEPH_MSG_MDS_GETMAP, sizeof(*h), 0, 0, NULL);
+	if (IS_ERR(msg))
+		return;
+	h = msg->front.iov_base;
+	h->fsid = monc->monmap->fsid;
+	h->want = cpu_to_le32(monc->want_mdsmap);
+	msg->hdr.dst = monc->monmap->mon_inst[mon];
+	ceph_msg_send(monc->client->msgr, msg, 0);
+
+	/* keep sending request until we receive mds map */
+	if (monc->want_mdsmap)
+		delayed_work(&monc->mds_delayed_work, &monc->mds_delay);
+}
+
+/*
+ * Register our desire for an mdsmap >= epoch @want.
+ */
+void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, u32 want)
+{
+	dout(5, "request_mdsmap want %u\n", want);
+	mutex_lock(&monc->req_mutex);
+	if (want > monc->want_mdsmap) {
+		monc->mds_delay = BASE_DELAY_INTERVAL;
+		monc->want_mdsmap = want;
+		do_request_mdsmap(&monc->mds_delayed_work.work);
+	}
+	mutex_unlock(&monc->req_mutex);
+}
+
+/*
+ * Called when we receive an mds map.
+ */
+int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
+{
+	int ret = 0;
+
+	mutex_lock(&monc->req_mutex);
+	if (got < monc->want_mdsmap) {
+		dout(5, "got_mdsmap %u < wanted %u\n", got, monc->want_mdsmap);
+		ret = -EAGAIN;
+	} else {
+		dout(5, "got_mdsmap %u >= wanted %u\n", got, monc->want_mdsmap);
+		monc->want_mdsmap = 0;
+		cancel_delayed_work_sync(&monc->mds_delayed_work);
+		monc->mds_delay = BASE_DELAY_INTERVAL;
+	}
+	mutex_unlock(&monc->req_mutex);
+	return ret;
+}
+
+
+/*
+ * osd map
+ */
+static void do_request_osdmap(struct work_struct *work)
+{
+	struct ceph_msg *msg;
+	struct ceph_osd_getmap *h;
+	struct ceph_mon_client *monc =
+		container_of(work, struct ceph_mon_client,
+			     osd_delayed_work.work);
+	int mon = pick_mon(monc, -1);
+
+	dout(5, "request_osdmap from mon%d want %u\n", mon, monc->want_osdmap);
+	msg = ceph_msg_new(CEPH_MSG_OSD_GETMAP, sizeof(*h), 0, 0, NULL);
+	if (IS_ERR(msg))
+		return;
+	h = msg->front.iov_base;
+	h->fsid = monc->monmap->fsid;
+	h->start = cpu_to_le32(monc->want_osdmap);
+	msg->hdr.dst = monc->monmap->mon_inst[mon];
+	ceph_msg_send(monc->client->msgr, msg, 0);
+
+	/* keep sending request until we receive osd map */
+	if (monc->want_osdmap)
+		delayed_work(&monc->osd_delayed_work, &monc->osd_delay);
+}
+
+void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want)
+{
+	dout(5, "request_osdmap want %u\n", want);
+	mutex_lock(&monc->req_mutex);
+	monc->osd_delay = BASE_DELAY_INTERVAL;
+	monc->want_osdmap = want;
+	do_request_osdmap(&monc->osd_delayed_work.work);
+	mutex_unlock(&monc->req_mutex);
+}
+
+int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+{
+	int ret = 0;
+
+	mutex_lock(&monc->req_mutex);
+	if (got < monc->want_osdmap) {
+		dout(5, "got_osdmap %u < wanted %u\n", got, monc->want_osdmap);
+		ret = -EAGAIN;
+	} else {
+		dout(5, "got_osdmap %u >= wanted %u\n", got, monc->want_osdmap);
+		monc->want_osdmap = 0;
+		cancel_delayed_work_sync(&monc->osd_delayed_work);
+		monc->osd_delay = BASE_DELAY_INTERVAL;
+	}
+	mutex_unlock(&monc->req_mutex);
+	return ret;
+}
+
+
+/*
+ * umount
+ */
+static void do_request_umount(struct work_struct *work)
+{
+	struct ceph_msg *msg;
+	struct ceph_mon_client *monc =
+		container_of(work, struct ceph_mon_client,
+			     umount_delayed_work.work);
+	int mon = pick_mon(monc, -1);
+
+	dout(5, "do_request_umount from mon%d\n", mon);
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_UNMOUNT, 0, 0, 0, NULL);
+	if (IS_ERR(msg))
+		return;
+	msg->hdr.dst = monc->monmap->mon_inst[mon];
+	ceph_msg_send(monc->client->msgr, msg, 0);
+
+	delayed_work(&monc->umount_delayed_work, &monc->umount_delay);
+}
+
+void ceph_monc_request_umount(struct ceph_mon_client *monc)
+{
+	struct ceph_client *client = monc->client;
+
+	/* don't bother if forced unmount */
+	if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
+		return;
+
+	mutex_lock(&monc->req_mutex);
+	monc->umount_delay = BASE_DELAY_INTERVAL;
+	do_request_umount(&monc->umount_delayed_work.work);
+	mutex_unlock(&monc->req_mutex);
+}
+
+/*
+ * Handle monitor umount ack.
+ */
+void ceph_monc_handle_umount(struct ceph_mon_client *monc,
+			     struct ceph_msg *msg)
+{
+	dout(5, "handle_umount\n");
+	mutex_lock(&monc->req_mutex);
+	cancel_delayed_work_sync(&monc->umount_delayed_work);
+	monc->client->mount_state = CEPH_MOUNT_UNMOUNTED;
+	mutex_unlock(&monc->req_mutex);
+	wake_up(&monc->client->mount_wq);
+}
+
+
+/*
+ * statfs
+ */
+void ceph_monc_handle_statfs_reply(struct ceph_mon_client *monc,
+				   struct ceph_msg *msg)
+{
+	struct ceph_mon_statfs_request *req;
+	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
+	u64 tid;
+
+	if (msg->front.iov_len != sizeof(*reply))
+		goto bad;
+	tid = le64_to_cpu(reply->tid);
+	dout(10, "handle_statfs_reply %p tid %llu\n", msg, tid);
+
+	spin_lock(&monc->statfs_lock);
+	req = radix_tree_lookup(&monc->statfs_request_tree, tid);
+	if (req) {
+		radix_tree_delete(&monc->statfs_request_tree, tid);
+		req->buf->f_total = reply->st.f_total;
+		req->buf->f_free = reply->st.f_free;
+		req->buf->f_avail = reply->st.f_avail;
+		req->buf->f_objects = reply->st.f_objects;
+		req->result = 0;
+	}
+	spin_unlock(&monc->statfs_lock);
+	if (req)
+		complete(&req->completion);
+	return;
+
+bad:
+	derr(10, "corrupt statfs reply, no tid\n");
+}
+
+/*
+ * (re)send a statfs request
+ */
+static int send_statfs(struct ceph_mon_client *monc, u64 tid)
+{
+	struct ceph_msg *msg;
+	struct ceph_mon_statfs *h;
+	int mon = pick_mon(monc, -1);
+
+	dout(10, "send_statfs to mon%d tid %llu\n", mon, tid);
+	msg = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), 0, 0, NULL);
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
+	h = msg->front.iov_base;
+	h->fsid = monc->monmap->fsid;
+	h->tid = cpu_to_le64(tid);
+	msg->hdr.dst = monc->monmap->mon_inst[mon];
+	ceph_msg_send(monc->client->msgr, msg, 0);
+	return 0;
+}
+
+/*
+ * Do a synchronous statfs().
+ */
+int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
+{
+	struct ceph_mon_statfs_request req;
+	int err;
+
+	req.buf = buf;
+	init_completion(&req.completion);
+
+	/* register request */
+	err = radix_tree_preload(GFP_NOFS);
+	if (err < 0) {
+		derr(10, "ENOMEM in do_statfs\n");
+		return err;
+	}
+	spin_lock(&monc->statfs_lock);
+	req.tid = ++monc->last_tid;
+	req.last_attempt = jiffies;
+	radix_tree_insert(&monc->statfs_request_tree, req.tid, &req);
+	spin_unlock(&monc->statfs_lock);
+	radix_tree_preload_end();
+
+	/* send request and wait */
+	err = send_statfs(monc, req.tid);
+	if (err)
+		return err;
+	err = wait_for_completion_interruptible(&req.completion);
+	if (err == -EINTR)
+		return err;
+	return req.result;
+}
+
+
+int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
+{
+	dout(5, "init\n");
+	memset(monc, 0, sizeof(*monc));
+	monc->client = cl;
+	monc->monmap = kzalloc(sizeof(struct ceph_monmap) +
+		       sizeof(struct ceph_entity_addr) * MAX_MON_MOUNT_ADDR,
+		       GFP_KERNEL);
+	if (monc->monmap == NULL)
+		return -ENOMEM;
+	spin_lock_init(&monc->statfs_lock);
+	INIT_RADIX_TREE(&monc->statfs_request_tree, GFP_ATOMIC);
+	monc->last_tid = 0;
+	INIT_DELAYED_WORK(&monc->mds_delayed_work, do_request_mdsmap);
+	INIT_DELAYED_WORK(&monc->osd_delayed_work, do_request_osdmap);
+	INIT_DELAYED_WORK(&monc->umount_delayed_work, do_request_umount);
+	monc->mds_delay = monc->osd_delay = monc->umount_delay = 0;
+	mutex_init(&monc->req_mutex);
+	monc->want_mdsmap = 0;
+	monc->want_osdmap = 0;
+	return 0;
+}
+
+void ceph_monc_stop(struct ceph_mon_client *monc)
+{
+	dout(5, "stop\n");
+	cancel_delayed_work_sync(&monc->mds_delayed_work);
+	cancel_delayed_work_sync(&monc->osd_delayed_work);
+	cancel_delayed_work_sync(&monc->umount_delayed_work);
+	kfree(monc->monmap);
+}
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h
new file mode 100644
index 0000000..920cda8
--- /dev/null
+++ b/fs/ceph/mon_client.h
@@ -0,0 +1,100 @@
+#ifndef _FS_CEPH_MON_CLIENT_H
+#define _FS_CEPH_MON_CLIENT_H
+
+#include "messenger.h"
+#include <linux/completion.h>
+#include <linux/radix-tree.h>
+
+/*
+ * A small cluster of Ceph "monitors" are responsible for managing critical
+ * cluster configuration and state information.  An odd number (e.g., 3, 5)
+ * of cmon daemons use a modified version of the Paxos part-time parliament
+ * algorithm to manage the MDS map (mds cluster membership), OSD map, and
+ * list of clients who have mounted the file system.
+ *
+ * Communication with the monitor cluster is lossy, so requests for
+ * information may have to be resent if we time out waiting for a response.
+ * As long as we do not time out, we continue to send all requests to the
+ * same monitor.  If there is a problem, we randomly pick a new monitor form
+ * the cluster to try.
+ */
+
+struct ceph_client;
+struct ceph_mount_args;
+
+/*
+ * The monitor map enumerates the set of all monitors.
+ *
+ * Make sure this structure size matches the encoded map size, or change
+ * ceph_monmap_decode().
+ */
+struct ceph_monmap {
+	struct ceph_fsid fsid;
+	u32 epoch;
+	u32 num_mon;
+	struct ceph_entity_inst mon_inst[0];
+};
+
+/*
+ * a pending statfs() request.
+ */
+struct ceph_mon_statfs_request {
+	u64 tid;
+	int result;
+	struct ceph_statfs *buf;
+	struct completion completion;
+	unsigned long last_attempt; /* jiffies */
+};
+
+struct ceph_mon_client {
+	struct ceph_client *client;
+	int last_mon;                       /* last monitor i contacted */
+	struct ceph_monmap *monmap;
+
+	/* pending statfs requests */
+	spinlock_t statfs_lock;
+	struct radix_tree_root statfs_request_tree;
+	u64 last_tid;
+
+	/* mds/osd map or umount requests */
+	struct delayed_work mds_delayed_work;
+	struct delayed_work osd_delayed_work;
+	struct delayed_work umount_delayed_work;
+	unsigned long mds_delay;
+	unsigned long osd_delay;
+	unsigned long umount_delay;
+	struct mutex req_mutex;
+	u32 want_mdsmap;
+	u32 want_osdmap;
+};
+
+extern struct ceph_monmap *ceph_monmap_decode(void *p, void *end);
+extern int ceph_monmap_contains(struct ceph_monmap *m,
+				struct ceph_entity_addr *addr);
+
+extern int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl);
+extern void ceph_monc_stop(struct ceph_mon_client *monc);
+
+/*
+ * The model here is to indicate that we need a new map of at least epoch
+ * @want, and to indicate which maps receive.  Periodically rerequest the map
+ * from the monitor cluster until we get what we want.
+ */
+extern void ceph_monc_request_mdsmap(struct ceph_mon_client *monc, u32 want);
+extern int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 have);
+
+extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want);
+extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
+
+extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
+
+extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
+			       struct ceph_statfs *buf);
+extern void ceph_monc_handle_statfs_reply(struct ceph_mon_client *monc,
+					  struct ceph_msg *msg);
+
+extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
+extern void ceph_monc_handle_umount(struct ceph_mon_client *monc,
+				    struct ceph_msg *msg);
+
+#endif
-- 
1.5.6.5

--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux