[PATCHv2 2/3] rbd: combine object method calls in header refresh using fewer ceph_msgs

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Signed-off-by: Douglas Fuller <douglas.fuller@xxxxxxxxx>
---
 drivers/block/rbd.c             | 331 ++++++++++++++++++++++++++++++++++++----
 include/linux/ceph/osd_client.h |   2 +-
 2 files changed, 306 insertions(+), 27 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8125233..9184b43 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -4667,20 +4667,23 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 	bool first_time = rbd_dev->header.object_prefix == NULL;
 	int ret;
 
-	ret = rbd_dev_v2_image_size(rbd_dev);
-	if (ret)
-		return ret;
+	if (!first_time)
+	{
+		ret = rbd_dev_v2_image_size(rbd_dev);
+		if (ret)
+			return ret;
 
-	if (first_time) {
+		ret = rbd_dev_v2_snap_context(rbd_dev);
+		dout("rbd_dev_v2_snap_context returned %d\n", ret);
+		return ret;
+	}
+	else
+	{
 		ret = rbd_dev_v2_header_onetime(rbd_dev);
 		if (ret)
 			return ret;
 	}
-
-	ret = rbd_dev_v2_snap_context(rbd_dev);
-	dout("rbd_dev_v2_snap_context returned %d\n", ret);
-
-	return ret;
+	return ret; /* XXX change logic? */
 }
 
 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
@@ -5091,36 +5094,312 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 	memset(header, 0, sizeof (*header));
 }
 
+/*
+ * Allocate a read request on the header object with room for num_ops ops.
+ * r_priv gets a zeroed array of reply page vectors; returns NULL on failure.
+ */
+static struct ceph_osd_request *rbd_obj_header_req_alloc(
+		struct rbd_device *rbd_dev, int num_ops)
+{
+	struct ceph_osd_request *osd_req;
+	struct page ***replies;
+
+	replies = kcalloc(CEPH_OSD_MAX_OP, sizeof(*replies), GFP_KERNEL);
+	if (!replies)
+		return NULL;
+	osd_req = ceph_osdc_alloc_request(&rbd_dev->rbd_client->client->osdc,
+		NULL, num_ops, false, GFP_KERNEL);
+	if (!osd_req) {
+		kfree(replies);
+		return NULL;
+	}
+	osd_req->r_flags = CEPH_OSD_FLAG_READ;
+	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
+	osd_req->r_priv = replies;
+	ceph_oid_set_name(&osd_req->r_base_oid, rbd_dev->header_name);
+	return osd_req;
+}
+
+/*
+ * Add class_name.method_name as op 'which': outbound bytes go in a pagelist,
+ * the reply lands in a page vector saved in r_priv[which].  0 or -errno.
+ */
+static int rbd_obj_header_method_add(struct ceph_osd_request *osd_req,
+		int which, const char *class_name, const char *method_name,
+		const void *outbound, size_t outbound_size, size_t inbound_size)
+{
+	struct page ***replies = osd_req->r_priv;
+	struct page **pages;
+
+	BUG_ON(which >= osd_req->r_num_ops);
+	osd_req_op_cls_init(osd_req, which, CEPH_OSD_OP_CALL,
+		class_name, method_name);
+	if (outbound_size) {
+		struct ceph_pagelist *pagelist;
+
+		pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+		if (!pagelist)
+			return -ENOMEM;
+		ceph_pagelist_init(pagelist);
+		if (ceph_pagelist_append(pagelist, outbound, outbound_size)) {
+			ceph_pagelist_release(pagelist);
+			return -ENOMEM;
+		}
+		/* XXX(review): confirm the request frees this pagelist */
+		osd_req_op_cls_request_data_pagelist(osd_req, which, pagelist);
+	}
+	pages = ceph_alloc_page_vector(calc_pages_for(0, inbound_size),
+		GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);	/* never store an ERR_PTR in the array */
+	replies[which] = pages;
+	osd_req_op_cls_response_data_pages(osd_req, which, pages,
+		inbound_size, 0, false, false);
+	return 0;
+}
+
+/*
+ * Build, submit and wait for a header request.  The caller keeps its
+ * reference on osd_req in all cases and must drop it with
+ * rbd_obj_header_req_destroy(); returns 0 or a negative errno.
+ */
+static int rbd_obj_header_req_exec(struct rbd_device *rbd_dev,
+		struct ceph_osd_request *osd_req)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	int ret;
+
+	/* snap id is passed in CPU byte order; do not pre-swap it */
+	ceph_osdc_build_request(osd_req, 0, NULL, CEPH_NOSNAP, NULL);
+	ret = ceph_osdc_start_request(osdc, osd_req, false);
+	if (ret)
+		return ret;	/* no put here: the caller's destroy does it */
+	ret = ceph_osdc_wait_request(osdc, osd_req);
+	if (ret < 0)
+		ceph_osdc_cancel_request(osd_req);
+	return ret < 0 ? ret : 0;
+}
+
+/* Copy the reply for op 'which' into buf and free its whole page vector;
+ * length must be the inbound_size the op was added with.  Returns bytes. */
+static int rbd_obj_header_method_retrieve(struct ceph_osd_request *osd_req,
+		int which, void *buf, size_t length)
+{
+	struct page ***replies = osd_req->r_priv;
+	size_t reply_length = osd_req->r_reply_op_len[which];
+
+	BUG_ON(reply_length > length);
+	ceph_copy_from_page_vector(replies[which], buf, 0, reply_length);
+	ceph_release_page_vector(replies[which], calc_pages_for(0, length));
+	return reply_length;
+}
+
+static void rbd_obj_header_req_destroy(struct ceph_osd_request *osd_req)
+{
+	if (!osd_req)
+		return;	/* callers may pass NULL after a failed alloc */
+	kfree(osd_req->r_priv);	/* reply array only, not the page vectors */
+	ceph_osdc_put_request(osd_req);
+}
+
+/*
+ * Decode the object prefix reply held in op 'which' and store the
+ * resulting string in rbd_dev->header.object_prefix (NULL on failure).
+ * Returns 0, -ENOMEM, or the error from ceph_extract_encoded_string().
+ */
+static int __extract_prefix(struct rbd_device *rbd_dev,
+		struct ceph_osd_request *osd_req, int which)
+{
+	char *prefix;
+	void *reply;
+	void *cursor;
+	size_t reply_len;
+	int ret = 0;
+
+	reply = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
+	if (!reply)
+		return -ENOMEM;
+
+	reply_len = rbd_obj_header_method_retrieve(osd_req, which,
+		reply, RBD_OBJ_PREFIX_LEN_MAX);
+	cursor = reply;
+	prefix = ceph_extract_encoded_string(&cursor, cursor + reply_len,
+		NULL, GFP_KERNEL);
+	if (IS_ERR(prefix)) {
+		ret = PTR_ERR(prefix);
+		prefix = NULL;
+	}
+	rbd_dev->header.object_prefix = prefix;
+	kfree(reply);
+	return ret;
+}
+
+/*
+ * Decode the features reply held in op 'which'.  Fails with -ERANGE on a
+ * short reply and -ENXIO if the image needs features outside
+ * RBD_FEATURES_SUPPORTED; otherwise stores rbd_dev->header.features.
+ */
+static int __extract_features(struct rbd_device *rbd_dev,
+		struct ceph_osd_request *osd_req, int which)
+{
+	struct {
+		__le64 features;
+		__le64 incompat;
+	} __attribute__((packed)) features_buf = { 0 };
+	int len;
+
+	len = rbd_obj_header_method_retrieve(osd_req, which,
+		&features_buf, sizeof(features_buf));
+	if (len < (int)sizeof(features_buf))
+		return -ERANGE;	/* truncated reply: fields would read as 0 */
+
+	if (le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED)
+		return -ENXIO;
+
+	rbd_dev->header.features = le64_to_cpu(features_buf.features);
+	return 0;
+}
+
+/*
+ * Decode the snapshot context reply held in op 'which' (a 64-bit seq, a
+ * 32-bit snapshot count, then that many 64-bit snapshot ids) and install
+ * it as rbd_dev->header.snapc, dropping the reference on the old context.
+ *
+ * Returns 0 on success, -ENOMEM on allocation failure, -ERANGE if the
+ * reply is too short for what it claims to hold, or -EINVAL if the
+ * snapshot count is too large to size a context for.
+ */
+static int __extract_snapcontext(struct rbd_device *rbd_dev,
+		struct ceph_osd_request *osd_req, int which)
+{
+	struct ceph_snap_context *snapc;
+	size_t snapc_max;
+	size_t snapc_buflen;
+	void *snapc_buf;
+	void *p;
+	void *q;
+	u64 seq;	/* decoded with ceph_decode_64_safe(): must be 64 bits */
+	u32 snap_count;
+	u32 i;
+	int ret;
+
+	snapc_max = sizeof(__le64) + sizeof(__le32) +
+		RBD_MAX_SNAP_COUNT * sizeof(__le64);
+	snapc_buf = kzalloc(snapc_max, GFP_KERNEL);
+	if (!snapc_buf)
+		return -ENOMEM;
+
+	snapc_buflen = rbd_obj_header_method_retrieve(osd_req, which,
+		snapc_buf, snapc_max);
+	p = snapc_buf;
+	q = snapc_buf + snapc_buflen;
+	ret = -ERANGE;
+	ceph_decode_64_safe(&p, q, seq, out_err);
+	ceph_decode_32_safe(&p, q, snap_count, out_err);
+
+	if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context)) /
+		sizeof(u64)) {
+		ret = -EINVAL;
+		goto out_err;
+	}
+	if (!ceph_has_room(&p, q, snap_count * sizeof(__le64)))
+		goto out_err;
+
+	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
+	if (!snapc) {
+		ret = -ENOMEM;
+		goto out_err;	/* do not fall through and dereference NULL */
+	}
+
+	snapc->seq = seq;
+	for (i = 0; i < snap_count; i++)
+		snapc->snaps[i] = ceph_decode_64(&p);
+	ceph_put_snap_context(rbd_dev->header.snapc);
+	rbd_dev->header.snapc = snapc;
+	ret = 0;
+out_err:
+	kfree(snapc_buf);
+	return ret;
+}
+
+/* Store the size reply of op 'which' in the header; -ERANGE if truncated. */
+static int __extract_size(struct rbd_device *rbd_dev,
+		struct ceph_osd_request *osd_req, int which)
+{
+	struct {
+		u8 order;
+		__le64 size;
+	} __attribute__((packed)) size_buf = { 0 };
+
+	if (rbd_obj_header_method_retrieve(osd_req, which, &size_buf,
+			sizeof(size_buf)) < (int)sizeof(size_buf))
+		return -ERANGE;
+	rbd_dev->header.obj_order = size_buf.order;
+	rbd_dev->header.image_size = le64_to_cpu(size_buf.size);
+	return 0;
+}
+
 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	ret = rbd_dev_v2_object_prefix(rbd_dev);
+	size_t snapc_max;
+	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
+	struct ceph_osd_request *osd_req;
+
+	snapc_max = sizeof(__le64) + sizeof(__le32) +
+		RBD_MAX_SNAP_COUNT * sizeof(__le64);
+
+	osd_req = rbd_obj_header_req_alloc(rbd_dev, 4);
+
+	if (!osd_req) {
+		ret = -ENOMEM;
+		goto out_err;
+	}
+
+	ret = rbd_obj_header_method_add(osd_req, 0,
+		"rbd", "get_object_prefix",
+		NULL, 0, RBD_OBJ_PREFIX_LEN_MAX);
 	if (ret)
 		goto out_err;
 
-	/*
-	 * Get the and check features for the image.  Currently the
-	 * features are assumed to never change.
-	 */
-	ret = rbd_dev_v2_features(rbd_dev);
+	ret = rbd_obj_header_method_add(osd_req, 1,
+		"rbd", "get_features",
+		&snapid, sizeof(snapid),
+		sizeof(struct{__le64 features;__le64 incompat;} __attribute__((packed))));
 	if (ret)
 		goto out_err;
 
-	/* If the image supports fancy striping, get its parameters */
+	ret = rbd_obj_header_method_add(osd_req, 2,
+		"rbd", "get_snapcontext",
+		NULL, 0, snapc_max);
+	if (ret)
+		goto out_err;
 
-	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
-		ret = rbd_dev_v2_striping_info(rbd_dev);
-		if (ret < 0)
-			goto out_err;
-	}
-	/* No support for crypto and compression type format 2 images */
+	ret = rbd_obj_header_method_add(osd_req, 3,
+		"rbd", "get_size",
+		&snapid, sizeof(snapid),
+		sizeof(struct{u8 order; __le64 size;}__attribute((packed))));
+	if (ret)
+		goto out_err;
+
+	ret = rbd_obj_header_req_exec(rbd_dev, osd_req);
+	if (ret)
+		goto out_err;
+
+	if ((ret = __extract_prefix(rbd_dev, osd_req, 0)))
+		goto out_err;
+	if ((ret = __extract_features(rbd_dev, osd_req, 1)))
+		goto out_err;
+	if ((ret = __extract_snapcontext(rbd_dev, osd_req, 2)))
+		goto out_err;
+	if ((ret = __extract_size(rbd_dev, osd_req, 3)))
+		goto out_err;
+
+	ret = 0;
 
-	return 0;
 out_err:
-	rbd_dev->header.features = 0;
-	kfree(rbd_dev->header.object_prefix);
-	rbd_dev->header.object_prefix = NULL;
+	rbd_obj_header_req_destroy(osd_req);
 
 	return ret;
 }
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 65fcf80..71c946d 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -43,7 +43,7 @@ struct ceph_osd {
 };
 
 
-#define CEPH_OSD_MAX_OP	3
+#define CEPH_OSD_MAX_OP	4
 
 enum ceph_osd_data_type {
 	CEPH_OSD_DATA_TYPE_NONE = 0,
-- 
1.9.3

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux