[PATCH 3/6] libceph: move message allocation out of ceph_osdc_alloc_request()

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



The size of ->r_request and ->r_reply messages depends on the size of
the object name (ceph_object_id), while the size of ceph_osd_request is
fixed.  Move message allocation into a separate function that would
have to be called after ceph_object_id and ceph_object_locator (which
is also going to become variable in size with RADOS namespaces) have
been filled in:

    req = ceph_osdc_alloc_request(...);
    <fill in req->r_base_oid>
    <fill in req->r_base_oloc>
    ceph_osdc_alloc_messages(req);

Signed-off-by: Ilya Dryomov <idryomov@xxxxxxxxx>
---
 drivers/block/rbd.c             | 18 ++++++++-
 fs/ceph/addr.c                  |  8 ++++
 fs/ceph/file.c                  |  7 ++++
 include/linux/ceph/osd_client.h |  1 +
 net/ceph/osd_client.c           | 88 +++++++++++++++++++++++------------------
 5 files changed, 82 insertions(+), 40 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b1e68dacba18..a90c9291cbf9 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1954,7 +1954,7 @@ static struct ceph_osd_request *rbd_osd_req_create(
 	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
 					  GFP_NOIO);
 	if (!osd_req)
-		return NULL;	/* ENOMEM */
+		goto fail;
 
 	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
 		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
@@ -1967,7 +1967,14 @@ static struct ceph_osd_request *rbd_osd_req_create(
 	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
 	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
 
+	if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
+		goto fail;
+
 	return osd_req;
+
+fail:
+	ceph_osdc_put_request(osd_req);
+	return NULL;
 }
 
 /*
@@ -2003,7 +2010,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
 	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
 						false, GFP_NOIO);
 	if (!osd_req)
-		return NULL;	/* ENOMEM */
+		goto fail;
 
 	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
 	osd_req->r_callback = rbd_osd_req_callback;
@@ -2012,7 +2019,14 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
 	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
 	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
 
+	if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO))
+		goto fail;
+
 	return osd_req;
+
+fail:
+	ceph_osdc_put_request(osd_req);
+	return NULL;
 }
 
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 3e61fc8bb371..6fee7e0b8931 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1762,6 +1762,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
 		 "%llx.00000000", ci->i_vino.ino);
 	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
 
+	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
+	if (err)
+		goto out_unlock;
+
 	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
 					 1, false, GFP_NOFS);
 	if (!wr_req) {
@@ -1775,6 +1779,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
 	wr_req->r_base_oloc.pool = pool;
 	wr_req->r_base_oid = rd_req->r_base_oid;
 
+	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
+	if (err)
+		goto out_unlock;
+
 	/* one page should be large enough for STAT data */
 	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
 	if (IS_ERR(pages)) {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index a79f9269831e..5d46d106bbb7 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -717,6 +717,13 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	req->r_base_oloc = orig_req->r_base_oloc;
 	req->r_base_oid = orig_req->r_base_oid;
 
+	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
+	if (ret) {
+		ceph_osdc_put_request(req);
+		req = orig_req;
+		goto out;
+	}
+
 	req->r_ops[0] = orig_req->r_ops[0];
 	osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
 
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index cbf460927c42..66a1fcd5bff7 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -322,6 +322,7 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *
 					       unsigned int num_ops,
 					       bool use_mempool,
 					       gfp_t gfp_flags);
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp);
 
 extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
 				    struct ceph_snap_context *snapc,
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index ccb9539dc780..d66dacc9d0d4 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -369,8 +369,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       gfp_t gfp_flags)
 {
 	struct ceph_osd_request *req;
-	struct ceph_msg *msg;
-	size_t msg_size;
 
 	if (use_mempool) {
 		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
@@ -407,53 +405,59 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	req->r_base_oloc.pool = -1;
 	req->r_target_oloc.pool = -1;
 
-	msg_size = OSD_OPREPLY_FRONT_LEN;
-	if (num_ops > CEPH_OSD_SLAB_OPS) {
-		/* ceph_osd_op and rval */
-		msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
-			    (sizeof(struct ceph_osd_op) + 4);
-	}
+	dout("%s req %p\n", __func__, req);
+	return req;
+}
+EXPORT_SYMBOL(ceph_osdc_alloc_request);
 
-	/* create reply message */
-	if (use_mempool)
-		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
-	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
-				   gfp_flags, true);
-	if (!msg) {
-		ceph_osdc_put_request(req);
-		return NULL;
-	}
-	req->r_reply = msg;
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	struct ceph_msg *msg;
+	int msg_size;
 
+	/* create request message */
 	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
 	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
 	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
 	msg_size += 1 + 8 + 4 + 4; /* pgid */
-	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
-	msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
+	msg_size += 4 + req->r_base_oid.name_len; /* oid */
+	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
 	msg_size += 8; /* snapid */
 	msg_size += 8; /* snap_seq */
-	msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
+	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
 	msg_size += 4; /* retry_attempt */
 
-	/* create request message; allow space for oid */
-	if (use_mempool)
+	if (req->r_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
-	if (!msg) {
-		ceph_osdc_put_request(req);
-		return NULL;
-	}
+		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
+	if (!msg)
+		return -ENOMEM;
 
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
-
 	req->r_request = msg;
 
-	return req;
+	/* create reply message */
+	msg_size = OSD_OPREPLY_FRONT_LEN;
+	if (req->r_num_ops > CEPH_OSD_SLAB_OPS) {
+		/* ceph_osd_op and rval */
+		msg_size += (req->r_num_ops - CEPH_OSD_SLAB_OPS) *
+			    (sizeof(struct ceph_osd_op) + 4);
+	}
+
+	if (req->r_mempool)
+		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+	else
+		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
+	if (!msg)
+		return -ENOMEM;
+
+	req->r_reply = msg;
+
+	return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_alloc_request);
+EXPORT_SYMBOL(ceph_osdc_alloc_messages);
 
 static bool osd_req_opcode_valid(u16 opcode)
 {
@@ -828,17 +832,17 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 					GFP_NOFS);
-	if (!req)
-		return ERR_PTR(-ENOMEM);
+	if (!req) {
+		r = -ENOMEM;
+		goto fail;
+	}
 
 	req->r_flags = flags;
 
 	/* calculate max write size */
 	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
-	if (r < 0) {
-		ceph_osdc_put_request(req);
-		return ERR_PTR(r);
-	}
+	if (r)
+		goto fail;
 
 	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
 		osd_req_op_init(req, which, opcode, 0);
@@ -864,7 +868,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 		 "%llx.%08llx", vino.ino, objnum);
 	req->r_base_oid.name_len = strlen(req->r_base_oid.name);
 
+	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
+	if (r)
+		goto fail;
+
 	return req;
+
+fail:
+	ceph_osdc_put_request(req);
+	return ERR_PTR(r);
 }
 EXPORT_SYMBOL(ceph_osdc_new_request);
 
-- 
2.4.3

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux