[PATCH v3 08/15] libceph: journaling: introduce api for journal appending

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This commit introduce 3 APIs for journal recording:

(1) ceph_journaler_allocate_tag()
    This api allocate a new tag for user to get a unified
tag_tid. Then each event appended by this user will be tagged
by this tag_tid.

(2) ceph_journaler_append()
    This api allow user to append event to journal objects.

(3) ceph_journaler_client_committed()
    This api will notify journaling that a event is already
committed, you can remove it from journal if there is no other
client refre to it.

Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx>
---
 include/linux/ceph/journaler.h |   2 +-
 net/ceph/journaler.c           | 834 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 835 insertions(+), 1 deletion(-)

diff --git a/include/linux/ceph/journaler.h b/include/linux/ceph/journaler.h
index e3b82af..f04fb3f 100644
--- a/include/linux/ceph/journaler.h
+++ b/include/linux/ceph/journaler.h
@@ -173,7 +173,7 @@ static inline uint64_t ceph_journaler_get_max_append_size(struct ceph_journaler
 struct ceph_journaler_ctx *ceph_journaler_ctx_alloc(void);
 void ceph_journaler_ctx_put(struct ceph_journaler_ctx *journaler_ctx);
 int ceph_journaler_append(struct ceph_journaler *journaler,
-			  uint64_t tag_tid, uint64_t *commit_tid,
+			  uint64_t tag_tid,
 			  struct ceph_journaler_ctx *ctx);
 void ceph_journaler_client_committed(struct ceph_journaler *journaler,
 				     uint64_t commit_tid);
diff --git a/net/ceph/journaler.c b/net/ceph/journaler.c
index 3e92e96..26a5b97 100644
--- a/net/ceph/journaler.c
+++ b/net/ceph/journaler.c
@@ -29,6 +29,42 @@ static char *object_oid_prefix(int pool_id, const char *journal_id)
 	return prefix;
 }
 
+static void journaler_flush(struct work_struct *work);
+static void journaler_client_commit(struct work_struct *work);
+static void journaler_notify_update(struct work_struct *work);
+static void journaler_overflow(struct work_struct *work);
+static void journaler_append_finish(struct work_struct *work);
+
+/*
+ * Append got through the following state machine:
+ *
+ *                    JOURNALER_APPEND_START 
+ *                              |              
+ *                              v              
+ *            ..>. . . JOURNALER_APPEND_SEND  . . . . 
+ *            .                 |                    .
+ * JOURNALER_APPEND_OVERFLOW    |                    .
+ *                              |                    . 
+ *            ^                 v                    v 
+ *            .. . . . JOURNALER_APPEND_FLUSH . . >. .
+ *                              |                    . 
+ *                              v                 (error) 
+ *                     JOURNALER_APPEND_SAFE         v
+ *                              |                    .
+ *                              v                    . 
+ *                     JOURNALER_APPEND_FINISH . < . .
+ *
+ * Append starts in JOURNALER_APPEND_START and ends in JOURNALER_APPEND_FINISH
+ */
+enum journaler_append_state {
+	JOURNALER_APPEND_START = 1,
+	JOURNALER_APPEND_SEND,
+	JOURNALER_APPEND_FLUSH,
+	JOURNALER_APPEND_OVERFLOW,
+	JOURNALER_APPEND_SAFE,
+	JOURNALER_APPEND_FINISH,
+};
+
 /*
  * journaler_append_ctx is an internal structure to represent an append op.
  */
@@ -44,6 +80,7 @@ struct journaler_append_ctx {
 	struct ceph_journaler_entry entry;
 	struct ceph_journaler_ctx journaler_ctx;
 
+	enum journaler_append_state state;
 	struct kref	kref;
 };
 
@@ -151,6 +188,12 @@ struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
 	if (!journaler->notify_wq)
 		goto err_destroy_task_wq;
 
+	INIT_WORK(&journaler->flush_work, journaler_flush);
+	INIT_WORK(&journaler->finish_work, journaler_append_finish);
+	INIT_DELAYED_WORK(&journaler->commit_work, journaler_client_commit);
+	INIT_WORK(&journaler->notify_update_work, journaler_notify_update);
+	INIT_WORK(&journaler->overflow_work, journaler_overflow);
+
 	journaler->fetch_buf = NULL;
 	journaler->handle_entry = NULL;
 	journaler->entry_handler = NULL;
@@ -1287,3 +1330,794 @@ int ceph_journaler_start_replay(struct ceph_journaler *journaler)
 	return ret;
 }
 EXPORT_SYMBOL(ceph_journaler_start_replay);
+
+// recording
+static int get_new_entry_tid(struct ceph_journaler *journaler,
+			      uint64_t tag_tid, uint64_t *entry_tid)
+{
+	struct entry_tid *pos;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			*entry_tid = pos->entry_tid++;
+			spin_unlock(&journaler->entry_tid_lock);
+			return 0;
+		}
+	}
+
+	pos = entry_tid_alloc(journaler, tag_tid);
+	if (!pos) {
+		spin_unlock(&journaler->entry_tid_lock);
+		pr_err("failed to allocate new entry.");
+		return -ENOMEM;
+	}
+
+	*entry_tid = pos->entry_tid++;
+	spin_unlock(&journaler->entry_tid_lock);
+
+	return 0;
+}
+
+static uint64_t get_object(struct ceph_journaler *journaler, uint64_t splay_offset)
+{
+	return splay_offset + (journaler->splay_width * journaler->active_set);
+}
+
+static void future_init(struct ceph_journaler_future *future,
+			uint64_t tag_tid,
+						   uint64_t entry_tid,
+						   uint64_t commit_tid,
+			struct ceph_journaler_ctx *journaler_ctx)
+{
+	future->tag_tid = tag_tid;
+	future->entry_tid = entry_tid;
+	future->commit_tid = commit_tid;
+
+	spin_lock_init(&future->lock);
+	future->safe = false;
+	future->consistent = false;
+
+	future->ctx = journaler_ctx;
+	future->wait = NULL;
+}
+
+static void set_prev_future(struct ceph_journaler *journaler,
+			    struct journaler_append_ctx *append_ctx)
+{
+	struct ceph_journaler_future *future = &append_ctx->future;
+	bool prev_future_finished = false;
+
+	if (journaler->prev_future == NULL) {
+		prev_future_finished = true;
+	} else {
+		spin_lock(&journaler->prev_future->lock);
+		prev_future_finished = (journaler->prev_future->consistent &&
+					journaler->prev_future->safe);
+		journaler->prev_future->wait = append_ctx;
+		spin_unlock(&journaler->prev_future->lock);
+	}
+
+	spin_lock(&future->lock);
+	if (prev_future_finished) {
+		future->consistent = true;
+	}
+	spin_unlock(&future->lock);
+
+	journaler->prev_future = future;
+}
+
+static void entry_init(struct ceph_journaler_entry *entry,
+			uint64_t tag_tid,
+			uint64_t entry_tid,
+			struct ceph_journaler_ctx *journaler_ctx)
+{
+	entry->tag_tid = tag_tid;
+	entry->entry_tid = entry_tid;
+	entry->data_len = journaler_ctx->bio_len +
+			  journaler_ctx->prefix_len + journaler_ctx->suffix_len;
+}
+
+static void journaler_entry_encode_prefix(struct ceph_journaler_entry *entry,
+					  void **p, void *end)
+{
+	ceph_encode_64(p, PREAMBLE);
+	ceph_encode_8(p, (uint8_t)1);
+	ceph_encode_64(p, entry->entry_tid);
+	ceph_encode_64(p, entry->tag_tid);
+
+	ceph_encode_32(p, entry->data_len);
+}
+
+static uint32_t crc_bio(uint32_t crc, struct bio *bio, u64 length)
+{
+	struct bio_vec bv;
+	struct bvec_iter iter;
+	char *buf;
+	u64 offset = 0;
+	u64 len = length;
+
+next:
+	bio_for_each_segment(bv, bio, iter) {
+		buf = page_address(bv.bv_page) + bv.bv_offset;
+		len = min_t(u64, length, bv.bv_len);
+		crc = crc32c(crc, buf, len);
+		offset += len;
+		length -= len;
+
+		if (length == 0)
+			goto out;
+	}
+
+	if (length && bio->bi_next) {
+		bio = bio->bi_next;
+		goto next;
+	}
+
+	WARN_ON(length != 0);
+out:
+	return crc;
+}
+
+static void journaler_append_finish(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work, struct ceph_journaler,
+						        finish_work);
+	struct journaler_append_ctx *ctx_pos, *next;
+	LIST_HEAD(tmp_list);
+
+	spin_lock(&journaler->finish_lock);
+	list_splice_init(&journaler->finish_list, &tmp_list);
+	spin_unlock(&journaler->finish_lock);
+
+	list_for_each_entry_safe(ctx_pos, next, &tmp_list, node) {
+		list_del(&ctx_pos->node);
+		ctx_pos->journaler_ctx.callback(&ctx_pos->journaler_ctx);
+	}
+}
+
+static void journaler_handle_append(struct journaler_append_ctx *ctx, int ret);
+static void future_consistent(struct journaler_append_ctx *append_ctx,
+			      int result) {
+	struct ceph_journaler_future *future = &append_ctx->future;
+	bool future_finished = false;
+
+	spin_lock(&future->lock);
+	future->consistent = true;
+	future_finished = (future->safe && future->consistent);
+	spin_unlock(&future->lock);
+
+	if (future_finished) {
+		append_ctx->state = JOURNALER_APPEND_FINISH;
+		journaler_handle_append(append_ctx, result);
+	}
+}
+
+static void future_finish(struct journaler_append_ctx *append_ctx,
+			  int result) {
+	struct ceph_journaler *journaler = append_ctx->journaler;
+	struct ceph_journaler_ctx *journaler_ctx = &append_ctx->journaler_ctx;
+	struct ceph_journaler_future *future = &append_ctx->future;
+	struct journaler_append_ctx *wait = future->wait;
+
+	mutex_lock(&journaler->meta_lock);
+	if (journaler->prev_future == future)
+		journaler->prev_future = NULL;
+	mutex_unlock(&journaler->meta_lock);
+
+	spin_lock(&journaler->finish_lock);
+	if (journaler_ctx->result == 0)
+		journaler_ctx->result = result;
+	list_add_tail(&append_ctx->node, &journaler->finish_list);
+	spin_unlock(&journaler->finish_lock);
+
+	queue_work(journaler->task_wq, &journaler->finish_work);
+	if (wait)
+		future_consistent(wait, result);
+}
+
+static void journaler_notify_update(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work,
+							struct ceph_journaler,
+						        notify_update_work);
+	int ret = 0;
+
+	ret = ceph_osdc_notify(journaler->osdc, &journaler->header_oid,
+				&journaler->header_oloc, NULL, 0,
+				5000, NULL, NULL);
+	if (ret)
+		pr_err("notify_update failed: %d", ret);
+}
+
+// advance the active_set to (active_set + 1). This function
+// will call ceph_cls_journaler_set_active_set to update journal
+// metadata and notify all clients about this event. We don't
+// update journaler->active_set in memory currently.
+//
+// The journaler->active_set will be updated in refresh() when
+// we get the notification.
+static void advance_object_set(struct ceph_journaler *journaler)
+{
+	struct object_recorder *obj_recorder;
+	uint64_t active_set;
+	int i, ret;
+
+	mutex_lock(&journaler->meta_lock);
+	if (journaler->advancing || journaler->flushing) {
+		mutex_unlock(&journaler->meta_lock);
+		return;
+	}
+
+	// make sure all inflight appending finish
+	for (i = 0; i < journaler->splay_width; i++) {
+		obj_recorder = &journaler->obj_recorders[i];
+		spin_lock(&obj_recorder->lock);
+		if (obj_recorder->inflight_append) {
+			spin_unlock(&obj_recorder->lock);
+			mutex_unlock(&journaler->meta_lock);
+			return;
+		}
+		spin_unlock(&obj_recorder->lock);
+	}
+
+	journaler->advancing = true;
+
+	active_set = journaler->active_set + 1;
+	mutex_unlock(&journaler->meta_lock);
+
+	ret = ceph_cls_journaler_set_active_set(journaler->osdc,
+			&journaler->header_oid, &journaler->header_oloc,
+			active_set);
+	if (ret) {
+		pr_err("error in set active_set: %d", ret);
+	}
+
+	queue_work(journaler->task_wq, &journaler->notify_update_work);
+}
+
+static void journaler_overflow(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work,
+						        struct ceph_journaler,
+						        overflow_work);
+	advance_object_set(journaler);
+}
+
+static void journaler_append_callback(struct ceph_osd_request *osd_req)
+{
+	struct journaler_append_ctx *ctx = osd_req->r_priv;
+	int ret = osd_req->r_result;
+
+	if (ret)
+		pr_debug("ret of journaler_append_callback: %d", ret);
+
+	__free_page(ctx->req_page);
+	ceph_osdc_put_request(osd_req);
+
+	journaler_handle_append(ctx, ret);
+}
+
+static int append(struct ceph_journaler *journaler,
+		  struct ceph_object_id *oid,
+		  struct ceph_object_locator *oloc,
+		  struct journaler_append_ctx *ctx)
+
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	void *p;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 2, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+	req->r_callback = journaler_append_callback;
+	req->r_priv = ctx;
+
+	// guard_append
+	ctx->req_page = alloc_page(GFP_NOIO);
+	if (!ctx->req_page) {
+		ret = -ENOMEM;
+		goto out_req;
+	}
+	p = page_address(ctx->req_page);
+	ceph_encode_64(&p, 1 << journaler->order);
+	ret = osd_req_op_cls_init(req, 0, "journal", "guard_append");
+	if (ret)
+		goto out_free_page;
+	osd_req_op_cls_request_data_pages(req, 0, &ctx->req_page, 8, 0, false, false);
+
+	// append_data
+	osd_req_op_extent_init(req, 1, CEPH_OSD_OP_APPEND, 0,
+		ctx->journaler_ctx.prefix_len + ctx->journaler_ctx.bio_len + ctx->journaler_ctx.suffix_len, 0, 0);
+
+	if (ctx->journaler_ctx.prefix_len)
+		osd_req_op_extent_prefix_pages(req, 1, &ctx->journaler_ctx.prefix_page,
+					       ctx->journaler_ctx.prefix_len,
+					       ctx->journaler_ctx.prefix_offset,
+					       false, false);
+
+	if (ctx->journaler_ctx.bio_len)
+		osd_req_op_extent_osd_data_bio(req, 1, &ctx->journaler_ctx.bio_iter, ctx->journaler_ctx.bio_len);
+
+	if (ctx->journaler_ctx.suffix_len)
+		osd_req_op_extent_suffix_pages(req, 1, &ctx->journaler_ctx.suffix_page,
+					       ctx->journaler_ctx.suffix_len,
+					       ctx->journaler_ctx.suffix_offset,
+					       false, false);
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_free_page;
+
+	ceph_osdc_start_request(osdc, req, false);
+	return 0;
+
+out_free_page:
+	__free_page(ctx->req_page);
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+static int send_append_request(struct ceph_journaler *journaler,
+			       uint64_t object_num,
+			       struct journaler_append_ctx *ctx)
+{
+	struct ceph_object_id object_oid;
+	int ret = 0;
+
+	ceph_oid_init(&object_oid);
+	ret = ceph_oid_aprintf(&object_oid, GFP_NOIO, "%s%llu",
+				journaler->object_oid_prefix, object_num);
+	if (ret) {
+		pr_err("failed to initialize object id: %d", ret);
+		goto out;
+	}
+
+	ret = append(journaler, &object_oid, &journaler->data_oloc, ctx);
+out:
+	ceph_oid_destroy(&object_oid);
+	return ret;
+}
+
+static void journaler_flush(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(work,
+							struct ceph_journaler,
+							flush_work);
+	int i = 0;
+	struct object_recorder *obj_recorder;
+	struct journaler_append_ctx *ctx, *next_ctx;
+	LIST_HEAD(tmp);
+
+	mutex_lock(&journaler->meta_lock);
+	if (journaler->overflowed) {
+		mutex_unlock(&journaler->meta_lock);
+		return;
+	}
+
+	journaler->flushing = true;
+	mutex_unlock(&journaler->meta_lock);
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		INIT_LIST_HEAD(&tmp);
+		obj_recorder = &journaler->obj_recorders[i];
+
+		spin_lock(&obj_recorder->lock);
+		list_splice_tail_init(&obj_recorder->overflow_list, &tmp);
+		list_splice_tail_init(&obj_recorder->append_list, &tmp);
+		spin_unlock(&obj_recorder->lock);
+
+		list_for_each_entry_safe(ctx, next_ctx, &tmp, node) {
+			list_del(&ctx->node);
+			ctx->object_num = get_object(journaler, obj_recorder->splay_offset);
+			journaler_handle_append(ctx, 0);
+		}
+	}
+
+	mutex_lock(&journaler->meta_lock);
+	journaler->flushing = false;
+	// As we don't do advance in flushing, so queue another overflow_work
+	// after flushing finished if we journaler is overflowed.
+	if (journaler->overflowed)
+		queue_work(journaler->task_wq, &journaler->overflow_work);
+	mutex_unlock(&journaler->meta_lock);
+}
+
+static void ceph_journaler_object_append(struct ceph_journaler *journaler,
+					 struct journaler_append_ctx *append_ctx)
+{
+	void *buf, *end;
+	uint32_t crc = 0;
+	struct ceph_journaler_ctx *journaler_ctx = &append_ctx->journaler_ctx;
+	struct ceph_bio_iter *bio_iter = &journaler_ctx->bio_iter;
+	struct object_recorder *obj_recorder;
+
+	// PEAMBLE(8) + version(1) + entry_tid(8) + tag_tid(8) + string_len(4) = 29
+	journaler_ctx->prefix_offset -= 29;
+	journaler_ctx->prefix_len += 29;
+	buf = page_address(journaler_ctx->prefix_page) + journaler_ctx->prefix_offset;
+	end = buf + 29;
+	journaler_entry_encode_prefix(&append_ctx->entry, &buf, end);
+
+	// size of crc is 4
+	journaler_ctx->suffix_offset += 0;
+	journaler_ctx->suffix_len += 4;
+	buf = page_address(journaler_ctx->suffix_page);
+	end = buf + 4;
+	crc = crc32c(crc, page_address(journaler_ctx->prefix_page) + journaler_ctx->prefix_offset,
+		     journaler_ctx->prefix_len);
+	if (journaler_ctx->bio_len)
+		crc = crc_bio(crc, bio_iter->bio, journaler_ctx->bio_len);
+	ceph_encode_32(&buf, crc);
+	obj_recorder = &journaler->obj_recorders[append_ctx->splay_offset];
+
+	spin_lock(&obj_recorder->lock);
+	list_add_tail(&append_ctx->node, &obj_recorder->append_list);
+	queue_work(journaler->task_wq, &journaler->flush_work);
+	spin_unlock(&obj_recorder->lock);
+}
+
+static void journaler_handle_append(struct journaler_append_ctx *ctx, int ret)
+{
+	struct ceph_journaler *journaler = ctx->journaler;
+	struct object_recorder *obj_recorder = &journaler->obj_recorders[ctx->splay_offset];
+
+again:
+	switch (ctx->state) {
+	case JOURNALER_APPEND_START:
+		ctx->state = JOURNALER_APPEND_SEND;
+		ceph_journaler_object_append(journaler, ctx);
+		break;
+	case JOURNALER_APPEND_SEND:
+		ctx->state = JOURNALER_APPEND_FLUSH;
+		spin_lock(&obj_recorder->lock);
+		obj_recorder->inflight_append++;
+		spin_unlock(&obj_recorder->lock);
+		ret = send_append_request(journaler, ctx->object_num, ctx);
+		if (ret) {
+			pr_err("failed to send append request: %d", ret);
+			ctx->state = JOURNALER_APPEND_FINISH;
+			goto again;
+		}
+		break;
+	case JOURNALER_APPEND_FLUSH:
+		if (ret == -EOVERFLOW) {
+			mutex_lock(&journaler->meta_lock);
+			journaler->overflowed = true;
+			mutex_unlock(&journaler->meta_lock);
+
+			spin_lock(&obj_recorder->lock);
+			ctx->state = JOURNALER_APPEND_OVERFLOW;
+			list_add_tail(&ctx->node, &obj_recorder->overflow_list);
+			if (--obj_recorder->inflight_append == 0)
+				queue_work(journaler->task_wq, &journaler->overflow_work);
+			spin_unlock(&obj_recorder->lock);
+			break;
+		}
+
+		spin_lock(&obj_recorder->lock);
+		if (--obj_recorder->inflight_append == 0) {
+			mutex_lock(&journaler->meta_lock);
+			if (journaler->overflowed)
+				queue_work(journaler->task_wq, &journaler->overflow_work);
+			mutex_unlock(&journaler->meta_lock);
+		}
+		spin_unlock(&obj_recorder->lock);
+
+		ret = add_commit_entry(journaler, ctx->future.commit_tid, ctx->object_num,
+				       ctx->future.tag_tid, ctx->future.entry_tid);
+		if (ret) {
+			pr_err("failed to add_commit_entry: %d", ret);
+			ctx->state = JOURNALER_APPEND_FINISH;
+			ret = -ENOMEM;
+			goto again;
+		}
+
+		ctx->state = JOURNALER_APPEND_SAFE;
+		goto again;
+	case JOURNALER_APPEND_OVERFLOW:
+		ctx->state = JOURNALER_APPEND_SEND;
+		goto again;
+	case JOURNALER_APPEND_SAFE:
+		spin_lock(&ctx->future.lock);
+		ctx->future.safe = true;
+		if (ctx->future.consistent) {
+			spin_unlock(&ctx->future.lock);
+			ctx->state = JOURNALER_APPEND_FINISH;
+			goto again;
+		}
+		spin_unlock(&ctx->future.lock);
+		break;
+	case JOURNALER_APPEND_FINISH:
+		future_finish(ctx, ret);
+		break;
+	default:
+		BUG();
+	}
+}
+
+// journaler_append_ctx alloc and release
+struct journaler_append_ctx *journaler_append_ctx_alloc(void)
+{
+	struct journaler_append_ctx *append_ctx;
+	struct ceph_journaler_ctx *journaler_ctx;
+
+	append_ctx = kmem_cache_zalloc(journaler_append_ctx_cache, GFP_NOIO);
+	if (!append_ctx)
+		return NULL;
+
+	journaler_ctx = &append_ctx->journaler_ctx;
+	journaler_ctx->prefix_page = alloc_page(GFP_NOIO);
+	if (!journaler_ctx->prefix_page)
+		goto free_journaler_ctx;
+
+	journaler_ctx->suffix_page = alloc_page(GFP_NOIO);
+	if (!journaler_ctx->suffix_page)
+		goto free_prefix_page;
+
+	memset(page_address(journaler_ctx->prefix_page), 0, PAGE_SIZE);
+	memset(page_address(journaler_ctx->suffix_page), 0, PAGE_SIZE);
+	INIT_LIST_HEAD(&journaler_ctx->node);
+
+	kref_init(&append_ctx->kref);
+	INIT_LIST_HEAD(&append_ctx->node);
+	return append_ctx;
+
+free_prefix_page:
+	__free_page(journaler_ctx->prefix_page);
+free_journaler_ctx:
+	kmem_cache_free(journaler_append_ctx_cache, append_ctx);
+	return NULL;
+}
+
+struct ceph_journaler_ctx *ceph_journaler_ctx_alloc(void)
+{
+	struct journaler_append_ctx *append_ctx;
+
+	append_ctx = journaler_append_ctx_alloc();
+	if (!append_ctx)
+		return NULL;
+
+	return &append_ctx->journaler_ctx;
+}
+EXPORT_SYMBOL(ceph_journaler_ctx_alloc);
+
+static void journaler_append_ctx_release(struct kref *kref)
+{
+	struct journaler_append_ctx *append_ctx;
+	struct ceph_journaler_ctx *journaler_ctx;
+
+	append_ctx = container_of(kref, struct journaler_append_ctx, kref);
+	journaler_ctx = &append_ctx->journaler_ctx;
+
+	__free_page(journaler_ctx->prefix_page);
+	__free_page(journaler_ctx->suffix_page);
+	kmem_cache_free(journaler_append_ctx_cache, append_ctx);
+}
+
+static void journaler_append_ctx_put(struct journaler_append_ctx *append_ctx)
+{
+	if (append_ctx) {
+		kref_put(&append_ctx->kref, journaler_append_ctx_release);
+	}
+}
+
+void ceph_journaler_ctx_put(struct ceph_journaler_ctx *journaler_ctx)
+{
+	struct journaler_append_ctx *append_ctx;
+
+	if (journaler_ctx) {
+		append_ctx = container_of(journaler_ctx,
+					  struct journaler_append_ctx,
+					  journaler_ctx);
+		journaler_append_ctx_put(append_ctx);
+	}
+}
+EXPORT_SYMBOL(ceph_journaler_ctx_put);
+
+int ceph_journaler_append(struct ceph_journaler *journaler,
+			  uint64_t tag_tid,
+			  struct ceph_journaler_ctx *journaler_ctx)
+{
+	uint64_t entry_tid;
+	struct object_recorder *obj_recorder;
+	struct journaler_append_ctx *append_ctx;
+	int ret;
+
+	append_ctx = container_of(journaler_ctx,
+				  struct journaler_append_ctx,
+				  journaler_ctx);
+
+	append_ctx->journaler = journaler;
+	mutex_lock(&journaler->meta_lock);
+	// get entry_tid for this event. (tag_tid, entry_tid) is
+	// the uniq id for every journal event.
+	ret = get_new_entry_tid(journaler, tag_tid, &entry_tid);
+	if (ret) {
+		mutex_unlock(&journaler->meta_lock);
+		return ret;
+	}
+
+	// calculate the object_num for this entry.
+	append_ctx->splay_offset = entry_tid % journaler->splay_width;
+	obj_recorder = &journaler->obj_recorders[journaler->splay_width];
+	append_ctx->object_num = get_object(journaler, append_ctx->splay_offset);
+
+	// allocate a commit_tid for this event, when the data is committed
+	// to data objects, ceph_journaler_client_committed() will accept
+	// the commit_tid to understand how to update journal commit position.
+	journaler_ctx->commit_tid = __allocate_commit_tid(journaler);
+	entry_init(&append_ctx->entry, tag_tid, entry_tid, journaler_ctx);
+
+	// To make sure the journal entry is consistent, we use future
+	// to track it. And every journal entry depent on the previous
+	// entry. Only if the previous entry is finished, current entry
+	// could be consistent. and then we can finish current entry. 
+	future_init(&append_ctx->future, tag_tid, entry_tid,
+		    journaler_ctx->commit_tid, journaler_ctx);
+	set_prev_future(journaler, append_ctx);
+	mutex_unlock(&journaler->meta_lock);
+
+	append_ctx->state = JOURNALER_APPEND_START;
+	journaler_handle_append(append_ctx, 0);
+	return 0;
+}
+EXPORT_SYMBOL(ceph_journaler_append);
+
+static void journaler_client_commit(struct work_struct *work)
+{
+	struct ceph_journaler *journaler = container_of(to_delayed_work(work),
+							struct ceph_journaler,
+							commit_work);
+	int ret;
+
+	mutex_lock(&journaler->commit_lock);
+	copy_pos_list(&journaler->obj_pos_pending,
+		      &journaler->obj_pos_committing);
+	mutex_unlock(&journaler->commit_lock);
+
+	ret = ceph_cls_journaler_client_committed(journaler->osdc,
+		&journaler->header_oid, &journaler->header_oloc,
+		journaler->client, &journaler->obj_pos_committing);
+
+	if (ret) {
+		pr_err("error in client committed: %d", ret);
+	}
+
+	queue_work(journaler->task_wq, &journaler->notify_update_work);
+
+	mutex_lock(&journaler->commit_lock);
+	journaler->commit_scheduled = false;
+	mutex_unlock(&journaler->commit_lock);
+}
+
+// hold journaler->commit_lock
+static void add_object_position(struct commit_entry *entry,
+			       struct list_head *object_positions,
+			       uint64_t splay_width)
+{
+	struct ceph_journaler_object_pos *position;
+	uint8_t splay_offset = entry->object_num % splay_width;
+	bool found = false;
+	
+	list_for_each_entry(position, object_positions, node) {
+		if (position->in_using == false) {
+			found = true;
+			break;
+		}
+
+		if (splay_offset == position->object_num % splay_width) {
+			found = true;
+			break;
+		}
+	}
+
+	BUG_ON(!found);
+	if (position->in_using == false)
+		position->in_using = true;
+	position->object_num = entry->object_num;
+	position->tag_tid = entry->tag_tid;
+	position->entry_tid = entry->entry_tid;
+	list_move(&position->node, object_positions);
+}
+
+void ceph_journaler_client_committed(struct ceph_journaler *journaler, uint64_t commit_tid)
+{
+	struct commit_entry *commit_entry;
+	bool update_client_commit = true;
+	struct rb_node *n;
+
+	mutex_lock(&journaler->commit_lock);
+	// search commit entries in commit_tid order.
+	for (n = rb_first(&journaler->commit_entries); n; n = rb_next(n)) {
+		commit_entry = rb_entry(n, struct commit_entry, r_node);
+		// set current commit entry to be committed.
+		if (commit_entry->commit_tid == commit_tid) {
+			commit_entry->committed = true;
+			break;
+		}
+		// if there is any one entry before commit_tid is not committed,
+		// we dont need to update_client_commit.
+		if (commit_entry->committed == false)
+			update_client_commit = false;
+	}
+
+	// update_client_commit when the all commit entries before this commit_tid
+	// are all committed.
+	if (update_client_commit) {
+		// update commit positions rely on commit entries one by one.
+		// until corrent commit_tid.
+		for (n = rb_first(&journaler->commit_entries); n;) {
+			commit_entry = rb_entry(n, struct commit_entry, r_node);
+			n = rb_next(n);
+
+			if (commit_entry->commit_tid > commit_tid)
+				break;
+			add_object_position(commit_entry,
+				&journaler->obj_pos_pending,
+				journaler->splay_width);
+			erase_commit_entry(&journaler->commit_entries, commit_entry);
+			kmem_cache_free(journaler_commit_entry_cache, commit_entry);
+		}
+	}
+
+	// schedule commit_work to call ceph_cls_journaler_client_committed()
+	// after JOURNALER_COMMIT_INTERVAL
+	if (update_client_commit && !journaler->commit_scheduled) {
+		queue_delayed_work(journaler->task_wq, &journaler->commit_work,
+				   JOURNALER_COMMIT_INTERVAL);
+		journaler->commit_scheduled = true;
+	}
+	mutex_unlock(&journaler->commit_lock);
+
+}
+EXPORT_SYMBOL(ceph_journaler_client_committed);
+
+// client need to allocate an uniq tag for itself, then every
+// journaler entry from this client will be tagged as his tag.
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler,
+				uint64_t tag_class, void *buf,
+				uint32_t buf_len,
+				struct ceph_journaler_tag *tag)
+{
+	uint64_t tag_tid;
+	int ret;
+
+retry:
+	ret = ceph_cls_journaler_get_next_tag_tid(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					&tag_tid);
+	if (ret)
+		goto out;
+
+	ret = ceph_cls_journaler_tag_create(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					tag_tid, tag_class,
+					buf, buf_len);
+	if (ret < 0) {
+		if (ret == -ESTALE) {
+			goto retry;
+		} else {
+			goto out;
+		}
+	}
+
+	ret = ceph_cls_journaler_get_tag(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					tag_tid, tag);
+	if (ret)
+		goto out;
+
+out:
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_allocate_tag);
-- 
1.8.3.1





[Index of Archives]     [CEPH Users]     [Ceph Large]     [Ceph Dev]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux