[PATCH v3 06/10] ceph-rbd: generalize mon requests, add pool op support

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Generalize the current statfs synchronous requests, and support pool_ops.

Signed-off-by: Yehuda Sadeh <yehuda@xxxxxxxxxxxxxxx>
Signed-off-by: Sage Weil <sage@xxxxxxxxxxxx>
---
 fs/ceph/mon_client.c |  170 +++++++++++++++++++++++++++++++++++++++++++++-----
 fs/ceph/mon_client.h |    5 ++
 2 files changed, 158 insertions(+), 17 deletions(-)

diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 4e022ed..c1082e8 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -349,7 +349,7 @@ out:
 }
 
 /*
- * statfs
+ * generic requests (e.g., statfs, poolop)
  */
 static struct ceph_mon_generic_request *__lookup_generic_req(
 	struct ceph_mon_client *monc, u64 tid)
@@ -442,6 +442,35 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
 	return m;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+			      struct ceph_mon_generic_request *req)
+{
+	int err;
+
+	/* register request */
+	mutex_lock(&monc->mutex);
+	req->tid = ++monc->last_tid;
+	req->request->hdr.tid = cpu_to_le64(req->tid);
+	__insert_generic_request(monc, req);
+	monc->num_generic_requests++;
+	ceph_con_send(monc->con, ceph_msg_get(req->request));
+	mutex_unlock(&monc->mutex);
+
+	err = wait_for_completion_interruptible(&req->completion);
+
+	mutex_lock(&monc->mutex);
+	rb_erase(&req->node, &monc->generic_request_tree);
+	monc->num_generic_requests--;
+	mutex_unlock(&monc->mutex);
+
+	if (!err)
+		err = req->result;
+	return err;
+}
+
+/*
+ * statfs
+ */
 static void handle_statfs_reply(struct ceph_mon_client *monc,
 				struct ceph_msg *msg)
 {
@@ -468,7 +497,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
 	return;
 
 bad:
-	pr_err("corrupt generic reply, no tid\n");
+	pr_err("corrupt generic reply, tid %llu\n", tid);
 	ceph_msg_dump(msg);
 }
 
@@ -487,6 +516,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 
 	kref_init(&req->kref);
 	req->buf = buf;
+	req->buf_len = sizeof(*buf);
 	init_completion(&req->completion);
 
 	err = -ENOMEM;
@@ -504,33 +534,134 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
 	h->monhdr.session_mon_tid = 0;
 	h->fsid = monc->monmap->fsid;
 
-	/* register request */
-	mutex_lock(&monc->mutex);
-	req->tid = ++monc->last_tid;
-	req->request->hdr.tid = cpu_to_le64(req->tid);
-	__insert_generic_request(monc, req);
-	monc->num_generic_requests++;
-	mutex_unlock(&monc->mutex);
+	err = do_generic_request(monc, req);
 
-	/* send request and wait */
-	ceph_con_send(monc->con, ceph_msg_get(req->request));
-	err = wait_for_completion_interruptible(&req->completion);
+out:
+	kref_put(&req->kref, release_generic_request);
+	return err;
+}
+
+/*
+ * pool ops
+ */
+static int get_poolop_reply_buf(const char *src, size_t src_len,
+				char *dst, size_t dst_len)
+{
+	u32 buf_len;
+
+	if (src_len != sizeof(u32) + dst_len)
+		return -EINVAL;
+
+	buf_len = le32_to_cpu(*(u32 *)src);
+	if (buf_len != dst_len)
+		return -EINVAL;
+
+	memcpy(dst, src + sizeof(u32), dst_len);
+	return 0;
+}
+
+static void handle_poolop_reply(struct ceph_mon_client *monc,
+				struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+
+	if (msg->front.iov_len < sizeof(*reply))
+		goto bad;
+	dout("handle_poolop_reply %p tid %llu\n", msg, tid);
 
 	mutex_lock(&monc->mutex);
-	rb_erase(&req->node, &monc->generic_request_tree);
-	monc->num_generic_requests--;
+	req = __lookup_generic_req(monc, tid);
+	if (req) {
+		if (req->buf_len &&
+		    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
+				     msg->front.iov_len - sizeof(*reply),
+				     req->buf, req->buf_len) < 0) {
+			mutex_unlock(&monc->mutex);
+			goto bad;
+		}
+		req->result = le32_to_cpu(reply->reply_code);
+		get_generic_request(req);
+	}
 	mutex_unlock(&monc->mutex);
+	if (req) {
+		complete(&req->completion);
+		put_generic_request(req);
+	}
+	return;
 
-	if (!err)
-		err = req->result;
+bad:
+	pr_err("corrupt generic reply, tid %llu\n", tid);
+	ceph_msg_dump(msg);
+}
+
+/*
+ * Do a synchronous pool op.
+ */
+int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+			u32 pool, u64 snapid,
+			char *buf, int len)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_poolop *h;
+	int err;
+
+	req = kzalloc(sizeof(*req), GFP_NOFS);
+	if (!req)
+		return -ENOMEM;
+
+	kref_init(&req->kref);
+	req->buf = buf;
+	req->buf_len = len;
+	init_completion(&req->completion);
+
+	err = -ENOMEM;
+	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+	if (!req->request)
+		goto out;
+	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+	if (!req->reply)
+		goto out;
+
+	/* fill out request */
+	req->request->hdr.version = cpu_to_le16(2);
+	h = req->request->front.iov_base;
+	h->monhdr.have_version = 0;
+	h->monhdr.session_mon = cpu_to_le16(-1);
+	h->monhdr.session_mon_tid = 0;
+	h->fsid = monc->monmap->fsid;
+	h->pool = cpu_to_le32(pool);
+	h->op = cpu_to_le32(op);
+	h->auid = 0;
+	h->snapid = cpu_to_le64(snapid);
+	h->name_len = 0;
+
+	err = do_generic_request(monc, req);
 
 out:
 	kref_put(&req->kref, release_generic_request);
 	return err;
 }
 
+int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+			    u32 pool, u64 *snapid)
+{
+	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+				   pool, 0, (char *)snapid, sizeof(*snapid));
+
+}
+
+int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+			    u32 pool, u64 snapid)
+{
+	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+				   pool, snapid, 0, 0);
+
+}
+
 /*
- * Resend pending statfs requests.
+ * Resend pending generic requests.
  */
 static void __resend_generic_request(struct ceph_mon_client *monc)
 {
@@ -782,6 +913,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 		handle_statfs_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_POOLOP_REPLY:
+		handle_poolop_reply(monc, msg);
+		break;
+
 	case CEPH_MSG_MON_MAP:
 		ceph_monc_handle_map(monc, msg);
 		break;
@@ -820,6 +955,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 	case CEPH_MSG_MON_SUBSCRIBE_ACK:
 		m = ceph_msg_get(monc->m_subscribe_ack);
 		break;
+	case CEPH_MSG_POOLOP_REPLY:
 	case CEPH_MSG_STATFS_REPLY:
 		return get_generic_reply(con, hdr, skip);
 	case CEPH_MSG_AUTH_REPLY:
diff --git a/fs/ceph/mon_client.h b/fs/ceph/mon_client.h
index 174d794..8e396f2 100644
--- a/fs/ceph/mon_client.h
+++ b/fs/ceph/mon_client.h
@@ -50,6 +50,7 @@ struct ceph_mon_generic_request {
 	struct rb_node node;
 	int result;
 	void *buf;
+	int buf_len;
 	struct completion completion;
 	struct ceph_msg *request;  /* original request */
 	struct ceph_msg *reply;    /* and reply */
@@ -111,6 +112,10 @@ extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
 
+extern int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+				   u32 pool, u64 *snapid);
 
+extern int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+				   u32 pool, u64 snapid);
 
 #endif
-- 
1.5.6.5

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux