[PATCH v2 06/16] libceph: introduce generic journaling

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



This commit introduces generic journaling. This is an initial
commit, which only includes some generic functions, such as
ceph_journaler_create|destroy() and ceph_journaler_open|close().

Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx>
---
 include/linux/ceph/journaler.h | 180 ++++++++++++
 net/ceph/Makefile              |   3 +-
 net/ceph/journaler.c           | 610 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 792 insertions(+), 1 deletion(-)
 create mode 100644 include/linux/ceph/journaler.h
 create mode 100644 net/ceph/journaler.c

diff --git a/include/linux/ceph/journaler.h b/include/linux/ceph/journaler.h
new file mode 100644
index 0000000..730d0ed
--- /dev/null
+++ b/include/linux/ceph/journaler.h
@@ -0,0 +1,180 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_JOURNAL_H
+#define _FS_CEPH_JOURNAL_H
+
+#include <linux/rbtree.h>
+#include <linux/ceph/cls_journaler_client.h>
+
+struct ceph_osd_client;
+
+#define JOURNAL_HEADER_PREFIX	"journal."
+#define JOURNAL_OBJECT_PREFIX	"journal_data."
+
+#define LOCAL_MIRROR_UUID	""
+
+/*
+ * Combined size (presumably in bytes) of the fixed fields listed here:
+ * preamble, version, entry tid, tag id.
+ */
+static const uint32_t HEADER_FIXED_SIZE = 25;
+/* Combined size of the remaining fixed fields: data size, crc. */
+static const uint32_t REMAINDER_FIXED_SIZE = 8;
+/*
+ * NOTE(review): presumably the marker stored in the preamble field of an
+ * encoded entry - confirm against the encode/decode code.
+ */
+static const uint64_t PREAMBLE = 0x3141592653589793;
+
+struct ceph_journaler_ctx;
+/* Callback type stored in ceph_journaler_ctx.callback; it receives the ctx. */
+typedef void (*ceph_journalecallback_t)(struct ceph_journaler_ctx *);
+
+/*
+ * Per-request context passed to ceph_journaler_append().  Instances are
+ * handed out by ceph_journaler_ctx_alloc() and given back with
+ * ceph_journaler_ctx_put().
+ */
+struct ceph_journaler_ctx {
+	/* NOTE(review): presumably links into ceph_journaler.ctx_list - confirm */
+	struct list_head	node;
+	/* payload as a bio iterator plus its length (bio_len) */
+	struct ceph_bio_iter	*bio_iter;
+	size_t bio_len;
+
+	/*
+	 * Page/offset/length triples.  NOTE(review): by their names these
+	 * describe data placed before and after the bio payload - confirm in
+	 * the append path.
+	 */
+	struct page *prefix_page;
+	unsigned int prefix_offset;
+	unsigned int prefix_len;
+
+	struct page *suffix_page;
+	unsigned int suffix_offset;
+	unsigned int suffix_len;
+
+	int result;	/* NOTE(review): presumably the outcome reported to callback */
+	void *priv;	/* opaque pointer for the owner of the ctx */
+	ceph_journalecallback_t callback;
+};
+
+/*
+ * State of one journal entry being appended.  One is embedded in each
+ * journaler_append_ctx (net/ceph/journaler.c), and
+ * ceph_journaler.prev_future points at one.
+ */
+struct ceph_journaler_future {
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	uint64_t commit_tid;
+
+	spinlock_t lock;
+	/* NOTE(review): presumably "lock" guards these two flags - confirm */
+	bool safe;
+	bool consistent;
+
+	struct ceph_journaler_ctx *ctx;
+	/* NOTE(review): looks like the earlier future this one waits on - confirm */
+	struct ceph_journaler_future *wait;
+};
+
+/*
+ * One journal entry.  This is the type passed to the handle_entry callback
+ * of struct ceph_journaler, and one is embedded in each journaler_append_ctx.
+ */
+struct ceph_journaler_entry {
+	/* NOTE(review): presumably links into object_replayer.entry_list - confirm */
+	struct list_head node;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	ssize_t data_len;
+	/* payload, either as a flat buffer or as a bio iterator */
+	char *data;
+	struct ceph_bio_iter *bio_iter;
+};
+
+/*
+ * A (tag_tid, entry_tid) pair kept on ceph_journaler.entry_tids.  Elements
+ * still on that list are kfree()d by ceph_journaler_close().
+ */
+struct entry_tid {
+	struct list_head	node;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+};
+
+/*
+ * Node of the ceph_journaler.commit_entries rb-tree, keyed by commit_tid
+ * (see DEFINE_RB_INSDEL_FUNCS in net/ceph/journaler.c).  Allocated from
+ * journaler_commit_entry_cache.
+ */
+struct commit_entry {
+	struct rb_node	r_node;
+	uint64_t commit_tid;	/* rb-tree key */
+	uint64_t object_num;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	bool committed;
+};
+
+/*
+ * Per-splay-offset recording state.  ceph_journaler_open() allocates
+ * splay_width of these (ceph_journaler.obj_recorders) and sets splay_offset
+ * to the array index.
+ */
+struct object_recorder {
+	spinlock_t lock;
+	bool overflowed;
+	uint64_t splay_offset;
+	uint64_t inflight_append;
+
+	/* both lists must be empty by the time ceph_journaler_close() runs */
+	struct list_head append_list;
+	struct list_head overflow_list;
+};
+
+/*
+ * Per-splay-offset replay state.  ceph_journaler_open() allocates
+ * splay_width of these (ceph_journaler.obj_replayers), sets object_num to
+ * the array index and pos to NULL.
+ */
+struct object_replayer {
+	spinlock_t lock;
+	uint64_t object_num;
+	struct ceph_journaler_object_pos *pos;
+	/* must be empty by the time ceph_journaler_close() runs */
+	struct list_head entry_list;
+};
+
+/*
+ * Handle for one journal.  Created by ceph_journaler_create(), populated
+ * from the cluster by ceph_journaler_open(), reset by ceph_journaler_close()
+ * and freed by ceph_journaler_destroy().
+ */
+struct ceph_journaler {
+	struct ceph_osd_client		*osdc;
+	/* header object: JOURNAL_HEADER_PREFIX followed by the journal id */
+	struct ceph_object_id		header_oid;
+	/* copy of the locator passed to ceph_journaler_create() */
+	struct ceph_object_locator	header_oloc;
+	/* set up in ceph_journaler_open() from the pool_id metadata */
+	struct ceph_object_locator	data_oloc;
+	/* built by object_oid_prefix(); owned by the journaler */
+	char				*object_oid_prefix;
+	/* duplicate of the client id passed to ceph_journaler_create() */
+	char				*client_id;
+
+	/* advancing and overflowed are cleared by refresh() when active_set advances */
+	bool				advancing;
+	bool				overflowed;
+	bool				commit_scheduled;
+	/* order, splay_width, pool_id: immutable metadata read in open() */
+	uint8_t				order;
+	uint8_t				splay_width;
+	int64_t				pool_id;
+	uint64_t			splay_offset;
+	uint64_t			active_tag_tid;
+	uint64_t			prune_tag_tid;
+	uint64_t			commit_tid;
+	/* minimum_set, active_set: mutable metadata updated by refresh() */
+	uint64_t			minimum_set;
+	uint64_t			active_set;
+
+	struct ceph_journaler_future	*prev_future;
+	/* entry of "clients" whose id matches client_id, set by refresh() */
+	struct ceph_journaler_client	*client;
+	/* arrays of splay_width elements, allocated in open() */
+	struct object_recorder		*obj_recorders;
+	struct object_replayer		*obj_replayers;
+
+	/* each array backs the nodes initially linked on the list below it */
+	struct ceph_journaler_object_pos *obj_pos_pending_array;
+	struct list_head		obj_pos_pending;
+	struct ceph_journaler_object_pos *obj_pos_committing_array;
+	struct list_head		obj_pos_committing;
+
+	/* taken around updates of the metadata fields in open/refresh/close */
+	struct mutex			meta_lock;
+	struct mutex			commit_lock;
+	spinlock_t		entry_tid_lock;
+	spinlock_t		finish_lock;
+	struct list_head		ctx_list;
+	/* refresh() fetches into clients_cache and then swaps the two lists */
+	struct list_head		clients;
+	struct list_head		clients_cache;
+	/* list of struct entry_tid */
+	struct list_head		entry_tids;
+	/* rb-tree of struct commit_entry, keyed by commit_tid */
+	struct rb_root			commit_entries;
+
+	/* ordered workqueue; refresh() queues flush_work on it */
+	struct workqueue_struct		*task_wq;
+	/* workqueue handed to ceph_osdc_watch() for the watch callbacks */
+	struct workqueue_struct		*notify_wq;
+	struct work_struct		flush_work;
+	struct delayed_work		commit_work;
+	struct work_struct		overflow_work;
+	struct work_struct		finish_work;
+	struct work_struct		notify_update_work;
+
+	void *fetch_buf;
+	/* callback taking entry_handler, an entry and a commit tid */
+	int (*handle_entry)(void *entry_handler,
+			    struct ceph_journaler_entry *entry,
+			    uint64_t commit_tid);
+	void *entry_handler;
+	/* linger request returned by ceph_osdc_watch(); NULL when not watching */
+	struct ceph_osd_linger_request *watch_handle;
+};
+
+/*
+ * Generic functions.
+ *
+ * ceph_journaler_create() returns a new handle or NULL on failure;
+ * ceph_journaler_destroy() frees it.  ceph_journaler_open() returns 0 or a
+ * negative error code and is undone by ceph_journaler_close().
+ */
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     struct ceph_object_locator *oloc,
+					     const char *journal_id,
+					     const char *client_id);
+void ceph_journaler_destroy(struct ceph_journaler *journal);
+
+int ceph_journaler_open(struct ceph_journaler *journal);
+void ceph_journaler_close(struct ceph_journaler *journal);
+
+/*
+ * Look up @client_id in the journaler's current client list.  Returns 0 and
+ * stores the match in *@client_result, or -ENOENT if there is none.
+ */
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+				     struct ceph_journaler_client **client_result);
+
+/* Replaying (not implemented in this patch). */
+int ceph_journaler_start_replay(struct ceph_journaler *journaler);
+
+/* Recording (not implemented in this patch). */
+struct ceph_journaler_ctx *ceph_journaler_ctx_alloc(void);
+void ceph_journaler_ctx_put(struct ceph_journaler_ctx *journaler_ctx);
+int ceph_journaler_append(struct ceph_journaler *journaler,
+			  uint64_t tag_tid, uint64_t *commit_tid,
+			  struct ceph_journaler_ctx *ctx);
+void ceph_journaler_client_committed(struct ceph_journaler *journaler,
+				     uint64_t commit_tid);
+int ceph_journaler_allocate_tag(struct ceph_journaler *journaler,
+				uint64_t tag_class, void *buf,
+				uint32_t buf_len,
+				struct ceph_journaler_tag *tag);
+#endif
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index db09defe..a965d64 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -14,4 +14,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	crypto.o armor.o \
 	auth_x.o \
 	ceph_fs.o ceph_strings.o ceph_hash.o \
-	pagevec.o snapshot.o string_table.o
+	pagevec.o snapshot.o string_table.o \
+	journaler.o cls_journaler_client.o
diff --git a/net/ceph/journaler.c b/net/ceph/journaler.c
new file mode 100644
index 0000000..d8721a9
--- /dev/null
+++ b/net/ceph/journaler.c
@@ -0,0 +1,610 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/ceph_features.h>
+#include <linux/ceph/cls_journaler_client.h>
+#include <linux/ceph/journaler.h>
+#include <linux/ceph/libceph.h>
+#include <linux/ceph/osd_client.h>
+
+#include <linux/crc32c.h>
+#include <linux/math64.h>
+#include <linux/module.h>
+
+#define JOURNALER_COMMIT_INTERVAL		msecs_to_jiffies(5000)
+
+/*
+ * Build the common name prefix of the journal's data objects:
+ * "<JOURNAL_OBJECT_PREFIX><pool_id>.<journal_id>.".
+ *
+ * @pool_id is 64 bits wide, like ceph_journaler.pool_id; the former 'int'
+ * parameter truncated larger pool ids.
+ *
+ * Returns a kmalloc'ed string that the caller must kfree(), or NULL if the
+ * allocation fails.
+ */
+static char *object_oid_prefix(int64_t pool_id, const char *journal_id)
+{
+	/* kasprintf() sizes, allocates and formats in a single step. */
+	return kasprintf(GFP_KERNEL, "%s%lld.%s.", JOURNAL_OBJECT_PREFIX,
+			 (long long)pool_id, journal_id);
+}
+
+/*
+ * Work handlers attached to the journaler's work items in
+ * ceph_journaler_create().  Their bodies are not part of this patch.
+ */
+static void journaler_flush(struct work_struct *work);
+static void journaler_finish(struct work_struct *work);
+static void journaler_client_commit(struct work_struct *work);
+static void journaler_notify_update(struct work_struct *work);
+static void journaler_overflow(struct work_struct *work);
+
+/*
+ * Internal wrapper around one append request.  It embeds the future, the
+ * entry and the caller-visible ceph_journaler_ctx, is reference counted
+ * through kref, and is allocated from journaler_append_ctx_cache.
+ */
+struct journaler_append_ctx {
+	struct list_head node;
+	struct ceph_journaler *journaler;
+
+	uint64_t splay_offset;
+	uint64_t object_num;
+	struct page *req_page;
+
+	struct ceph_journaler_future future;
+	struct ceph_journaler_entry entry;
+	struct ceph_journaler_ctx journaler_ctx;
+
+	struct kref	kref;
+};
+
+/*
+ * Slab caches for struct commit_entry and struct journaler_append_ctx.
+ * NOTE(review): they are file-scope, yet every ceph_journaler_create() call
+ * re-creates them and every ceph_journaler_destroy() call destroys them, so
+ * two live journalers would interfere with each other.
+ */
+static struct kmem_cache	*journaler_commit_entry_cache;
+static struct kmem_cache	*journaler_append_ctx_cache;
+
+/*
+ * ceph_journaler_create - allocate and set up a journaler handle
+ * @osdc:       OSD client stored in the handle
+ * @oloc:       locator of the journal header object (copied)
+ * @journal_id: appended to JOURNAL_HEADER_PREFIX to name the header object
+ *              and used in the data object name prefix
+ * @client_id:  id of this journal client (duplicated)
+ *
+ * Nothing is read from the cluster here; ceph_journaler_open() does that.
+ * Returns the new handle, or NULL on any failure.  Release it with
+ * ceph_journaler_destroy().
+ *
+ * NOTE(review): the two kmem caches are file-scope variables that are
+ * assigned unconditionally below, so creating a second journaler replaces
+ * the caches of the first one.
+ */
+struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc,
+					     struct ceph_object_locator *oloc,
+					     const char *journal_id,
+					     const char *client_id)
+{
+	struct ceph_journaler *j;
+	int err;
+
+	/*
+	 * kzalloc() returns zeroed memory, so every flag, counter and
+	 * pointer that starts out as 0/false/NULL needs no assignment.
+	 */
+	j = kzalloc(sizeof(*j), GFP_KERNEL);
+	if (!j)
+		return NULL;
+
+	j->osdc = osdc;
+	ceph_oid_init(&j->header_oid);
+	err = ceph_oid_aprintf(&j->header_oid, GFP_NOIO, "%s%s",
+			       JOURNAL_HEADER_PREFIX, journal_id);
+	if (err) {
+		pr_err("aprintf error : %d", err);
+		goto err_free_journaler;
+	}
+
+	ceph_oloc_init(&j->header_oloc);
+	ceph_oloc_copy(&j->header_oloc, oloc);
+	ceph_oloc_init(&j->data_oloc);
+
+	j->object_oid_prefix = object_oid_prefix(j->header_oloc.pool,
+						 journal_id);
+	if (!j->object_oid_prefix)
+		goto err_destroy_data_oloc;
+
+	j->client_id = kstrdup(client_id, GFP_NOIO);
+	if (!j->client_id)
+		goto err_free_object_oid_prefix;
+
+	/* Initial values that differ from the zero fill. */
+	j->pool_id = -1;
+	j->active_tag_tid = UINT_MAX;
+	j->prune_tag_tid = UINT_MAX;
+
+	mutex_init(&j->meta_lock);
+	mutex_init(&j->commit_lock);
+	spin_lock_init(&j->entry_tid_lock);
+	spin_lock_init(&j->finish_lock);
+
+	INIT_LIST_HEAD(&j->ctx_list);
+	INIT_LIST_HEAD(&j->clients);
+	INIT_LIST_HEAD(&j->clients_cache);
+	INIT_LIST_HEAD(&j->entry_tids);
+	INIT_LIST_HEAD(&j->obj_pos_pending);
+	INIT_LIST_HEAD(&j->obj_pos_committing);
+	j->commit_entries = RB_ROOT;
+
+	/* Slab caches for commit entries and append contexts. */
+	journaler_commit_entry_cache = KMEM_CACHE(commit_entry, 0);
+	if (!journaler_commit_entry_cache)
+		goto err_free_client_id;
+
+	journaler_append_ctx_cache = KMEM_CACHE(journaler_append_ctx, 0);
+	if (!journaler_append_ctx_cache)
+		goto err_destroy_commit_entry_cache;
+
+	/* Ordered queue for journal work, separate queue for watch callbacks. */
+	j->task_wq = alloc_ordered_workqueue("journaler-tasks", WQ_MEM_RECLAIM);
+	if (!j->task_wq)
+		goto err_destroy_append_ctx_cache;
+
+	j->notify_wq = create_singlethread_workqueue("journaler-notify");
+	if (!j->notify_wq)
+		goto err_destroy_task_wq;
+
+	INIT_WORK(&j->flush_work, journaler_flush);
+	INIT_WORK(&j->finish_work, journaler_finish);
+	INIT_DELAYED_WORK(&j->commit_work, journaler_client_commit);
+	INIT_WORK(&j->notify_update_work, journaler_notify_update);
+	INIT_WORK(&j->overflow_work, journaler_overflow);
+
+	return j;
+
+	/* Unwind in the reverse order of acquisition. */
+err_destroy_task_wq:
+	destroy_workqueue(j->task_wq);
+err_destroy_append_ctx_cache:
+	kmem_cache_destroy(journaler_append_ctx_cache);
+err_destroy_commit_entry_cache:
+	kmem_cache_destroy(journaler_commit_entry_cache);
+err_free_client_id:
+	kfree(j->client_id);
+err_free_object_oid_prefix:
+	kfree(j->object_oid_prefix);
+err_destroy_data_oloc:
+	ceph_oloc_destroy(&j->data_oloc);
+	ceph_oloc_destroy(&j->header_oloc);
+	ceph_oid_destroy(&j->header_oid);
+err_free_journaler:
+	kfree(j);
+	return NULL;
+}
+EXPORT_SYMBOL(ceph_journaler_create);
+
+/*
+ * ceph_journaler_destroy - free a handle made by ceph_journaler_create()
+ * @journaler: handle to free; NULL is accepted and ignored, so the result of
+ *             a failed ceph_journaler_create() can be passed in directly
+ *
+ * Tears down both workqueues, the kmem caches, the id strings and the
+ * object id/locators, then frees the handle itself.
+ */
+void ceph_journaler_destroy(struct ceph_journaler *journaler)
+{
+	if (!journaler)
+		return;
+
+	destroy_workqueue(journaler->notify_wq);
+	destroy_workqueue(journaler->task_wq);
+
+	kmem_cache_destroy(journaler_append_ctx_cache);
+	kmem_cache_destroy(journaler_commit_entry_cache);
+	kfree(journaler->client_id);
+	kfree(journaler->object_oid_prefix);
+	ceph_oloc_destroy(&journaler->data_oloc);
+	ceph_oloc_destroy(&journaler->header_oloc);
+	ceph_oid_destroy(&journaler->header_oid);
+	kfree(journaler);
+}
+EXPORT_SYMBOL(ceph_journaler_destroy);
+
+/*
+ * Helpers used by refresh() to trim object sets; their bodies are not part
+ * of this patch.  Both return a negative value on failure.
+ */
+static int remove_set(struct ceph_journaler *journaler, uint64_t object_set);
+static int set_minimum_set(struct ceph_journaler* journaler,
+			   uint64_t minimum_set);
+
+/*
+ * refresh - re-read the journal's mutable metadata and client list
+ * @journaler: journaler to update
+ * @init:      true for the initial call from ceph_journaler_open(): the
+ *             fetched active set is then simply stored instead of being
+ *             compared against the current one
+ *
+ * Fetches minimum_set/active_set and the client list from the header
+ * object, swaps the fresh client list in, queues a flush when the active
+ * set advanced, and removes object sets that lie below the lowest set
+ * referenced by any client's object positions.
+ *
+ * Returns 0 on success or a negative error code.
+ */
+static int refresh(struct ceph_journaler *journaler, bool init)
+{
+	struct ceph_journaler_client *client;
+	uint64_t minimum_commit_set;
+	uint64_t minimum_set;
+	uint64_t active_set;
+	uint64_t trim_set;
+	bool need_advance = false;
+	LIST_HEAD(tmp_clients);	/* LIST_HEAD() already initialises the head */
+	int ret;
+
+	ret = ceph_cls_journaler_get_mutable_metas(journaler->osdc,
+			&journaler->header_oid, &journaler->header_oloc,
+			&minimum_set, &active_set);
+	if (ret)
+		return ret;
+
+	/* The fresh list lands in ->clients_cache and is swapped in below. */
+	ret = ceph_cls_journaler_client_list(journaler->osdc, &journaler->header_oid,
+		&journaler->header_oloc, &journaler->clients_cache, journaler->splay_width);
+	if (ret)
+		return ret;
+
+	mutex_lock(&journaler->meta_lock);
+	if (init)
+		journaler->active_set = active_set;
+	else
+		need_advance = active_set > journaler->active_set;
+	journaler->minimum_set = minimum_set;
+
+	/* Swap ->clients with ->clients_cache. */
+	list_splice_tail_init(&journaler->clients, &tmp_clients);
+	list_splice_tail_init(&journaler->clients_cache, &journaler->clients);
+	list_splice_tail_init(&tmp_clients, &journaler->clients_cache);
+
+	/*
+	 * Find the lowest object set still referenced by a client position,
+	 * starting from the active set.
+	 *
+	 * NOTE(review): ->client is only updated when a matching id is found;
+	 * if our client vanished from the list it keeps pointing at an entry
+	 * that now sits on ->clients_cache - confirm this is intended.
+	 */
+	minimum_commit_set = journaler->active_set;
+	list_for_each_entry(client, &journaler->clients, node) {
+		struct ceph_journaler_object_pos *pos;
+
+		list_for_each_entry(pos, &client->object_positions, node) {
+			/*
+			 * div_u64(): a plain '/' on a 64-bit value does not
+			 * link on 32-bit kernels.
+			 */
+			uint64_t object_set = div_u64(pos->object_num,
+						      journaler->splay_width);
+
+			if (object_set < minimum_commit_set)
+				minimum_commit_set = object_set;
+		}
+
+		if (!strcmp(client->id, journaler->client_id))
+			journaler->client = client;
+	}
+	mutex_unlock(&journaler->meta_lock);
+
+	if (need_advance) {
+		mutex_lock(&journaler->meta_lock);
+		journaler->active_set = active_set;
+		journaler->overflowed = false;
+		journaler->advancing = false;
+		mutex_unlock(&journaler->meta_lock);
+
+		queue_work(journaler->task_wq, &journaler->flush_work);
+	}
+
+	/* Nothing to trim unless the minimum commit set moved past minimum_set. */
+	if (minimum_commit_set <= minimum_set)
+		return 0;
+
+	for (trim_set = minimum_set; trim_set < minimum_commit_set; trim_set++) {
+		ret = remove_set(journaler, trim_set);
+		/* A set that is already gone (-ENOENT) is not an error. */
+		if (ret < 0 && ret != -ENOENT) {
+			pr_err("failed to trim object_set: %llu", trim_set);
+			return ret;
+		}
+	}
+
+	ret = set_minimum_set(journaler, minimum_commit_set);
+	if (ret < 0) {
+		pr_err("failed to set minimum set to %llu", minimum_commit_set);
+		return ret;
+	}
+
+	return 0;
+}
+
+/*
+ * Watch callback for the journal header object: refresh the cached metadata
+ * and acknowledge the notification.  The notify is acknowledged even when
+ * the refresh failed.
+ */
+static void journaler_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			 u64 notifier_id, void *data, size_t data_len)
+{
+	struct ceph_journaler *journaler = arg;
+	int err;
+
+	err = refresh(journaler, false);
+	if (err < 0)
+		pr_err("%s: failed to refresh journaler: %d", __func__, err);
+
+	err = ceph_osdc_notify_ack(journaler->osdc, &journaler->header_oid,
+				   &journaler->header_oloc, notify_id,
+				   cookie, NULL, 0);
+	if (err)
+		pr_err("acknowledge_notify failed: %d", err);
+}
+
+static void journaler_watch_errcb(void *arg, u64 cookie, int err)
+{
+	// TODO re-watch in watch error.
+	pr_err("journaler watch error: %d", err);
+}
+
+/*
+ * Register a watch on the journal header object, with callbacks run on
+ * ->notify_wq.  On success the linger request is kept in ->watch_handle and
+ * 0 is returned; otherwise the error carried by the returned pointer.
+ */
+static int journaler_watch(struct ceph_journaler *journaler)
+{
+	struct ceph_osd_linger_request *lreq;
+
+	lreq = ceph_osdc_watch(journaler->osdc, &journaler->header_oid,
+			       &journaler->header_oloc, journaler->notify_wq,
+			       journaler_watch_cb, journaler_watch_errcb,
+			       journaler);
+	if (!IS_ERR(lreq)) {
+		journaler->watch_handle = lreq;
+		return 0;
+	}
+
+	return PTR_ERR(lreq);
+}
+
+/*
+ * Drop the watch registered by journaler_watch().  A failure is logged only;
+ * ->watch_handle is cleared either way.
+ */
+static void journaler_unwatch(struct ceph_journaler *journaler)
+{
+	int err = ceph_osdc_unwatch(journaler->osdc, journaler->watch_handle);
+
+	if (err)
+		pr_err("%s: failed to unwatch: %d", __func__, err);
+
+	journaler->watch_handle = NULL;
+}
+
+/*
+ * Copy the position values (object_num, tag_tid, entry_tid) from @src_pos
+ * to @dst_pos.  The list linkage of @dst_pos is left untouched.
+ */
+static void copy_object_pos(struct ceph_journaler_object_pos *src_pos,
+			   struct ceph_journaler_object_pos *dst_pos)
+{
+	dst_pos->entry_tid = src_pos->entry_tid;
+	dst_pos->tag_tid = src_pos->tag_tid;
+	dst_pos->object_num = src_pos->object_num;
+}
+
+/*
+ * Copy positions element by element from @src_list into the entries that
+ * already exist on @dst_list.  Stops as soon as either list is exhausted;
+ * no entries are added or removed.
+ */
+static void copy_pos_list(struct list_head *src_list, struct list_head *dst_list)
+{
+	struct ceph_journaler_object_pos *from, *to;
+
+	for (from = list_first_entry(src_list, struct ceph_journaler_object_pos, node),
+	     to = list_first_entry(dst_list, struct ceph_journaler_object_pos, node);
+	     &from->node != src_list && &to->node != dst_list;
+	     from = list_next_entry(from, node), to = list_next_entry(to, node))
+		copy_object_pos(from, to);
+}
+
+/*
+ * ceph_journaler_open - load journal metadata and start watching the header
+ * @journaler: handle from ceph_journaler_create()
+ *
+ * Reads the immutable metadata (order, splay_width, pool_id), allocates one
+ * recorder, one replayer and one pending/committing position slot per splay
+ * offset, runs the initial refresh() and registers a watch on the header
+ * object.
+ *
+ * Returns 0 on success or a negative error code.  On failure everything
+ * set up here is released again and the related pointers and lists are
+ * reset, so no stale references into freed memory are left behind.
+ */
+int ceph_journaler_open(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_client *client, *next_client;
+	struct ceph_journaler_object_pos *pending, *committing;
+	struct object_recorder *recorders;
+	struct object_replayer *replayers;
+	uint8_t order, splay_width;
+	int64_t pool_id;
+	int i, ret;
+
+	ret = ceph_cls_journaler_get_immutable_metas(journaler->osdc,
+					&journaler->header_oid,
+					&journaler->header_oloc,
+					&order,
+					&splay_width,
+					&pool_id);
+	if (ret) {
+		pr_err("failed to get immutable metas.");
+		return ret;
+	}
+
+	/* refresh() divides by splay_width, so zero has to be rejected. */
+	if (!splay_width) {
+		pr_err("invalid splay_width: 0");
+		return -EINVAL;
+	}
+
+	/* kcalloc() checks the count * size multiplication for overflow. */
+	recorders = kcalloc(splay_width, sizeof(*recorders), GFP_KERNEL);
+	replayers = kcalloc(splay_width, sizeof(*replayers), GFP_KERNEL);
+	pending = kcalloc(splay_width, sizeof(*pending), GFP_KERNEL);
+	committing = kcalloc(splay_width, sizeof(*committing), GFP_KERNEL);
+	if (!recorders || !replayers || !pending || !committing) {
+		/* Must be set explicitly: ret is still 0 at this point. */
+		ret = -ENOMEM;
+		goto err_free_arrays;
+	}
+
+	mutex_lock(&journaler->meta_lock);
+	/* Store the immutable metadata. */
+	journaler->order = order;
+	journaler->splay_width = splay_width;
+	journaler->pool_id = pool_id;
+
+	/* A pool_id of -1 means the data objects use the header's locator. */
+	if (journaler->pool_id == -1) {
+		ceph_oloc_copy(&journaler->data_oloc, &journaler->header_oloc);
+		journaler->pool_id = journaler->data_oloc.pool;
+	} else {
+		journaler->data_oloc.pool = journaler->pool_id;
+	}
+
+	journaler->obj_recorders = recorders;
+	journaler->obj_replayers = replayers;
+	journaler->obj_pos_pending_array = pending;
+	journaler->obj_pos_committing_array = committing;
+
+	for (i = 0; i < splay_width; i++) {
+		struct object_recorder *obj_recorder = &recorders[i];
+		struct object_replayer *obj_replayer = &replayers[i];
+		struct ceph_journaler_object_pos *pos_pending = &pending[i];
+		struct ceph_journaler_object_pos *pos_committing = &committing[i];
+
+		spin_lock_init(&obj_recorder->lock);
+		obj_recorder->overflowed = false;
+		obj_recorder->splay_offset = i;
+		obj_recorder->inflight_append = 0;
+		INIT_LIST_HEAD(&obj_recorder->append_list);
+		INIT_LIST_HEAD(&obj_recorder->overflow_list);
+
+		spin_lock_init(&obj_replayer->lock);
+		obj_replayer->object_num = i;
+		obj_replayer->pos = NULL;
+		INIT_LIST_HEAD(&obj_replayer->entry_list);
+
+		pos_pending->in_using = false;
+		INIT_LIST_HEAD(&pos_pending->node);
+		list_add_tail(&pos_pending->node, &journaler->obj_pos_pending);
+
+		pos_committing->in_using = false;
+		INIT_LIST_HEAD(&pos_committing->node);
+		list_add_tail(&pos_committing->node, &journaler->obj_pos_committing);
+	}
+	mutex_unlock(&journaler->meta_lock);
+
+	ret = refresh(journaler, true);
+	if (ret)
+		goto err_reset;
+
+	/* Seed the pending positions from our client's registered positions. */
+	mutex_lock(&journaler->meta_lock);
+	if (journaler->client)
+		copy_pos_list(&journaler->client->object_positions,
+			      &journaler->obj_pos_pending);
+	mutex_unlock(&journaler->meta_lock);
+
+	ret = journaler_watch(journaler);
+	if (ret) {
+		pr_err("journaler_watch error: %d", ret);
+		goto err_reset;
+	}
+	return 0;
+
+err_reset:
+	/*
+	 * refresh() may have populated the client lists even when it failed
+	 * later on, so they are emptied on every path that got this far.
+	 */
+	journaler->client = NULL;
+	list_for_each_entry_safe(client, next_client,
+				 &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+	list_for_each_entry_safe(client, next_client,
+				 &journaler->clients_cache, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	/* The list nodes live in the arrays freed below: unhook them all. */
+	INIT_LIST_HEAD(&journaler->obj_pos_pending);
+	INIT_LIST_HEAD(&journaler->obj_pos_committing);
+	journaler->obj_pos_committing_array = NULL;
+	journaler->obj_pos_pending_array = NULL;
+	journaler->obj_replayers = NULL;
+	journaler->obj_recorders = NULL;
+err_free_arrays:
+	kfree(committing);
+	kfree(pending);
+	kfree(replayers);
+	kfree(recorders);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_open);
+
+/*
+ * Generates the rb-tree helpers for struct commit_entry keyed by commit_tid,
+ * including erase_commit_entry() used in ceph_journaler_close().
+ */
+DEFINE_RB_INSDEL_FUNCS(commit_entry, struct commit_entry, commit_tid, r_node)
+
+/*
+ * ceph_journaler_close - undo ceph_journaler_open()
+ * @journaler: an opened journaler
+ *
+ * Stops the header watch, waits for queued work, releases the cached
+ * clients, commit entries, entry tids and the per-splay arrays, and puts
+ * the metadata fields back to the values ceph_journaler_create() gave them.
+ * The handle itself stays allocated; free it with ceph_journaler_destroy().
+ *
+ * All recorder and replayer lists must be empty by now (BUG_ON otherwise).
+ */
+void ceph_journaler_close(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_client *client = NULL, *next = NULL;
+	struct commit_entry *commit_entry = NULL;
+	struct entry_tid *entry_tid = NULL, *entry_tid_next = NULL;
+	struct ceph_journaler_object_pos *pos = NULL, *next_pos = NULL;
+	struct rb_node *n;
+	int i = 0;
+
+	/* Stop watching, then let already queued notify callbacks finish. */
+	journaler_unwatch(journaler);
+	flush_workqueue(journaler->notify_wq);
+
+	flush_delayed_work(&journaler->commit_work);
+	drain_workqueue(journaler->task_wq);
+
+	/* Only unlink the positions here; their backing arrays are freed below. */
+	list_for_each_entry_safe(pos, next_pos,
+				 &journaler->obj_pos_pending, node) {
+		list_del(&pos->node);
+	}
+
+	list_for_each_entry_safe(pos, next_pos,
+				 &journaler->obj_pos_committing, node) {
+		list_del(&pos->node);
+	}
+
+	journaler->client = NULL;
+	list_for_each_entry_safe(client, next, &journaler->clients, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+	list_for_each_entry_safe(client, next, &journaler->clients_cache, node) {
+		list_del(&client->node);
+		destroy_client(client);
+	}
+
+	for (n = rb_first(&journaler->commit_entries); n;) {
+		commit_entry = rb_entry(n, struct commit_entry, r_node);
+
+		/* Advance before erasing so the iterator stays valid. */
+		n = rb_next(n);
+		erase_commit_entry(&journaler->commit_entries, commit_entry);
+		kmem_cache_free(journaler_commit_entry_cache, commit_entry);
+	}
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		struct object_recorder *obj_recorder = &journaler->obj_recorders[i];
+		struct object_replayer *obj_replayer = &journaler->obj_replayers[i];
+
+		spin_lock(&obj_recorder->lock);
+		BUG_ON(!list_empty(&obj_recorder->append_list) ||
+			!list_empty(&obj_recorder->overflow_list));
+		spin_unlock(&obj_recorder->lock);
+
+		spin_lock(&obj_replayer->lock);
+		BUG_ON(!list_empty(&obj_replayer->entry_list));
+		spin_unlock(&obj_replayer->lock);
+	}
+
+	/* Clear every pointer after freeing so none is left dangling. */
+	kfree(journaler->obj_pos_committing_array);
+	kfree(journaler->obj_pos_pending_array);
+	kfree(journaler->obj_recorders);
+	kfree(journaler->obj_replayers);
+	journaler->obj_pos_committing_array = NULL;
+	journaler->obj_pos_pending_array = NULL;
+	journaler->obj_recorders = NULL;
+	journaler->obj_replayers = NULL;
+
+	list_for_each_entry_safe(entry_tid, entry_tid_next,
+				 &journaler->entry_tids, node) {
+		list_del(&entry_tid->node);
+		kfree(entry_tid);
+	}
+
+	WARN_ON(!list_empty(&journaler->ctx_list));
+	WARN_ON(!list_empty(&journaler->clients));
+	WARN_ON(!list_empty(&journaler->clients_cache));
+	WARN_ON(!list_empty(&journaler->entry_tids));
+	WARN_ON(!list_empty(&journaler->obj_pos_pending));
+	WARN_ON(!list_empty(&journaler->obj_pos_committing));
+	WARN_ON(rb_first(&journaler->commit_entries) != NULL);
+
+	mutex_lock(&journaler->meta_lock);
+	/*
+	 * ceph_journaler_open() may have copied header_oloc into data_oloc;
+	 * release whatever that copy holds before re-initialising it.
+	 */
+	ceph_oloc_destroy(&journaler->data_oloc);
+	ceph_oloc_init(&journaler->data_oloc);
+	journaler->advancing = false;
+	journaler->overflowed = false;
+	journaler->commit_scheduled = false;
+	journaler->order = 0;
+	journaler->splay_width = 0;
+	journaler->pool_id = -1;
+	journaler->splay_offset = 0;
+	journaler->active_tag_tid = UINT_MAX;
+	journaler->prune_tag_tid = UINT_MAX;
+	journaler->commit_tid = 0;
+	journaler->minimum_set = 0;
+	journaler->active_set = 0;
+	journaler->prev_future = NULL;
+	journaler->fetch_buf = NULL;
+	journaler->handle_entry = NULL;
+	journaler->entry_handler = NULL;
+	journaler->watch_handle = NULL;
+	mutex_unlock(&journaler->meta_lock);
+}
+EXPORT_SYMBOL(ceph_journaler_close);
+
+/*
+ * ceph_journaler_get_cached_client - find a client in the cached list
+ * @journaler:     journaler whose ->clients list is searched
+ * @client_id:     id to compare against each client's id
+ * @client_result: receives the first matching client; untouched on failure
+ *
+ * Returns 0 when a client was found, -ENOENT otherwise.
+ *
+ * NOTE(review): the list is walked without taking meta_lock, while
+ * refresh() rewrites it under that lock - confirm callers serialise.
+ */
+int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *client_id,
+				     struct ceph_journaler_client **client_result)
+{
+	struct ceph_journaler_client *cur;
+
+	list_for_each_entry(cur, &journaler->clients, node) {
+		if (strcmp(cur->id, client_id))
+			continue;
+
+		*client_result = cur;
+		return 0;
+	}
+
+	return -ENOENT;
+}
+EXPORT_SYMBOL(ceph_journaler_get_cached_client);
-- 
1.8.3.1





[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux