[PATCH v4 11/12] rbd: replay events in journal

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



when we found uncommitted events in journal, we need to do a replay.
This commit only implement three kinds of events replaying:

EVENT_TYPE_AIO_DISCARD:
        Will send a img_request to image with OBJ_OP_ZEROOUT, and
        wait for it completed.
EVENT_TYPE_AIO_WRITE:
        Will send a img_request to image with OBJ_OP_WRITE, and
        wait for it completed.
EVENT_TYPE_AIO_FLUSH:
        As all other events are replayed in synchoronized way, that
        means the events before are all flushed. we did nothing for this event.

Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx>
---
 drivers/block/rbd.c | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 236 insertions(+)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 79929c7..e8c2446 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -7182,6 +7182,242 @@ static void rbd_img_journal_append(struct rbd_img_request *img_req)
 	}
 }
 
+static int setup_write_bvecs(void *buf, u64 offset,
+			     u64 length, struct bio_vec **bvecs,
+			     u32 *bvec_count)
+{
+	u32 i;
+	u32 off;
+
+	*bvec_count = calc_pages_for(offset, length);
+	*bvecs = kcalloc(*bvec_count, sizeof(**bvecs), GFP_NOIO);
+	if (!(*bvecs))
+		return -ENOMEM;
+
+	div_u64_rem(offset, PAGE_SIZE, &off);
+	for (i = 0; i < *bvec_count; i++) {
+		u32 len = min(length, (u64)PAGE_SIZE - off);
+
+		(*bvecs)[i].bv_page = alloc_page(GFP_NOIO);
+		if (!(*bvecs)[i].bv_page)
+			return -ENOMEM;
+
+		(*bvecs)[i].bv_offset = off;
+		(*bvecs)[i].bv_len = len;
+		memcpy(page_address((*bvecs)[i].bv_page) + (*bvecs)[i].bv_offset, buf,
+		       (*bvecs)[i].bv_len);
+		length -= len;
+		buf += len;
+		off = 0;
+	}
+	rbd_assert(!length);
+	return 0;
+}
+
+static int rbd_journal_handle_aio_write(struct rbd_device *rbd_dev, void **p,
+					void *end, u8 struct_v, u64 commit_tid)
+{
+	struct ceph_snap_context *snapc;
+	struct rbd_img_request *img_request;
+	struct journal_commit_info *commit_info;
+	struct ceph_file_extent ex;
+	struct bio_vec *bvecs = NULL;
+	u32 bvec_count;
+	ssize_t data_len;
+	u64 offset, length;
+	int result;
+
+	/* get offset and length */
+	offset = ceph_decode_64(p);
+	length = ceph_decode_64(p);
+	/* get data_len and check the buffer length */
+	data_len = ceph_decode_32(p);
+	if (!ceph_has_room(p, end, data_len)) {
+		pr_err("our of range");
+		return -ERANGE;
+	}
+	rbd_assert(length == data_len);
+	/* Create a new image request for writing */
+	snapc = rbd_dev->header.snapc;
+	ceph_get_snap_context(snapc);
+	img_request = rbd_img_request_create(rbd_dev, OBJ_OP_WRITE, snapc);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err;
+	}
+	/* Allocate a new commit_info for this journal replay */
+	commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO);
+	if (!commit_info) {
+		result = -ENOMEM;
+		rbd_img_request_put(img_request);
+		goto err;
+	}
+	INIT_LIST_HEAD(&commit_info->node);
+
+	/* Don't down ->lock_rwsem in __rbd_img_handle_request */
+	__set_bit(IMG_REQ_NOLOCK, &img_request->flags);
+	commit_info->commit_tid = commit_tid;
+	list_add_tail(&commit_info->node, &img_request->journal_commit_list);
+	snapc = NULL; /* img_request consumes a ref */
+
+	result = setup_write_bvecs(*p, offset, length, &bvecs, &bvec_count);
+	if (result) {
+		rbd_warn(rbd_dev, "failed to setup bvecs.");
+		rbd_img_request_put(img_request);
+		goto err;
+	}
+	/* Skip data for this event */
+	*p = (char *)*p + data_len;
+	ex.fe_off = offset;
+	ex.fe_len = length;
+	result = rbd_img_fill_from_bvecs(img_request, &ex, 1, bvecs);
+	if (result) {
+		rbd_img_request_put(img_request);
+		goto err;
+	}
+	/* 
+	 * As we are doing journal replaying in exclusive-lock aqcuiring,
+	 * we don't need to aquire exclusive-lock for this writing, so
+	 * set the ->state to RBD_IMG_APPEND_JOURNAL to skip exclusive-lock
+	 * acquiring in state machine
+	 */
+	img_request->state = RBD_IMG_APPEND_JOURNAL;
+	rbd_img_handle_request(img_request, 0);
+	result = wait_for_completion_interruptible(&img_request->completion);
+err:
+	if (bvecs) {
+		int i;
+
+		for (i = 0; i < bvec_count; i++) {
+			if (bvecs[i].bv_page)
+				__free_page(bvecs[i].bv_page);
+		}
+		kfree(bvecs);
+	}
+	return result;
+}
+
+static int rbd_journal_handle_aio_discard(struct rbd_device *rbd_dev, void **p,
+					  void *end, u8 struct_v,
+					  u64 commit_tid)
+{
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc;
+	struct journal_commit_info *commit_info;
+	enum obj_operation_type	op_type;
+	u64 offset, length;
+	bool skip_partial_discard = true;
+	int result;
+
+	/* get offset and length*/
+	offset = ceph_decode_64(p);
+	length = ceph_decode_64(p);
+	if (struct_v >= 4)
+		skip_partial_discard = ceph_decode_8(p);
+	if (struct_v >= 5) {
+		/* skip discard_granularity_bytes */
+		ceph_decode_32(p);
+	}
+
+	snapc = rbd_dev->header.snapc;
+	ceph_get_snap_context(snapc);
+	/*
+	 * When the skip_partial_discard is false, we need to guarantee
+	 * that the data range would be zeroout-ed. Otherwise, we don't
+	 * guarantee the range discarded would be zero filled.
+	 */
+	if (skip_partial_discard)
+		op_type = OBJ_OP_DISCARD;
+	else
+		op_type = OBJ_OP_ZEROOUT;
+
+	img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err;
+	}
+	/* Allocate a commit_info for this image request */
+	commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO);
+	if (!commit_info) {
+		result = -ENOMEM;
+		rbd_img_request_put(img_request);
+		goto err;
+	}
+	INIT_LIST_HEAD(&commit_info->node);
+
+	/* Don't down ->lock_rwsem in __rbd_img_handle_request */
+	__set_bit(IMG_REQ_NOLOCK, &img_request->flags);
+	commit_info->commit_tid = commit_tid;
+	list_add_tail(&commit_info->node, &img_request->journal_commit_list);
+	result = rbd_img_fill_nodata(img_request, offset, length);
+	if (result) {
+		rbd_img_request_put(img_request);
+		goto err;
+	}
+
+	img_request->state = RBD_IMG_APPEND_JOURNAL;
+	rbd_img_handle_request(img_request, 0);
+	result = wait_for_completion_interruptible(&img_request->completion);
+err:
+	return result;
+}
+
+static int rbd_journal_replay(void *entry_handler,
+			      struct ceph_journaler_entry *entry,
+			      u64 commit_tid)
+{
+	struct rbd_device *rbd_dev = entry_handler;
+	void *data = entry->data;
+	void **p = &data;
+	void *end = *p + entry->data_len;
+	u32 event_type;
+	u8 struct_v;
+	u32 struct_len;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "rbd_decode_entry", &struct_v,
+				  &struct_len);
+	if (ret)
+		return -EINVAL;
+
+	event_type = ceph_decode_32(p);
+	switch (event_type) {
+	case EVENT_TYPE_AIO_WRITE:
+		ret = rbd_journal_handle_aio_write(rbd_dev, p, end, struct_v,
+						   commit_tid);
+		break;
+	case EVENT_TYPE_AIO_DISCARD:
+		ret = rbd_journal_handle_aio_discard(rbd_dev, p, end, struct_v,
+						     commit_tid);
+		break;
+	case EVENT_TYPE_AIO_FLUSH:
+		/*
+		 * As the all event replaying are synchronized, we don't
+		 * need any more work for replaying flush event. Just
+		 * commit it.
+		 */
+		ceph_journaler_client_committed(rbd_dev->journal->journaler,
+						commit_tid);
+		break;
+	default:
+		rbd_warn(rbd_dev, "unknown event_type: %u", event_type);
+		return -EINVAL;
+	}
+
+	if (struct_v >= 4) {
+		u8 meta_struct_v;
+		u32 meta_struct_len;
+
+		/* skip metadata */
+		ret = ceph_start_decoding(p, end, 1, "decode_metadata",
+					     &meta_struct_v, &meta_struct_len);
+		if (ret)
+			return ret;
+		*p += meta_struct_len;
+	}
+	return ret;
+}
+
 struct rbd_journal_tag_predecessor {
 	bool commit_valid;
 	u64 tag_tid;
-- 
1.8.3.1





[Index of Archives]     [CEPH Users]     [Ceph Large]     [Ceph Dev]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux