[PATCH 10/20] ceph: OSD client

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The OSD client is responsible for reading and writing data from/to the
object storage pool.  This includes determining where objects are
stored in the cluster, and ensuring that requests are retried or
redirected in the event of a node failure or data migration.

If an OSD does not respond before a timeout expires, 'ping' messages
are sent across the lossless, ordered communications channel to
ensure that any break in the TCP connection is discovered.  If the
session does reset, a reconnection is attempted and affected requests
are resent (by the message transport layer).

Signed-off-by: Sage Weil <sage@xxxxxxxxxxxx>
---
 fs/ceph/osd_client.c |  983 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/osd_client.h |  151 ++++++++
 fs/ceph/osdmap.c     |  692 +++++++++++++++++++++++++++++++++++
 fs/ceph/osdmap.h     |   83 +++++
 4 files changed, 1909 insertions(+), 0 deletions(-)
 create mode 100644 fs/ceph/osd_client.c
 create mode 100644 fs/ceph/osd_client.h
 create mode 100644 fs/ceph/osdmap.c
 create mode 100644 fs/ceph/osdmap.h

diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
new file mode 100644
index 0000000..1cc0640
--- /dev/null
+++ b/fs/ceph/osd_client.c
@@ -0,0 +1,983 @@
+#include <linux/err.h>
+#include <linux/highmem.h>
+#include <linux/mm.h>
+#include <linux/pagemap.h>
+#include <linux/slab.h>
+#include <linux/uaccess.h>
+
+#include "ceph_debug.h"
+
+int ceph_debug_osdc __read_mostly = -1;
+#define DOUT_MASK DOUT_MASK_OSDC
+#define DOUT_VAR ceph_debug_osdc
+#include "super.h"
+
+#include "osd_client.h"
+#include "messenger.h"
+#include "crush/mapper.h"
+#include "decode.h"
+
+
+/*
+ * Map the file extent off~*plen onto one object and fill in req: snapid,
+ * object name (r_oid), op extent and page count.  *plen is shortened if
+ * the extent crosses an object boundary.
+ */
+static void calc_layout(struct ceph_osd_client *osdc,
+			struct ceph_vino vino, struct ceph_file_layout *layout,
+			u64 off, u64 *plen,
+			struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+	struct ceph_osd_op *op = (void *)(reqhead + 1);
+	u64 orig_len = *plen;
+	u64 objoff, objlen;    /* extent in object */
+	u64 bno;
+
+	reqhead->snapid = cpu_to_le64(vino.snap);
+
+	/* object extent? */
+	ceph_calc_file_object_mapping(layout, off, plen, &bno,
+				      &objoff, &objlen);
+	if (*plen < orig_len)
+		dout(10, " skipping last %llu, final file extent %llu~%llu\n",
+		     orig_len - *plen, off, *plen);
+
+	/* object name is "<ino>.<block#>" in hex; never write past r_oid */
+	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
+	req->r_oid_len = strlen(req->r_oid);
+
+	op->offset = cpu_to_le64(objoff);
+	op->length = cpu_to_le64(objlen);
+	req->r_num_pages = calc_pages_for(off, *plen);
+
+	dout(10, "calc_layout %s (%d) %llu~%llu (%d pages)\n",
+	     req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
+}
+
+
+/*
+ * requests
+ */
+void ceph_osdc_put_request(struct ceph_osd_request *req)
+{
+	dout(10, "put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
+	     atomic_read(&req->r_ref)-1);
+	BUG_ON(atomic_read(&req->r_ref) <= 0);
+	if (!atomic_dec_and_test(&req->r_ref))
+		return;		/* other holders remain */
+	/* last reference: release messages, owned pages and snap context */
+	if (req->r_request)
+		ceph_msg_put(req->r_request);
+	if (req->r_reply)
+		ceph_msg_put(req->r_reply);
+	if (req->r_own_pages)
+		ceph_release_page_vector(req->r_pages, req->r_num_pages);
+	ceph_put_snap_context(req->r_snapc);
+	kfree(req);
+}
+
+/*
+ * Build a new request AND its message, calculate the layout, and shorten
+ * *plen as needed.  Appends truncate/sync osd ops; ERR_PTR on failure.
+ */
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+					       struct ceph_file_layout *layout,
+					       struct ceph_vino vino,
+					       u64 off, u64 *plen,
+					       int opcode, int flags,
+					       struct ceph_snap_context *snapc,
+					       int do_sync,
+					       u32 truncate_seq,
+					       u64 truncate_size,
+					       struct timespec *mtime)
+{
+	struct ceph_osd_request *req;
+	struct ceph_msg *msg;
+	struct ceph_osd_request_head *head;
+	struct ceph_osd_op *op;
+	void *p;
+	int do_trunc = truncate_seq && (off + *plen > truncate_size);
+	int num_op = 1 + do_sync + do_trunc;
+	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+	int i;
+	u64 prevofs;
+
+	/* we may overallocate here, if our write extent is shortened below */
+	req = kzalloc(sizeof(*req), GFP_NOFS);
+	if (req == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	atomic_set(&req->r_ref, 1);
+	init_completion(&req->r_completion);
+	init_completion(&req->r_safe_completion);
+	INIT_LIST_HEAD(&req->r_unsafe_item);
+	req->r_flags = flags;
+	req->r_last_osd = -1;
+
+	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
+
+	/* create message; allow space for oid (40 == sizeof r_oid) */
+	msg_size += 40 + osdc->client->signed_ticket_len;
+	if (snapc)
+		msg_size += sizeof(u64) * snapc->num_snaps;
+	msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
+	if (IS_ERR(msg)) {
+		kfree(req);
+		return ERR_PTR(PTR_ERR(msg));
+	}
+	memset(msg->front.iov_base, 0, msg->front.iov_len);
+	head = msg->front.iov_base;
+	op = (void *)(head + 1);
+	p = (void *)(op + num_op);	/* variable part follows the ops */
+
+	req->r_request = msg;
+	req->r_snapc = ceph_get_snap_context(snapc);
+
+	head->client_inc = cpu_to_le32(1); /* always, for now. */
+	head->flags = cpu_to_le32(flags);
+	if (flags & CEPH_OSD_FLAG_WRITE)
+		ceph_encode_timespec(&head->mtime, mtime);
+	head->num_ops = cpu_to_le16(num_op);
+	op->op = cpu_to_le16(opcode);
+
+	/* calculate max write size */
+	calc_layout(osdc, vino, layout, off, plen, req);
+	req->r_file_layout = *layout;  /* keep a copy */
+
+	if (flags & CEPH_OSD_FLAG_WRITE) {
+		req->r_request->hdr.data_off = cpu_to_le16(off);
+		req->r_request->hdr.data_len = cpu_to_le32(*plen);
+		op->payload_len = cpu_to_le32(*plen);
+	}
+
+
+	/* fill in oid, ticket */
+	head->object_len = cpu_to_le32(req->r_oid_len);
+	memcpy(p, req->r_oid, req->r_oid_len);
+	p += req->r_oid_len;
+
+	head->ticket_len = cpu_to_le32(osdc->client->signed_ticket_len);
+	memcpy(p, osdc->client->signed_ticket,
+	       osdc->client->signed_ticket_len);
+	p += osdc->client->signed_ticket_len;
+
+
+	/* additional ops */
+	if (do_trunc) {
+		op++;
+		op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ?
+			     CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
+		op->truncate_seq = cpu_to_le32(truncate_seq);
+		prevofs =  le64_to_cpu((op-1)->offset);
+		op->truncate_size = cpu_to_le64(truncate_size - (off-prevofs));
+	}
+	if (do_sync) {
+		op++;
+		op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
+	}
+	if (snapc) {
+		head->snap_seq = cpu_to_le64(snapc->seq);
+		head->num_snaps = cpu_to_le32(snapc->num_snaps);
+		for (i = 0; i < snapc->num_snaps; i++) {
+			put_unaligned_le64(snapc->snaps[i], p);
+			p += sizeof(u64);
+		}
+	}
+
+	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+	return req;
+}
+
+/*
+ * Register request, assign tid.  If this is the first request, set up
+ * the timeout event.  Returns the radix_tree_insert() result.
+ */
+static int register_request(struct ceph_osd_client *osdc,
+			    struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *head = req->r_request->front.iov_base;
+	int rc;
+
+	mutex_lock(&osdc->request_mutex);
+	req->r_tid = ++osdc->last_tid;
+	head->tid = cpu_to_le64(req->r_tid);
+
+	dout(30, "register_request %p tid %lld\n", req, req->r_tid);
+	rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req);
+	if (rc < 0)
+		goto out;
+
+	ceph_osdc_get_request(req);	/* dropped by __unregister_request */
+	osdc->num_requests++;
+
+	req->r_timeout_stamp =
+		jiffies + osdc->client->mount_args.osd_timeout*HZ;
+
+	if (osdc->num_requests == 1) {
+		osdc->timeout_tid = req->r_tid;
+		dout(30, "  timeout on tid %llu at %lu\n", req->r_tid,
+		     req->r_timeout_stamp);
+		schedule_delayed_work(&osdc->timeout_work,
+		      round_jiffies_relative(req->r_timeout_stamp - jiffies));
+	}
+out:
+	mutex_unlock(&osdc->request_mutex);
+	return rc;
+}
+
+/*
+ * Timeout callback, called every N seconds when 1 or more osd
+ * requests has been active for more than N seconds.  When this
+ * happens, we ping all OSDs with requests who have timed out to
+ * ensure any communications channel reset is detected.  Reset the
+ * request timeouts another N seconds in the future as we go.
+ * Reschedule the timeout event another N seconds in future (unless
+ * there are no open requests).
+ */
+static void handle_timeout(struct work_struct *work)
+{
+	struct ceph_osd_client *osdc =
+		container_of(work, struct ceph_osd_client, timeout_work.work);
+	struct ceph_osd_request *req;
+	unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
+	unsigned long next_timeout = timeout + jiffies;
+	RADIX_TREE(pings, GFP_NOFS);  /* only send 1 ping per osd */
+	u64 next_tid = 0;
+	int got;
+
+	dout(10, "timeout\n");
+	down_read(&osdc->map_sem);
+
+	ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
+
+	mutex_lock(&osdc->request_mutex);
+	/* visit every registered request in tid order */
+	while (1) {
+		got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+					     next_tid, 1);
+		if (got == 0)
+			break;
+		next_tid = req->r_tid + 1;
+		if (time_before(jiffies, req->r_timeout_stamp))
+			continue;	/* not timed out yet */
+
+		/* timed out: push the deadline back and ping its osd once */
+		req->r_timeout_stamp = next_timeout;
+		if (req->r_last_osd >= 0 &&
+		    radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
+			struct ceph_entity_name n = {
+				.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
+				.num = cpu_to_le32(req->r_last_osd)
+			};
+			dout(20, " tid %llu (at least) timed out on osd%d\n",
+			     req->r_tid, req->r_last_osd);
+			radix_tree_insert(&pings, req->r_last_osd, req);
+			ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
+		}
+	}
+
+	/* drain the on-stack ping set */
+	while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
+		radix_tree_delete(&pings, req->r_last_osd);
+
+	/* timeout_tid is cleared once the last request is unregistered */
+	if (osdc->timeout_tid)
+		schedule_delayed_work(&osdc->timeout_work,
+				      round_jiffies_relative(timeout));
+
+	mutex_unlock(&osdc->request_mutex);
+
+	up_read(&osdc->map_sem);
+}
+
+/*
+ * Drop req from the request tree (and the tree's ref); request_mutex held.
+ */
+static void __unregister_request(struct ceph_osd_client *osdc,
+				 struct ceph_osd_request *req)
+{
+	u64 tid = req->r_tid;	/* req may be freed by the put below */
+	int ret;
+
+	dout(30, "__unregister_request %p tid %lld\n", req, tid);
+	radix_tree_delete(&osdc->request_tree, tid);
+	osdc->num_requests--;
+	ceph_osdc_put_request(req);
+
+	if (tid == osdc->timeout_tid) {
+		if (osdc->num_requests == 0) {
+			dout(30, "no requests, canceling timeout\n");
+			osdc->timeout_tid = 0;
+			cancel_delayed_work(&osdc->timeout_work);
+		} else {
+			ret = radix_tree_gang_lookup(&osdc->request_tree,
+						     (void **)&req, 0, 1);
+			BUG_ON(ret != 1);
+			osdc->timeout_tid = req->r_tid;
+			dout(30, "rescheduled timeout on tid %llu at %lu\n",
+			     req->r_tid, req->r_timeout_stamp);
+			schedule_delayed_work(&osdc->timeout_work,
+			      round_jiffies_relative(req->r_timeout_stamp -
+						     jiffies));
+		}
+	}
+}
+
+/*
+ * Pick an osd for req: the first up osd in its pg.  The result is stored
+ * in req->r_last_osd[_addr]; r_last_osd is -1 if no osd in the pg is up.
+ *
+ * Caller should hold map_sem for read.
+ *
+ * return 0 if unchanged, 1 if changed; err or -1 if the pg cannot be mapped.
+ */
+static int map_osds(struct ceph_osd_client *osdc,
+		    struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+	union ceph_pg pgid;
+	struct ceph_pg_pool_info *pool;
+	int ruleno;
+	unsigned pps; /* placement ps */
+	int osds[10], osd = -1;
+	int i, num;
+	int err;
+
+	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
+				      &req->r_file_layout, osdc->osdmap);
+	if (err)
+		return err;
+	pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid);
+	if (pgid.pg.pool >= osdc->osdmap->num_pools)
+		return -1;
+	pool = &osdc->osdmap->pg_pool[pgid.pg.pool];
+	ruleno = crush_find_rule(osdc->osdmap->crush, pool->v.crush_ruleset,
+				 pool->v.type, pool->v.size);
+	if (ruleno < 0) {
+		derr(0, "map_osds no crush rule for pool %d type %d size %d\n",
+		     pgid.pg.pool, pool->v.type, pool->v.size);
+		return -1;
+	}
+
+	if (pgid.pg.preferred >= 0)
+		pps = ceph_stable_mod(pgid.pg.ps,
+				      le32_to_cpu(pool->v.lpgp_num),
+				      pool->lpgp_num_mask);
+	else
+		pps = ceph_stable_mod(pgid.pg.ps,
+				      le32_to_cpu(pool->v.pgp_num),
+				      pool->pgp_num_mask);
+	pps += pgid.pg.pool;
+	num = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
+			    min_t(int, pool->v.size, ARRAY_SIZE(osds)),
+			    pgid.pg.preferred, osdc->osdmap->osd_weight);
+
+	/* primary is first up osd */
+	for (i = 0; i < num; i++)
+		if (ceph_osd_is_up(osdc->osdmap, osds[i])) {
+			osd = osds[i];
+			break;
+		}
+	dout(20, "map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
+	     req->r_tid, pgid.pg64, pgid.pg.pool, osd, req->r_last_osd);
+	if (req->r_last_osd == osd &&
+	    (osd < 0 || ceph_entity_addr_equal(&osdc->osdmap->osd_addr[osd],
+					       &req->r_last_osd_addr)))
+		return 0;	/* same osd at the same address */
+	req->r_last_osd = osd;
+	if (osd >= 0)
+		req->r_last_osd_addr = osdc->osdmap->osd_addr[osd];
+	return 1;
+}
+
+/*
+ * Map and send req (0 if no osd is up); caller holds map_sem for read.
+ */
+static int send_request(struct ceph_osd_client *osdc,
+			struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *reqhead;
+	int osd;
+
+	map_osds(osdc, req);	/* result is read back from req->r_last_osd */
+	if (req->r_last_osd < 0) {
+		dout(10, "send_request %p no up osds in pg\n", req);
+		ceph_monc_request_osdmap(&osdc->client->monc,
+					 osdc->osdmap->epoch+1);
+		return 0;
+	}
+	osd = req->r_last_osd;
+
+	dout(10, "send_request %p tid %llu to osd%d flags %d\n",
+	     req, req->r_tid, osd, req->r_flags);
+
+	reqhead = req->r_request->front.iov_base;
+	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
+	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
+	reqhead->reassert_version = req->r_reassert_version;
+
+	req->r_request->hdr.dst.name.type =
+		cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
+	req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
+	req->r_request->hdr.dst.addr = req->r_last_osd_addr;
+
+	/* restart the ping timeout for this attempt */
+	req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
+
+	ceph_msg_get(req->r_request); /* send consumes a ref */
+	return ceph_msg_send(osdc->client->msgr, req->r_request,
+			     BASE_DELAY_INTERVAL);
+}
+
+/*
+ * Handle an osd op reply: call the request's callback if one is set, else
+ * complete r_completion.  Aborted requests and dup acks are ignored.
+ */
+void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+	struct ceph_osd_request *req;
+	u64 tid;
+	int numops, object_len, flags;
+
+	if (msg->front.iov_len < sizeof(*rhead))
+		goto bad;
+	tid = le64_to_cpu(rhead->tid);
+	numops = le32_to_cpu(rhead->num_ops);
+	object_len = le32_to_cpu(rhead->object_len);
+	if (msg->front.iov_len != sizeof(*rhead) + object_len +
+	    numops * sizeof(struct ceph_osd_op))
+		goto bad;
+	dout(10, "handle_reply %p tid %llu\n", msg, tid);
+
+	/* lookup */
+	mutex_lock(&osdc->request_mutex);
+	req = radix_tree_lookup(&osdc->request_tree, tid);
+	if (req == NULL) {
+		dout(10, "handle_reply tid %llu dne\n", tid);
+		mutex_unlock(&osdc->request_mutex);
+		return;
+	}
+	ceph_osdc_get_request(req);
+	flags = le32_to_cpu(rhead->flags);
+
+	if (req->r_aborted) {
+		dout(10, "handle_reply tid %llu aborted\n", tid);
+		mutex_unlock(&osdc->request_mutex);
+		goto done;
+	}
+
+	if (req->r_reassert_version.epoch == 0) {
+		/* first ack */
+		if (req->r_reply == NULL) {
+			/* no data payload, else prepare_pages sets r_reply */
+			ceph_msg_get(msg);
+			req->r_reply = msg;
+		} else {
+			/* r_reply was set by prepare_pages */
+			BUG_ON(req->r_reply != msg);
+		}
+
+		/* in case we need to replay this op, */
+		req->r_reassert_version = rhead->reassert_version;
+	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
+		dout(10, "handle_reply tid %llu dup ack\n", tid);
+		mutex_unlock(&osdc->request_mutex);
+		goto done;
+	}
+
+	dout(10, "handle_reply tid %llu flags %d\n", tid, flags);
+
+	/* either this is a read, or we got the safe response */
+	if ((flags & CEPH_OSD_FLAG_ONDISK) || !(flags & CEPH_OSD_FLAG_WRITE))
+		__unregister_request(osdc, req);
+
+	mutex_unlock(&osdc->request_mutex);
+
+	if (req->r_callback)
+		req->r_callback(req);
+	else
+		complete(&req->r_completion);
+
+	if (flags & CEPH_OSD_FLAG_ONDISK) {
+		if (req->r_safe_callback)
+			req->r_safe_callback(req);
+		complete(&req->r_safe_completion);  /* fsync waiter */
+	}
+
+done:
+	ceph_osdc_put_request(req);
+	return;
+
+bad:
+	derr(0, "got corrupt osd_op_reply got %d %d expected %d\n",
+	     (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
+	     (int)sizeof(*rhead));
+}
+
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+			  struct ceph_entity_addr *who)
+{
+	struct ceph_osd_request *req;
+	u64 next_tid = 0;
+	int got;
+	int needmap = 0;
+
+	mutex_lock(&osdc->request_mutex);
+	while (1) {
+		got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+					     next_tid, 1);
+		if (got == 0)
+			break;
+		next_tid = req->r_tid + 1;
+
+		if (who && ceph_entity_addr_equal(who, &req->r_last_osd_addr))
+			goto kick;	/* was sent to the osd that reset */
+
+		if (map_osds(osdc, req) == 0)
+			continue;  /* no change */
+
+		if (req->r_last_osd < 0) {
+			dout(20, "tid %llu maps to no valid osd\n", req->r_tid);
+			needmap++;  /* request a newer map */
+			memset(&req->r_last_osd_addr, 0,
+			       sizeof(req->r_last_osd_addr));
+			continue;
+		}
+
+	kick:
+		dout(20, "kicking tid %llu osd%d\n", req->r_tid,
+		     req->r_last_osd);
+		ceph_osdc_get_request(req);	/* hold req while unlocked */
+		mutex_unlock(&osdc->request_mutex);
+		req->r_request = ceph_msg_maybe_dup(req->r_request);
+		if (!req->r_aborted) {
+			req->r_flags |= CEPH_OSD_FLAG_RETRY;
+			send_request(osdc, req);
+		}
+		ceph_osdc_put_request(req);
+		mutex_lock(&osdc->request_mutex);
+	}
+	mutex_unlock(&osdc->request_mutex);
+
+	if (needmap) {
+		dout(10, "%d requests for down osds, need new map\n", needmap);
+		ceph_monc_request_osdmap(&osdc->client->monc,
+					 osdc->osdmap->epoch+1);
+	}
+}
+
+/*
+ * Process updated osd map.
+ *
+ * The message contains any number of incremental and full maps.  An
+ * incremental is applied only if it directly follows the epoch we have;
+ * of the full maps only the last is considered, and only if it is newer.
+ * Outstanding requests are kicked whenever a map was applied.
+ */
+void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	void *p, *end, *next;
+	u32 nr_maps, maplen;
+	u32 epoch;
+	struct ceph_osdmap *newmap = NULL;
+	ceph_fsid_t fsid;
+
+	dout(2, "handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
+	p = msg->front.iov_base;
+	end = p + msg->front.iov_len;
+
+	/* take map_sem before decoding: the 'bad' exit always releases it */
+	down_write(&osdc->map_sem);
+
+	/* verify fsid */
+	ceph_decode_need(&p, end, sizeof(fsid), bad);
+	ceph_decode_copy(&p, &fsid, sizeof(fsid));
+	if (ceph_fsid_compare(&fsid, &osdc->client->monc.monmap->fsid)) {
+		derr(0, "got map with wrong fsid, ignoring\n");
+		goto out_unlock;
+	}
+
+	/* incremental maps */
+	ceph_decode_32_safe(&p, end, nr_maps, bad);
+	dout(10, " %d inc maps\n", nr_maps);
+	while (nr_maps > 0) {
+		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
+		ceph_decode_32(&p, epoch);
+		ceph_decode_32(&p, maplen);
+		ceph_decode_need(&p, end, maplen, bad);
+		next = p + maplen;
+		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
+			dout(10, "applying incremental map %u len %d\n",
+			     epoch, maplen);
+			newmap = apply_incremental(&p, next, osdc->osdmap,
+						   osdc->client->msgr);
+			if (IS_ERR(newmap))
+				goto bad;
+			if (newmap != osdc->osdmap) {
+				ceph_osdmap_destroy(osdc->osdmap);
+				osdc->osdmap = newmap;
+			}
+		} else {
+			dout(10, "ignoring incremental map %u len %d\n",
+			     epoch, maplen);
+		}
+		p = next;
+		nr_maps--;
+	}
+	if (newmap)
+		goto done;
+
+	/* full maps */
+	ceph_decode_32_safe(&p, end, nr_maps, bad);
+	dout(30, " %d full maps\n", nr_maps);
+	while (nr_maps) {
+		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
+		ceph_decode_32(&p, epoch);
+		ceph_decode_32(&p, maplen);
+		ceph_decode_need(&p, end, maplen, bad);
+		if (nr_maps > 1) {
+			dout(5, "skipping non-latest full map %u len %d\n",
+			     epoch, maplen);
+		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
+			dout(10, "skipping full map %u len %d, "
+			     "older than our %u\n", epoch, maplen,
+			     osdc->osdmap->epoch);
+		} else {
+			dout(10, "taking full map %u len %d\n", epoch, maplen);
+			newmap = osdmap_decode(&p, p+maplen);
+			if (IS_ERR(newmap))
+				goto bad;
+			if (osdc->osdmap)
+				ceph_osdmap_destroy(osdc->osdmap);
+			osdc->osdmap = newmap;
+		}
+		p += maplen;
+		nr_maps--;
+	}
+
+done:
+	downgrade_write(&osdc->map_sem);
+	/* we may still have no map at all if nothing above was usable */
+	if (osdc->osdmap)
+		ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
+	if (newmap)
+		kick_requests(osdc, NULL);
+	up_read(&osdc->map_sem);
+	return;
+
+bad:
+	derr(1, "handle_map corrupt msg\n");
+out_unlock:
+	up_write(&osdc->map_sem);
+}
+
+/*
+ * If we detect that a tcp connection to an osd resets, we need to
+ * resubmit all requests for that osd.  That's because although we reliably
+ * deliver our requests, the osd does not try as hard to deliver the
+ * reply (because it does not get notification when clients, mds' leave
+ * the cluster).
+ */
+void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
+			    struct ceph_entity_addr *addr)
+{
+	down_read(&osdc->map_sem);	/* kick_requests wants map_sem held */
+	kick_requests(osdc, addr);
+	up_read(&osdc->map_sem);
+}
+
+
+/*
+ * A read request prepares specific pages that data is to be read into.
+ * When a message is being read off the wire, we call prepare_pages to
+ * find those pages.
+ *  0 = success, -1 failure.
+ */
+int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
+{
+	struct ceph_client *client = p;
+	struct ceph_osd_client *osdc = &client->osdc;
+	struct ceph_osd_reply_head *rhead = m->front.iov_base;
+	struct ceph_osd_request *req;
+	u64 tid;
+	int ret = -1;
+	int type = le16_to_cpu(m->hdr.type);
+
+	dout(10, "prepare_pages on msg %p want %d\n", m, want);
+	if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
+		return -1;  /* hmm! */
+
+	tid = le64_to_cpu(rhead->tid);
+	mutex_lock(&osdc->request_mutex);
+	req = radix_tree_lookup(&osdc->request_tree, tid);
+	if (!req) {
+		dout(10, "prepare_pages unknown tid %llu\n", tid);
+		goto out;
+	}
+	dout(10, "prepare_pages tid %llu has %d pages, want %d\n",
+	     tid, req->r_num_pages, want);
+	/* hand over pages only once, and never for an aborted request */
+	if (likely(req->r_num_pages >= want && req->r_reply == NULL &&
+		    !req->r_aborted)) {
+		m->pages = req->r_pages;
+		m->nr_pages = req->r_num_pages;
+		ceph_msg_get(m);	/* put in ceph_osdc_put_request */
+		req->r_reply = m;
+		ret = 0; /* success */
+	}
+out:
+	mutex_unlock(&osdc->request_mutex);
+	return ret;
+}
+
+/*
+ * Register request, send initial attempt (req stays registered if the send fails).
+ */
+int ceph_osdc_start_request(struct ceph_osd_client *osdc,
+			    struct ceph_osd_request *req)
+{
+	int rc;
+
+	/* the outgoing message carries the request's data pages */
+	req->r_request->pages = req->r_pages;
+	req->r_request->nr_pages = req->r_num_pages;
+
+	rc = register_request(osdc, req);
+	if (rc < 0)
+		return rc;
+
+	down_read(&osdc->map_sem);	/* send_request wants map_sem held */
+	rc = send_request(osdc, req);
+	up_read(&osdc->map_sem);
+	return rc;
+}
+
+int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
+			   struct ceph_osd_request *req)
+{
+	struct ceph_osd_reply_head *replyhead;
+	__s32 rc;
+	int bytes;
+
+	rc = wait_for_completion_interruptible(&req->r_completion);
+	if (rc < 0) {
+		ceph_osdc_abort_request(osdc, req);	/* interrupted */
+		return rc;
+	}
+
+	/* parse reply */
+	replyhead = req->r_reply->front.iov_base;
+	rc = le32_to_cpu(replyhead->result);
+	bytes = le32_to_cpu(req->r_reply->hdr.data_len);
+	dout(10, "wait_request tid %llu result %d, %d bytes\n",
+	     req->r_tid, rc, bytes);
+	if (rc < 0)
+		return rc;	/* error reported in the reply */
+	return bytes;		/* data length of the reply */
+}
+
+/*
+ * To abort an in-progress request, take pages away from outgoing or
+ * incoming message.
+ */
+void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
+			     struct ceph_osd_request *req)
+{
+	struct ceph_msg *msg;
+
+	dout(0, "abort_request tid %llu, revoking %p pages\n", req->r_tid,
+	     req->r_request);
+	/*
+	 * mark req aborted _before_ revoking pages, so that
+	 * if a racing kick_request _does_ dup the page vec
+	 * pointer, it will definitely then see the aborted
+	 * flag and not send the request.
+	 */
+	req->r_aborted = 1;
+	msg = req->r_request;
+	mutex_lock(&msg->page_mutex);
+	msg->pages = NULL;	/* detach pages from the outgoing message */
+	mutex_unlock(&msg->page_mutex);
+	if (req->r_reply) {
+		mutex_lock(&req->r_reply->page_mutex);
+		req->r_reply->pages = NULL;	/* and from the reply */
+		mutex_unlock(&req->r_reply->page_mutex);
+	}
+}
+
+void ceph_osdc_sync(struct ceph_osd_client *osdc)
+{
+	struct ceph_osd_request *req;
+	u64 last_tid, next_tid = 0;
+	int got;
+
+	mutex_lock(&osdc->request_mutex);
+	last_tid = osdc->last_tid;	/* ignore requests issued after this */
+	while (1) {
+		got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+					     next_tid, 1);
+		if (!got)
+			break;
+		if (req->r_tid > last_tid)
+			break;
+
+		next_tid = req->r_tid + 1;
+		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
+			continue;	/* only writes are waited for */
+
+		ceph_osdc_get_request(req);	/* hold req while unlocked */
+		mutex_unlock(&osdc->request_mutex);
+		dout(10, "sync waiting on tid %llu (last is %llu)\n",
+		     req->r_tid, last_tid);
+		wait_for_completion(&req->r_safe_completion);
+		mutex_lock(&osdc->request_mutex);
+		ceph_osdc_put_request(req);
+	}
+	mutex_unlock(&osdc->request_mutex);
+	dout(10, "sync done (thru tid %llu)\n", last_tid);
+}
+
+/*
+ * init, shutdown
+ */
+void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
+{
+	dout(5, "init\n");
+	osdc->client = client;
+	osdc->osdmap = NULL;			/* no map received yet */
+	osdc->last_requested_map = 0;
+	init_rwsem(&osdc->map_sem);
+	init_completion(&osdc->map_waiters);
+	osdc->last_tid = 0;			/* tids are pre-incremented */
+	osdc->timeout_tid = 0;
+	osdc->num_requests = 0;
+	mutex_init(&osdc->request_mutex);
+	INIT_RADIX_TREE(&osdc->request_tree, GFP_NOFS);
+	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+}
+
+void ceph_osdc_stop(struct ceph_osd_client *osdc)
+{
+	cancel_delayed_work_sync(&osdc->timeout_work);
+	if (!osdc->osdmap)
+		return;		/* no map to free */
+	ceph_osdmap_destroy(osdc->osdmap);
+	osdc->osdmap = NULL;
+}
+
+/*
+ * Read some contiguous pages.  Return number of bytes read (or
+ * zeroed).
+ */
+int ceph_osdc_readpages(struct ceph_osd_client *osdc,
+			struct ceph_vino vino, struct ceph_file_layout *layout,
+			u64 off, u64 len,
+			u32 truncate_seq, u64 truncate_size,
+			struct page **pages, int num_pages)
+{
+	struct ceph_osd_request *req;
+	int i;
+	struct page *page;
+	int rc = 0, read = 0;
+
+	dout(10, "readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
+	     vino.snap, off, len);
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+				    NULL, 0, truncate_seq, truncate_size, NULL);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+
+	/* it may be a short read due to an object boundary */
+	req->r_pages = pages;
+	num_pages = calc_pages_for(off, len);
+	req->r_num_pages = num_pages;
+
+	dout(10, "readpages final extent is %llu~%llu (%d pages)\n",
+	     off, len, req->r_num_pages);
+
+	rc = ceph_osdc_start_request(osdc, req);
+	if (!rc)
+		rc = ceph_osdc_wait_request(osdc, req);
+
+	if (rc >= 0) {
+		read = rc;
+		rc = len;
+	} else if (rc == -ENOENT) {
+		rc = len;	/* missing object is reported as a full read */
+	}
+
+	/* zero whatever was not read (every page, if the request failed) */
+	if (read < (num_pages << PAGE_CACHE_SHIFT)) {
+		if (read & ~PAGE_CACHE_MASK) {
+			i = read >> PAGE_CACHE_SHIFT;
+			page = pages[i];
+			dout(20, "readpages zeroing %d %p from %d\n", i, page,
+			     (int)(read & ~PAGE_CACHE_MASK));
+			zero_user_segment(page, read & ~PAGE_CACHE_MASK,
+					  PAGE_CACHE_SIZE);
+			read += PAGE_CACHE_SIZE;
+		}
+		for (i = read >> PAGE_CACHE_SHIFT; i < num_pages; i++) {
+			page = req->r_pages[i];
+			dout(20, "readpages zeroing %d %p\n", i, page);
+			zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+		}
+	}
+
+	ceph_osdc_put_request(req);
+	dout(10, "readpages result %d\n", rc);
+	return rc;
+}
+
+/*
+ * do a sync write on N pages; a zero result is reported as len
+ */
+int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
+			 struct ceph_file_layout *layout,
+			 struct ceph_snap_context *snapc,
+			 u64 off, u64 len,
+			 u32 truncate_seq, u64 truncate_size,
+			 struct timespec *mtime,
+			 struct page **pages, int num_pages,
+			 int flags, int do_sync)
+{
+	struct ceph_osd_request *req;
+	int rc = 0;
+
+	BUG_ON(vino.snap != CEPH_NOSNAP);
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+				    CEPH_OSD_OP_WRITE,
+				    flags | CEPH_OSD_FLAG_ONDISK |
+					    CEPH_OSD_FLAG_WRITE,
+				    snapc, do_sync,
+				    truncate_seq, truncate_size, mtime);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+
+	/* it may be a short write due to an object boundary */
+	req->r_pages = pages;
+	req->r_num_pages = calc_pages_for(off, len);
+	dout(10, "writepages %llu~%llu (%d pages)\n", off, len,
+	     req->r_num_pages);
+
+	rc = ceph_osdc_start_request(osdc, req);
+	if (!rc)
+		rc = ceph_osdc_wait_request(osdc, req);
+
+	ceph_osdc_put_request(req);
+	if (rc == 0)
+		rc = len;	/* len may have been shortened above */
+	dout(10, "writepages result %d\n", rc);
+	return rc;
+}
+
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
new file mode 100644
index 0000000..9ad2559
--- /dev/null
+++ b/fs/ceph/osd_client.h
@@ -0,0 +1,151 @@
+#ifndef _FS_CEPH_OSD_CLIENT_H
+#define _FS_CEPH_OSD_CLIENT_H
+
+#include <linux/radix-tree.h>
+#include <linux/completion.h>
+
+#include "types.h"
+#include "osdmap.h"
+
+/*
+ * All data objects are stored within a cluster/cloud of OSDs, or
+ * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
+ * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
+ * remote daemons serving up and coordinating consistent and safe
+ * access to storage.
+ *
+ * Cluster membership and the mapping of data objects onto storage devices
+ * are described by the osd map.
+ *
+ * We keep track of pending OSD requests (read, write), resubmit
+ * requests to different OSDs when the cluster topology/data layout
+ * change, or retry the affected requests when the communications
+ * channel with an OSD is reset.
+ */
+
+struct ceph_msg;
+struct ceph_snap_context;
+struct ceph_osd_request;
+
+/*
+ * completion callback for async writepages
+ */
+typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
+
+struct ceph_osd_request_attr {	/* NOTE(review): no user in the code shown */
+	struct attribute attr;
+	ssize_t (*show)(struct ceph_osd_request *,
+			struct ceph_osd_request_attr *,
+			char *);
+	ssize_t (*store)(struct ceph_osd_request *,
+			 struct ceph_osd_request_attr *,
+			 const char *, size_t);
+};
+
+/* an in-flight request */
+struct ceph_osd_request {
+	u64             r_tid;              /* unique for this client */
+
+	struct ceph_msg  *r_request;
+	struct ceph_msg  *r_reply;
+	int               r_result;
+	int               r_flags;     /* any additional flags for the osd */
+	int               r_aborted;   /* set if we cancel this request */
+
+	atomic_t          r_ref;       /* ceph_osdc_{get,put}_request */
+	struct completion r_completion, r_safe_completion;
+	ceph_osdc_callback_t r_callback, r_safe_callback;
+	struct ceph_eversion r_reassert_version;
+	struct list_head  r_unsafe_item;
+
+	struct inode *r_inode;         	      /* for use by callbacks */
+	struct writeback_control *r_wbc;      /* ditto */
+
+	char              r_oid[40];          /* object name */
+	int               r_oid_len;
+	int               r_last_osd;         /* osd last mapped to, or -1 */
+	struct ceph_entity_addr r_last_osd_addr;
+	unsigned long     r_timeout_stamp;    /* jiffies; ping osd after this */
+
+	struct ceph_file_layout r_file_layout;
+	struct ceph_snap_context *r_snapc;    /* snap context for writes */
+	unsigned          r_num_pages;        /* number of pages in r_pages */
+	struct page     **r_pages;            /* pages for data payload */
+	int               r_own_pages;        /* if true, freed on final put */
+};
+
+struct ceph_osd_client {
+	struct ceph_client     *client;
+
+	struct ceph_osdmap     *osdmap;       /* current map */
+	struct rw_semaphore    map_sem;       /* held for write to swap osdmap */
+	struct completion      map_waiters;
+	u64                    last_requested_map;
+
+	struct mutex           request_mutex; /* covers tids and request_tree */
+	u64                    timeout_tid;   /* tid of timeout triggering rq */
+	u64                    last_tid;      /* tid of last request */
+	struct radix_tree_root request_tree;  /* pending requests, by tid */
+	int                    num_requests;  /* entries in request_tree */
+	struct delayed_work    timeout_work;  /* runs handle_timeout */
+	struct dentry 	       *debugfs_file;
+};
+
+extern void ceph_osdc_init(struct ceph_osd_client *osdc,
+			   struct ceph_client *client);
+extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
+
+extern void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
+				   struct ceph_entity_addr *addr);
+
+extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
+				   struct ceph_msg *msg);
+extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
+				 struct ceph_msg *msg);
+
+/* incoming read messages use this to discover which pages to read
+ * the data payload into. */
+extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want);
+
+extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
+				      struct ceph_file_layout *layout,
+				      struct ceph_vino vino,
+				      u64 offset, u64 *len, int op, int flags,
+				      struct ceph_snap_context *snapc,
+				      int do_sync, u32 truncate_seq,
+				      u64 truncate_size,
+				      struct timespec *mtime);
+
+static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
+{
+	atomic_inc(&req->r_ref);	/* undone by ceph_osdc_put_request() */
+}
+extern void ceph_osdc_put_request(struct ceph_osd_request *req);
+
+extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
+				   struct ceph_osd_request *req);
+extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
+				  struct ceph_osd_request *req);
+extern void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
+				    struct ceph_osd_request *req);
+extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
+
+extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
+			       struct ceph_vino vino,
+			       struct ceph_file_layout *layout,
+			       u64 off, u64 len,
+			       u32 truncate_seq, u64 truncate_size,
+			       struct page **pages, int nr_pages);
+
+extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
+				struct ceph_vino vino,
+				struct ceph_file_layout *layout,
+				struct ceph_snap_context *sc,
+				u64 off, u64 len,
+				u32 truncate_seq, u64 truncate_size,
+				struct timespec *mtime,
+				struct page **pages, int nr_pages,
+				int flags, int do_sync);
+
+#endif
+
diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
new file mode 100644
index 0000000..555c202
--- /dev/null
+++ b/fs/ceph/osdmap.c
@@ -0,0 +1,692 @@
+
+#include <asm/div64.h>
+
+#include "super.h"
+#include "osdmap.h"
+#include "crush/hash.h"
+#include "decode.h"
+
+#include "ceph_debug.h"
+
+int ceph_debug_osdmap __read_mostly = -1;
+#define DOUT_MASK DOUT_MASK_OSDMAP
+#define DOUT_VAR ceph_debug_osdmap
+
+
+/*
+ * Render the CEPH_OSD_* bits in @state as text in @str (at most @len
+ * bytes including the terminating NUL) and return @str.
+ *
+ * The result is "doesn't exist" for a zero state, otherwise a ", "
+ * separated list of "exists" and/or "up" (the empty string if neither
+ * bit is set).  If @len is 0, @str is returned untouched.
+ */
+char *ceph_osdmap_state_str(char *str, int len, int state)
+{
+	int exists = state & CEPH_OSD_EXISTS;
+	int up = state & CEPH_OSD_UP;
+
+	if (!len)
+		return str;
+
+	if (!state) {
+		snprintf(str, len, "doesn't exist");
+		return str;
+	}
+
+	/*
+	 * Emit everything in one call.  str must never be passed as both
+	 * the destination and a %s argument: snprintf() on overlapping
+	 * buffers is undefined.
+	 */
+	snprintf(str, len, "%s%s%s",
+		 exists ? "exists" : "",
+		 (exists && up) ? ", " : "",
+		 up ? "up" : "");
+	return str;
+}
+
+/* maps */
+
+/* number of bits needed to represent @t; 0 when @t is 0. */
+static int calc_bits_of(unsigned t)
+{
+	int nbits;
+
+	for (nbits = 0; t != 0; t >>= 1)
+		nbits++;
+	return nbits;
+}
+
+/*
+ * each foo_mask is the smallest value of the form 2^n-1 that is
+ * >= foo-1, i.e. a bitmask wide enough to hold the largest index
+ * below foo.
+ */
+static void calc_pg_masks(struct ceph_pg_pool_info *pi)
+{
+	int pg_bits = calc_bits_of(le32_to_cpu(pi->v.pg_num) - 1);
+	int pgp_bits = calc_bits_of(le32_to_cpu(pi->v.pgp_num) - 1);
+	int lpg_bits = calc_bits_of(le32_to_cpu(pi->v.lpg_num) - 1);
+	int lpgp_bits = calc_bits_of(le32_to_cpu(pi->v.lpgp_num) - 1);
+
+	pi->pg_num_mask = (1 << pg_bits) - 1;
+	pi->pgp_num_mask = (1 << pgp_bits) - 1;
+	pi->lpg_num_mask = (1 << lpg_bits) - 1;
+	pi->lpgp_num_mask = (1 << lpgp_bits) - 1;
+}
+
+/*
+ * decode crush map
+ */
+/*
+ * Decode the type-specific tail of a uniform bucket at *p: a single
+ * 32-bit item_weight.  Returns 0 on success, or -EINVAL if the bounds
+ * check against @end fails.
+ *
+ * NOTE(review): the bounds check asks for 1 + b->h.size words although
+ * only one word is consumed here -- confirm that this is intentional.
+ */
+static int crush_decode_uniform_bucket(void **p, void *end,
+				       struct crush_bucket_uniform *b)
+{
+	dout(30, "crush_decode_uniform_bucket %p to %p\n", *p, end);
+	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
+	ceph_decode_32(p, b->item_weight);
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode the type-specific tail of a list bucket at *p: b->h.size
+ * (item_weight, sum_weight) pairs of 32-bit values, stored into two
+ * newly allocated arrays hung off @b.
+ *
+ * Returns 0 on success, -EINVAL if the buffer is too short, or -ENOMEM.
+ * An array allocated before a later failure stays attached to @b;
+ * presumably the caller's crush_destroy() releases it -- confirm.
+ */
+static int crush_decode_list_bucket(void **p, void *end,
+				    struct crush_bucket_list *b)
+{
+	int j;
+	dout(30, "crush_decode_list_bucket %p to %p\n", *p, end);
+
+	/* validate the wire-supplied size before allocating for it */
+	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
+
+	/* kcalloc() rejects a size * sizeof(u32) that would overflow */
+	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->item_weights == NULL)
+		return -ENOMEM;
+	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->sum_weights == NULL)
+		return -ENOMEM;
+
+	for (j = 0; j < b->h.size; j++) {
+		ceph_decode_32(p, b->item_weights[j]);
+		ceph_decode_32(p, b->sum_weights[j]);
+	}
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode the type-specific tail of a tree bucket at *p: a 32-bit
+ * num_nodes followed by that many 32-bit node weights, stored into a
+ * newly allocated array hung off @b.
+ *
+ * Returns 0 on success, -EINVAL if the buffer is too short, or -ENOMEM.
+ */
+static int crush_decode_tree_bucket(void **p, void *end,
+				    struct crush_bucket_tree *b)
+{
+	int j;
+	dout(30, "crush_decode_tree_bucket %p to %p\n", *p, end);
+	ceph_decode_32_safe(p, end, b->num_nodes, bad);
+
+	/* validate the wire-supplied count before allocating for it */
+	ceph_decode_need(p, end, b->num_nodes * sizeof(u32), bad);
+
+	/* kcalloc() rejects a count * sizeof(u32) that would overflow */
+	b->node_weights = kcalloc(b->num_nodes, sizeof(u32), GFP_NOFS);
+	if (b->node_weights == NULL)
+		return -ENOMEM;
+
+	for (j = 0; j < b->num_nodes; j++)
+		ceph_decode_32(p, b->node_weights[j]);
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode the type-specific tail of a straw bucket at *p: b->h.size
+ * (item_weight, straw) pairs of 32-bit values, stored into two newly
+ * allocated arrays hung off @b.
+ *
+ * Returns 0 on success, -EINVAL if the buffer is too short, or -ENOMEM.
+ * An array allocated before a later failure stays attached to @b;
+ * presumably the caller's crush_destroy() releases it -- confirm.
+ */
+static int crush_decode_straw_bucket(void **p, void *end,
+				     struct crush_bucket_straw *b)
+{
+	int j;
+	dout(30, "crush_decode_straw_bucket %p to %p\n", *p, end);
+
+	/* validate the wire-supplied size before allocating for it */
+	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
+
+	/* kcalloc() rejects a size * sizeof(u32) that would overflow */
+	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->item_weights == NULL)
+		return -ENOMEM;
+	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->straws == NULL)
+		return -ENOMEM;
+
+	for (j = 0; j < b->h.size; j++) {
+		ceph_decode_32(p, b->item_weights[j]);
+		ceph_decode_32(p, b->straws[j]);
+	}
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode an encoded CRUSH map occupying [pbyval, end).
+ *
+ * Layout consumed here: magic, max_buckets, max_rules, max_devices,
+ * then max_buckets optional buckets (a zero alg word means "absent"),
+ * then max_rules optional rules.  Anything after the rules is ignored.
+ *
+ * Returns the newly allocated map, or an ERR_PTR: -ENOMEM on allocation
+ * failure, -EINVAL on a truncated or malformed encoding.  On failure
+ * the partially built map is handed to crush_destroy().
+ */
+static struct crush_map *crush_decode(void *pbyval, void *end)
+{
+	struct crush_map *c;
+	int err = -EINVAL;
+	int i, j;
+	void **p = &pbyval;
+	void *start = pbyval;
+	u32 magic;
+
+	dout(30, "crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
+
+	c = kzalloc(sizeof(*c), GFP_NOFS);
+	if (c == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	ceph_decode_need(p, end, 4*sizeof(u32), bad);
+	ceph_decode_32(p, magic);
+	if (magic != CRUSH_MAGIC) {
+		derr(0, "crush_decode magic %x != current %x\n",
+		     (unsigned)magic, (unsigned)CRUSH_MAGIC);
+		goto bad;
+	}
+	ceph_decode_32(p, c->max_buckets);
+	ceph_decode_32(p, c->max_rules);
+	ceph_decode_32(p, c->max_devices);
+
+	/*
+	 * All tables are zero-filled: the error path passes a partially
+	 * decoded map to crush_destroy(), so every bucket/rule slot that
+	 * was never reached must read as NULL rather than heap garbage.
+	 * kcalloc() also rejects count * size products that overflow.
+	 */
+	c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS);
+	if (c->device_parents == NULL)
+		goto badmem;
+	c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS);
+	if (c->bucket_parents == NULL)
+		goto badmem;
+
+	c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
+	if (c->buckets == NULL)
+		goto badmem;
+	c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
+	if (c->rules == NULL)
+		goto badmem;
+
+	/* buckets */
+	for (i = 0; i < c->max_buckets; i++) {
+		int size = 0;
+		u32 alg;
+		struct crush_bucket *b;
+
+		ceph_decode_32_safe(p, end, alg, bad);
+		if (alg == 0) {
+			c->buckets[i] = NULL;
+			continue;
+		}
+		dout(30, "crush_decode bucket %d off %x %p to %p\n",
+		     i, (int)(*p-start), *p, end);
+
+		switch (alg) {
+		case CRUSH_BUCKET_UNIFORM:
+			size = sizeof(struct crush_bucket_uniform);
+			break;
+		case CRUSH_BUCKET_LIST:
+			size = sizeof(struct crush_bucket_list);
+			break;
+		case CRUSH_BUCKET_TREE:
+			size = sizeof(struct crush_bucket_tree);
+			break;
+		case CRUSH_BUCKET_STRAW:
+			size = sizeof(struct crush_bucket_straw);
+			break;
+		default:
+			goto bad;
+		}
+		BUG_ON(size == 0);
+		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
+		if (b == NULL)
+			goto badmem;
+
+		ceph_decode_need(p, end, 4*sizeof(u32), bad);
+		ceph_decode_32(p, b->id);
+		ceph_decode_16(p, b->type);
+		ceph_decode_16(p, b->alg);
+		ceph_decode_32(p, b->weight);
+		ceph_decode_32(p, b->size);
+
+		dout(30, "crush_decode bucket size %d off %x %p to %p\n",
+		     b->size, (int)(*p-start), *p, end);
+
+		/*
+		 * The allocation above was sized for 'alg' but the decoder
+		 * below is chosen by b->alg.  If they disagree, the
+		 * type-specific decoder would write past the allocation.
+		 */
+		if (b->alg != alg)
+			goto bad;
+
+		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
+		if (b->items == NULL)
+			goto badmem;
+		b->perm = kcalloc(b->size, sizeof(u32), GFP_NOFS);
+		if (b->perm == NULL)
+			goto badmem;
+		b->perm_n = 0;
+
+		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
+		for (j = 0; j < b->size; j++)
+			ceph_decode_32(p, b->items[j]);
+
+		switch (b->alg) {
+		case CRUSH_BUCKET_UNIFORM:
+			err = crush_decode_uniform_bucket(p, end,
+				  (struct crush_bucket_uniform *)b);
+			break;
+		case CRUSH_BUCKET_LIST:
+			err = crush_decode_list_bucket(p, end,
+			       (struct crush_bucket_list *)b);
+			break;
+		case CRUSH_BUCKET_TREE:
+			err = crush_decode_tree_bucket(p, end,
+				(struct crush_bucket_tree *)b);
+			break;
+		case CRUSH_BUCKET_STRAW:
+			err = crush_decode_straw_bucket(p, end,
+				(struct crush_bucket_straw *)b);
+			break;
+		}
+		if (err < 0)
+			goto bad;
+
+		/*
+		 * A successful helper left err == 0.  Re-arm it so that a
+		 * later "goto bad" cannot return ERR_PTR(0), i.e. NULL,
+		 * which IS_ERR() callers would take for success.
+		 */
+		err = -EINVAL;
+	}
+
+	/* rules */
+	dout(30, "rule vec is %p\n", c->rules);
+	for (i = 0; i < c->max_rules; i++) {
+		u32 yes;
+		struct crush_rule *r;
+
+		ceph_decode_32_safe(p, end, yes, bad);
+		if (!yes) {
+			dout(30, "crush_decode NO rule %d off %x %p to %p\n",
+			     i, (int)(*p-start), *p, end);
+			c->rules[i] = NULL;
+			continue;
+		}
+
+		dout(30, "crush_decode rule %d off %x %p to %p\n",
+		     i, (int)(*p-start), *p, end);
+
+		/* len (the 'yes' variable is reused for the step count) */
+		ceph_decode_32_safe(p, end, yes, bad);
+
+		r = c->rules[i] = kmalloc(sizeof(*r) +
+					  yes*sizeof(struct crush_rule_step),
+					  GFP_NOFS);
+		if (r == NULL)
+			goto badmem;
+		dout(30, " rule %d is at %p\n", i, r);
+		r->len = yes;
+		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
+		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
+		for (j = 0; j < r->len; j++) {
+			ceph_decode_32(p, r->steps[j].op);
+			ceph_decode_32(p, r->steps[j].arg1);
+			ceph_decode_32(p, r->steps[j].arg2);
+		}
+	}
+
+	/* ignore trailing name maps. */
+
+	dout(30, "crush_decode success\n");
+	return c;
+
+badmem:
+	err = -ENOMEM;
+bad:
+	dout(30, "crush_decode fail %d\n", err);
+	crush_destroy(c);
+	return ERR_PTR(err);
+}
+
+
+/*
+ * osd map
+ */
+/* release @map together with every table hanging off it. */
+void ceph_osdmap_destroy(struct ceph_osdmap *map)
+{
+	dout(10, "osdmap_destroy %p\n", map);
+
+	/* per-osd arrays and the pool table */
+	kfree(map->osd_addr);
+	kfree(map->osd_weight);
+	kfree(map->osd_state);
+	kfree(map->pg_pool);
+
+	if (map->crush != NULL)
+		crush_destroy(map->crush);
+
+	kfree(map);
+}
+
+/*
+ * adjust max osd value.  reallocate arrays.
+ *
+ * Allocates zero-filled state/addr/weight arrays of @max entries,
+ * carries over as many old entries as fit, frees the old arrays and
+ * installs the new ones.  Returns 0, or -ENOMEM with @map untouched.
+ */
+static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
+{
+	u8 *state;
+	struct ceph_entity_addr *addr;
+	u32 *weight;
+	u32 keep;
+
+	state = kcalloc(max, sizeof(*state), GFP_NOFS);
+	addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
+	weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
+	if (state == NULL || addr == NULL || weight == NULL) {
+		kfree(state);
+		kfree(addr);
+		kfree(weight);
+		return -ENOMEM;
+	}
+
+	/* copy old? */
+	if (map->osd_state) {
+		/*
+		 * Copy only what fits: when the map shrinks, the old
+		 * max_osd is larger than the new arrays.
+		 */
+		keep = min_t(u32, map->max_osd, max);
+		memcpy(state, map->osd_state, keep*sizeof(*state));
+		memcpy(addr, map->osd_addr, keep*sizeof(*addr));
+		memcpy(weight, map->osd_weight, keep*sizeof(*weight));
+		kfree(map->osd_state);
+		kfree(map->osd_addr);
+		kfree(map->osd_weight);
+	}
+
+	map->osd_state = state;
+	map->osd_weight = weight;
+	map->osd_addr = addr;
+	map->max_osd = max;
+	return 0;
+}
+
+/*
+ * decode a full map.
+ *
+ * Consumes the encoding at *p (bounded by @end) and returns a newly
+ * allocated map, leaving *p at @end.  On failure the partial map is
+ * destroyed and an ERR_PTR is returned: -ENOMEM on allocation failure,
+ * -EINVAL on a truncated or malformed encoding, or the error reported
+ * by crush_decode().
+ */
+struct ceph_osdmap *osdmap_decode(void **p, void *end)
+{
+	struct ceph_osdmap *map;
+	u32 len, max, i;
+	u64 skip;
+	int err = -EINVAL;
+	void *start = *p;
+
+	dout(30, "osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p));
+
+	map = kzalloc(sizeof(*map), GFP_NOFS);
+	if (map == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	ceph_decode_need(p, end, 2*sizeof(u64)+6*sizeof(u32), bad);
+	ceph_decode_copy(p, &map->fsid, sizeof(map->fsid));
+	ceph_decode_32(p, map->epoch);
+	ceph_decode_copy(p, &map->created, sizeof(map->created));
+	ceph_decode_copy(p, &map->modified, sizeof(map->modified));
+
+	ceph_decode_32(p, map->num_pools);
+	/*
+	 * Zero-filled, so pool slots that the encoding does not mention
+	 * are not left as heap garbage; kcalloc() also rejects an
+	 * overflowing num_pools * size.
+	 */
+	map->pg_pool = kcalloc(map->num_pools, sizeof(*map->pg_pool),
+			       GFP_NOFS);
+	if (!map->pg_pool) {
+		err = -ENOMEM;
+		goto bad;
+	}
+	ceph_decode_32_safe(p, end, max, bad);
+	while (max--) {
+		ceph_decode_need(p, end, 4+sizeof(map->pg_pool->v), bad);
+		ceph_decode_32(p, i);
+		if (i >= map->num_pools)
+			goto bad;
+		ceph_decode_copy(p, &map->pg_pool[i].v,
+				 sizeof(map->pg_pool->v));
+		calc_pg_masks(&map->pg_pool[i]);
+
+		/*
+		 * Skip the snap ids and removed-snap intervals that
+		 * follow the pool.  Advance the cursor (*p), not the
+		 * pointer to it, and only after checking the skip fits.
+		 */
+		skip = (u64)le32_to_cpu(map->pg_pool[i].v.num_snaps) *
+			sizeof(u64);
+		skip += (u64)le32_to_cpu(
+				map->pg_pool[i].v.num_removed_snap_intervals) *
+			sizeof(u64) * 2;
+		if (skip > (u64)(end - *p))
+			goto bad;
+		*p += skip;
+	}
+
+	ceph_decode_32_safe(p, end, map->flags, bad);
+
+	ceph_decode_32_safe(p, end, max, bad);
+
+	/* (re)alloc osd arrays */
+	err = osdmap_set_max_osd(map, max);
+	if (err < 0)
+		goto bad;
+	dout(30, "osdmap_decode max_osd = %d\n", map->max_osd);
+
+	/* osds */
+	err = -EINVAL;
+	ceph_decode_need(p, end, 3*sizeof(u32) +
+			 map->max_osd*(1 + sizeof(*map->osd_weight) +
+				       sizeof(*map->osd_addr)), bad);
+	*p += 4; /* skip length field (should match max) */
+	ceph_decode_copy(p, map->osd_state, map->max_osd);
+
+	*p += 4; /* skip length field (should match max) */
+	for (i = 0; i < map->max_osd; i++)
+		ceph_decode_32(p, map->osd_weight[i]);
+
+	*p += 4; /* skip length field (should match max) */
+	ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
+
+	/* crush */
+	ceph_decode_32_safe(p, end, len, bad);
+	dout(30, "osdmap_decode crush len %d from off 0x%x\n", len,
+	     (int)(*p - start));
+	ceph_decode_need(p, end, len, bad);
+	/* the crush blob is exactly len bytes; don't let it read further */
+	map->crush = crush_decode(*p, *p + len);
+	*p += len;
+	if (IS_ERR(map->crush)) {
+		err = PTR_ERR(map->crush);
+		map->crush = NULL;
+		goto bad;
+	}
+
+	/* ignore the rest of the map */
+	*p = end;
+
+	dout(30, "osdmap_decode done %p %p\n", *p, end);
+	return map;
+
+bad:
+	dout(30, "osdmap_decode fail\n");
+	ceph_osdmap_destroy(map);
+	return ERR_PTR(err);
+}
+
+/*
+ * decode and apply an incremental map update.
+ *
+ * The incremental at *p (bounded by @end) must be for epoch
+ * map->epoch + 1.  If it embeds a full map, that map is decoded and
+ * returned (or the ERR_PTR from decoding it).  Otherwise @map is
+ * updated in place and returned, with *p left at @end.
+ *
+ * On a truncated/malformed encoding or allocation failure an ERR_PTR
+ * is returned.  @map may already have been partially modified by then.
+ * OSDs marked down are also reported to @msgr.
+ */
+struct ceph_osdmap *apply_incremental(void **p, void *end,
+				      struct ceph_osdmap *map,
+				      struct ceph_messenger *msgr)
+{
+	struct ceph_osdmap *newmap = map;
+	struct crush_map *newcrush = NULL;
+	ceph_fsid_t fsid;
+	u32 epoch = 0;
+	struct ceph_timespec modified;
+	u32 len, pool;
+	__s32 new_flags, max;
+	void *start = *p;
+	int err = -EINVAL;
+
+	ceph_decode_need(p, end, sizeof(fsid)+sizeof(modified)+2*sizeof(u32),
+			 bad);
+	ceph_decode_copy(p, &fsid, sizeof(fsid));
+	ceph_decode_32(p, epoch);
+	BUG_ON(epoch != map->epoch+1);
+	ceph_decode_copy(p, &modified, sizeof(modified));
+	ceph_decode_32(p, new_flags);
+
+	/* full map? */
+	ceph_decode_32_safe(p, end, len, bad);
+	if (len > 0) {
+		dout(20, "apply_incremental full map len %d, %p to %p\n",
+		     len, *p, end);
+		newmap = osdmap_decode(p, min(*p+len, end));
+		return newmap;  /* error or not */
+	}
+
+	/* new crush? */
+	ceph_decode_32_safe(p, end, len, bad);
+	if (len > 0) {
+		dout(20, "apply_incremental new crush map len %d, %p to %p\n",
+		     len, *p, end);
+		ceph_decode_need(p, end, len, bad);
+		newcrush = crush_decode(*p, min(*p+len, end));
+		if (IS_ERR(newcrush))
+			return ERR_PTR(PTR_ERR(newcrush));
+		/* crush_decode() works on a copy of the cursor; step over
+		 * the blob ourselves */
+		*p += len;
+	}
+
+	/* new flags? */
+	if (new_flags >= 0)
+		map->flags = new_flags;
+
+	ceph_decode_need(p, end, 5*sizeof(u32), bad);
+
+	/* new max? */
+	ceph_decode_32(p, max);
+	if (max >= 0) {
+		err = osdmap_set_max_osd(map, max);
+		if (err < 0)
+			goto bad;
+		/* re-arm: a later "goto bad" must not return ERR_PTR(0) */
+		err = -EINVAL;
+	}
+
+	map->epoch++;
+	map->modified = modified;
+	if (newcrush) {
+		if (map->crush)
+			crush_destroy(map->crush);
+		map->crush = newcrush;
+		newcrush = NULL;
+	}
+
+	/* new_pool */
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		ceph_decode_32_safe(p, end, pool, bad);
+		if (pool >= map->num_pools) {
+			/* grow the (zero-filled) pool table to hold 'pool' */
+			void *pg_pool = kzalloc((pool+1)*sizeof(*map->pg_pool),
+					  GFP_NOFS);
+			if (!pg_pool) {
+				err = -ENOMEM;
+				goto bad;
+			}
+			memcpy(pg_pool, map->pg_pool,
+			       map->num_pools * sizeof(*map->pg_pool));
+			kfree(map->pg_pool);
+			map->pg_pool = pg_pool;
+			map->num_pools = pool+1;
+		}
+		ceph_decode_copy_safe(p, end, &map->pg_pool[pool].v,
+				      sizeof(map->pg_pool->v), bad);
+		calc_pg_masks(&map->pg_pool[pool]);
+	}
+
+	/* old_pool (ignore) */
+	ceph_decode_32_safe(p, end, len, bad);
+	if (len > (end - *p) / sizeof(u32))
+		goto bad;
+	*p += len * sizeof(u32);
+
+	/* new_up */
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		u32 osd;
+		struct ceph_entity_addr addr;
+		ceph_decode_32_safe(p, end, osd, bad);
+		ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad);
+		dout(1, "osd%d up\n", osd);
+		BUG_ON(osd >= map->max_osd);
+		map->osd_state[osd] |= CEPH_OSD_UP;
+		map->osd_addr[osd] = addr;
+	}
+
+	/* new_down */
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		u32 osd;
+		/* osd id plus the one-byte clean flag */
+		ceph_decode_need(p, end, sizeof(u32) + sizeof(u8), bad);
+		ceph_decode_32(p, osd);
+		(*p)++;  /* clean flag */
+		dout(1, "osd%d down\n", osd);
+		if (osd < map->max_osd) {
+			map->osd_state[osd] &= ~CEPH_OSD_UP;
+			ceph_messenger_mark_down(msgr, &map->osd_addr[osd]);
+		}
+	}
+
+	/* new_weight */
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		u32 osd, off;
+		ceph_decode_need(p, end, sizeof(u32)*2, bad);
+		ceph_decode_32(p, osd);
+		ceph_decode_32(p, off);
+		dout(1, "osd%d weight 0x%x %s\n", osd, off,
+		     off == CEPH_OSD_IN ? "(in)" :
+		     (off == CEPH_OSD_OUT ? "(out)" : ""));
+		if (osd < map->max_osd)
+			map->osd_weight[osd] = off;
+	}
+
+	/* ignore the rest */
+	*p = end;
+	return map;
+
+bad:
+	derr(10, "corrupt incremental osdmap epoch %d off %d (%p of %p-%p)\n",
+	     epoch, (int)(*p - start), *p, start, end);
+	if (newcrush)
+		crush_destroy(newcrush);
+	return ERR_PTR(err);
+}
+
+
+
+
+/*
+ * calculate file layout from given offset, length.
+ * fill in correct oid, logical length, and object extent
+ * offset, length.
+ *
+ * for now, we write only a single su, until we can
+ * pass a stride back to the caller.
+ *
+ * @layout: striping parameters (object size, stripe unit, stripe count)
+ * @off:    byte offset in the file
+ * @plen:   in: requested length; out: length actually mapped (clipped
+ *          to the end of the stripe unit containing @off)
+ * @bno:    out: object number, in host byte order
+ * @oxoff:  out: byte offset of the extent within that object
+ * @oxlen:  out: extent length (same value as the updated *plen)
+ */
+void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
+				   u64 off, u64 *plen,
+				   u64 *bno,
+				   u64 *oxoff, u64 *oxlen)
+{
+	u32 osize = le32_to_cpu(layout->fl_object_size);
+	u32 su = le32_to_cpu(layout->fl_stripe_unit);
+	u32 sc = le32_to_cpu(layout->fl_stripe_count);
+	u32 bl, stripeno, stripepos, objsetno;
+	u32 su_per_object;
+	u32 xoff;
+	u64 t;
+
+	dout(80, "mapping %llu~%llu  osize %u fl_su %u\n", off, *plen,
+	     osize, su);
+	su_per_object = osize / su;
+	dout(80, "osize %u / su %u = su_per_object %u\n", osize, su,
+	     su_per_object);
+
+	BUG_ON((su & ~PAGE_MASK) != 0);
+	/* bl = off / su; */
+	t = off;
+	do_div(t, su);
+	bl = t;
+	dout(80, "off %llu / su %u = bl %u\n", off, su, bl);
+
+	stripeno = bl / sc;
+	stripepos = bl % sc;
+	objsetno = stripeno / su_per_object;
+
+	/* *bno is a host-order value (it is printed as one just below) */
+	*bno = objsetno * sc + stripepos;
+	dout(80, "objset %u * sc %u = bno %u\n", objsetno, sc, (unsigned)*bno);
+
+	/* xoff = off % su: offset within the stripe unit */
+	t = off;
+	xoff = do_div(t, su);
+	/* plus the stripe units stacked before this one in the object */
+	*oxoff = xoff + (u64)(stripeno % su_per_object) * su;
+	*oxlen = min_t(u64, *plen, su - xoff);
+	*plen = *oxlen;
+
+	dout(80, " obj extent %llu~%llu\n", *oxoff, *oxlen);
+}
+
+/*
+ * fill in @ol (pgid and stripe unit) for the object named @oid, given
+ * the file layout @fl and the pool table in @osdmap.
+ *
+ * returns 0, or -EIO if the layout's pool id is not below
+ * osdmap->num_pools.
+ */
+int ceph_calc_object_layout(struct ceph_object_layout *ol,
+			    const char *oid,
+			    struct ceph_file_layout *fl,
+			    struct ceph_osdmap *osdmap)
+{
+	int poolid = le32_to_cpu(fl->fl_pg_pool);
+	s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred);
+	struct ceph_pg_pool_info *pool;
+	union ceph_pg pgid;
+	unsigned num, num_mask;
+
+	if (poolid >= osdmap->num_pools)
+		return -EIO;
+	pool = &osdmap->pg_pool[poolid];
+
+	/*
+	 * a preferred osd selects the "l" pg count and mask.
+	 * NOTE(review): num and num_mask are computed but not used below.
+	 */
+	num = le32_to_cpu(preferred >= 0 ? pool->v.lpg_num : pool->v.pg_num);
+	num_mask = preferred >= 0 ? pool->lpg_num_mask : pool->pg_num_mask;
+
+	/* zero the whole pgid first, then set the individual fields */
+	pgid.pg64 = 0;
+	pgid.pg.pool = le32_to_cpu(fl->fl_pg_pool);
+	pgid.pg.preferred = preferred;
+	pgid.pg.ps = ceph_full_name_hash(oid, strlen(oid));
+
+	ol->ol_stripe_unit = fl->fl_object_stripe_unit;
+	ol->ol_pgid = cpu_to_le64(pgid.pg64);
+
+	return 0;
+}
diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
new file mode 100644
index 0000000..757aaf5
--- /dev/null
+++ b/fs/ceph/osdmap.h
@@ -0,0 +1,83 @@
+#ifndef _FS_CEPH_OSDMAP_H
+#define _FS_CEPH_OSDMAP_H
+
+#include "types.h"
+#include "ceph_fs.h"
+#include "crush/crush.h"
+
+/*
+ * The osd map describes the current membership of the osd cluster and
+ * specifies the mapping of objects to placement groups and placement
+ * groups to (sets of) osds.  That is, it completely specifies the
+ * (desired) distribution of all data objects in the system at some
+ * point in time.
+ *
+ * Each map version is identified by an epoch, which increases monotonically.
+ *
+ * The map can be updated either via an incremental map (diff) describing
+ * the change between two successive epochs, or as a fully encoded map.
+ */
+/* one pool: its encoded description plus masks derived from it. */
+struct ceph_pg_pool_info {
+	struct ceph_pg_pool v;   /* copied verbatim from the encoded map */
+	/* 2^n-1 bitmasks computed from the like-named counts in v */
+	int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask;
+};
+
+struct ceph_osdmap {
+	ceph_fsid_t fsid;
+	u32 epoch;         /* map version; an incremental must be epoch+1 */
+	u32 mkfs_epoch;
+	struct ceph_timespec created, modified;
+
+	u32 flags;         /* CEPH_OSDMAP_* */
+
+	u32 max_osd;       /* size of osd_state, _weight, _addr arrays */
+	u8 *osd_state;     /* CEPH_OSD_* */
+	u32 *osd_weight;   /* 0 = failed, 0x10000 = 100% normal */
+	struct ceph_entity_addr *osd_addr;
+
+	u32 num_pools;     /* number of entries in pg_pool */
+	struct ceph_pg_pool_info *pg_pool;   /* indexed by pool id */
+
+	/* the CRUSH map specifies the mapping of placement groups to
+	 * the list of osds that store+replicate them. */
+	struct crush_map *crush;
+};
+
+/* nonzero iff @osd is within the map's range and has CEPH_OSD_UP set. */
+static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd)
+{
+	if (osd >= map->max_osd)
+		return 0;
+	return (map->osd_state[osd] & CEPH_OSD_UP) != 0;
+}
+
+/* true iff @map is non-NULL and has any bit of @flag set in its flags. */
+static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
+{
+	if (!map)
+		return false;
+	return (map->flags & flag) != 0;
+}
+
+extern char *ceph_osdmap_state_str(char *str, int len, int state);
+
+/*
+ * return a pointer to the address slot for @osd in @map, or NULL if
+ * @osd is not below map->max_osd.
+ */
+static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
+						     int osd)
+{
+	if (osd >= map->max_osd)
+		return NULL;
+	return &map->osd_addr[osd];
+}
+
+extern struct ceph_osdmap *osdmap_decode(void **p, void *end);
+extern struct ceph_osdmap *apply_incremental(void **p, void *end,
+					     struct ceph_osdmap *map,
+					     struct ceph_messenger *msgr);
+extern void ceph_osdmap_destroy(struct ceph_osdmap *map);
+
+/* calculate mapping of a file extent to an object */
+extern void ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
+					  u64 off, u64 *plen,
+					  u64 *bno, u64 *oxoff, u64 *oxlen);
+
+/* calculate mapping of object to a placement group */
+extern int ceph_calc_object_layout(struct ceph_object_layout *ol,
+				   const char *oid,
+				   struct ceph_file_layout *fl,
+				   struct ceph_osdmap *osdmap);
+
+#endif
-- 
1.5.6.5

--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux