This commit introduces rbd_journal into rbd_device. With the journaling feature enabled, we open the journal after the exclusive lock is acquired and close it before the exclusive lock is released. Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx> --- drivers/block/rbd.c | 237 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index cc0642c..bd90c17 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -28,16 +28,19 @@ */ +#include <linux/crc32c.h> #include <linux/ceph/libceph.h> #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> #include <linux/ceph/cls_lock_client.h> #include <linux/ceph/striper.h> #include <linux/ceph/decode.h> +#include <linux/ceph/journaler.h> #include <linux/parser.h> #include <linux/bsearch.h> #include <linux/kernel.h> +#include <linux/bio.h> #include <linux/device.h> #include <linux/module.h> #include <linux/blk-mq.h> @@ -378,6 +381,8 @@ struct rbd_device { atomic_t parent_ref; struct rbd_device *parent; + struct rbd_journal *journal; + /* Block layer tags. 
*/ struct blk_mq_tag_set tag_set; @@ -408,6 +413,22 @@ enum rbd_dev_flags { RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */ }; +#define LOCAL_MIRROR_UUID "" +#define LOCAL_CLIENT_ID "" + +enum rbd_journal_state { + RBD_JOURNAL_STATE_INITIALIZED, + RBD_JOURNAL_STATE_OPENED, + RBD_JOURNAL_STATE_CLOSED, +}; + +struct rbd_journal { + struct ceph_journaler *journaler; + uint64_t tag_tid; + /* state is protected by rbd_dev->lock_rwsem */ + enum rbd_journal_state state; +}; + static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ static LIST_HEAD(rbd_dev_list); /* devices */ @@ -2681,6 +2702,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) /* * lock_rwsem must be held for write */ +static int rbd_dev_open_journal(struct rbd_device *rbd_dev); static int rbd_lock(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; @@ -2697,6 +2719,15 @@ static int rbd_lock(struct rbd_device *rbd_dev) if (ret) return ret; + if (rbd_dev->header.features & RBD_FEATURE_JOURNALING) { + ret = rbd_dev_open_journal(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "open journal failed: %d", ret); + set_disk_ro(rbd_dev->disk, true); + set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags); + return -EBLACKLISTED; + } + } rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; __rbd_lock(rbd_dev, cookie); return 0; @@ -2705,6 +2736,7 @@ static int rbd_lock(struct rbd_device *rbd_dev) /* * lock_rwsem must be held for write */ +static void rbd_journal_close(struct rbd_journal *journal); static void rbd_unlock(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; @@ -2713,6 +2745,9 @@ static void rbd_unlock(struct rbd_device *rbd_dev) WARN_ON(!__rbd_is_lock_owner(rbd_dev) || rbd_dev->lock_cookie[0] == '\0'); + if (rbd_dev->journal) + rbd_journal_close(rbd_dev->journal); + ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, RBD_LOCK_NAME, rbd_dev->lock_cookie); if 
(ret && ret != -ENOENT)
@@ -5750,6 +5785,240 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 	return ret;
 }
 
+static int rbd_journal_allocate_tag(struct rbd_journal *journal);
+
+/*
+ * Open the journaler, start replay and allocate a tag.  On success the
+ * journal state becomes RBD_JOURNAL_STATE_OPENED; if replay or tag
+ * allocation fails the journaler is closed again.
+ */
+static int rbd_journal_open(struct rbd_journal *journal)
+{
+	struct ceph_journaler *journaler = journal->journaler;
+	int ret;
+
+	ret = ceph_journaler_open(journaler);
+	if (ret)
+		return ret;
+
+	ret = ceph_journaler_start_replay(journaler);
+	if (ret)
+		goto err_close_journaler;
+
+	ret = rbd_journal_allocate_tag(journal);
+	if (ret)
+		goto err_close_journaler;
+
+	journal->state = RBD_JOURNAL_STATE_OPENED;
+	return 0;
+
+err_close_journaler:
+	ceph_journaler_close(journaler);
+	return ret;
+}
+
+/*
+ * Create (on first use) and open the journal of @rbd_dev.
+ *
+ * Returns 0 when the journal is open on return (including when it was
+ * already open), -ENOMEM if the journal or journaler cannot be
+ * allocated, or the error from rbd_journal_open().  When opening fails
+ * the journal is destroyed and rbd_dev->journal is reset to NULL.
+ */
+static int rbd_dev_open_journal(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_journal *journal = rbd_dev->journal;
+	struct ceph_journaler *journaler;
+	int ret;
+
+	if (journal && journal->state == RBD_JOURNAL_STATE_OPENED)
+		return 0;
+
+	if (!journal) {
+		journal = kzalloc(sizeof(*journal), GFP_KERNEL);
+		if (!journal)
+			return -ENOMEM;
+
+		journaler = ceph_journaler_create(osdc, &rbd_dev->header_oloc,
+						  rbd_dev->spec->image_id,
+						  LOCAL_CLIENT_ID);
+		if (!journaler) {
+			/* not published in rbd_dev yet: free the local */
+			kfree(journal);
+			return -ENOMEM;
+		}
+
+		journaler->entry_handler = rbd_dev;
+		journaler->handle_entry = rbd_journal_replay;
+
+		journal->state = RBD_JOURNAL_STATE_INITIALIZED;
+		journal->journaler = journaler;
+		rbd_dev->journal = journal;
+	}
+
+	ret = rbd_journal_open(journal);
+	if (ret)
+		goto err_destroy_journal;
+
+	return 0;
+
+err_destroy_journal:
+	/*
+	 * Use journal->journaler rather than a local: when re-opening a
+	 * previously closed journal no journaler is created in this call.
+	 */
+	ceph_journaler_destroy(journal->journaler);
+	kfree(journal);
+	rbd_dev->journal = NULL;
+	return ret;
+}
+
+/*
+ * Close the journaler and reset tag_tid, unless the journal is already
+ * in RBD_JOURNAL_STATE_CLOSED.
+ */
+static void rbd_journal_close(struct rbd_journal *journal)
+{
+	if (journal->state == RBD_JOURNAL_STATE_CLOSED)
+		return;
+
+	ceph_journaler_close(journal->journaler);
+	journal->tag_tid = 0;
+	journal->state = RBD_JOURNAL_STATE_CLOSED;
+}
+
+/* Close and free the journal of @rbd_dev, if it has one. */
+static void rbd_dev_close_journal(struct rbd_device *rbd_dev)
+{
+	struct rbd_journal *journal = rbd_dev->journal;
+
+	if (!journal)
+		return;
+
+	rbd_journal_close(journal);
+	ceph_journaler_destroy(journal->journaler);
+	kfree(journal);
+	rbd_dev->journal = NULL;
+}
+
+typedef struct rbd_journal_tag_predecessor {
+	bool commit_valid;
+	uint64_t tag_tid;
+	uint64_t entry_tid;
+	uint32_t uuid_len;
+	char *mirror_uuid;
+} rbd_journal_tag_predecessor;
+
+typedef struct rbd_journal_tag_data {
+	struct rbd_journal_tag_predecessor predecessor;
+	uint32_t uuid_len;
+	char *mirror_uuid;
+} rbd_journal_tag_data;
+
+/*
+ * Size of the buffer rbd_journal_encode_tag_data() fills for @tag_data:
+ * two strings, each with a 4-byte length prefix, plus 1 byte for
+ * commit_valid and 8 bytes each for tag_tid and entry_tid.
+ */
+static uint32_t tag_data_encoding_size(struct rbd_journal_tag_data *tag_data)
+{
+	return (4 + tag_data->uuid_len + 1 + 8 + 8 + 4 +
+		tag_data->predecessor.uuid_len);
+}
+
+/* Encode mirror_uuid, commit_valid, tag_tid and entry_tid at *p. */
+static void predecessor_encode(void **p, void *end,
+			       struct rbd_journal_tag_predecessor *predecessor)
+{
+	ceph_encode_string(p, end, predecessor->mirror_uuid,
+			   predecessor->uuid_len);
+	ceph_encode_8(p, predecessor->commit_valid);
+	ceph_encode_64(p, predecessor->tag_tid);
+	ceph_encode_64(p, predecessor->entry_tid);
+}
+
+/* Encode @tag_data at *p: mirror_uuid followed by the predecessor. */
+static int rbd_journal_encode_tag_data(void **p, void *end,
+				       struct rbd_journal_tag_data *tag_data)
+{
+	ceph_encode_string(p, end, tag_data->mirror_uuid, tag_data->uuid_len);
+	predecessor_encode(p, end, &tag_data->predecessor);
+
+	return 0;
+}
+
+/*
+ * Allocate a new tag from the journaler and record its tid in
+ * journal->tag_tid.  The tag data names the cached LOCAL_CLIENT_ID
+ * client's first object position, if there is one, as the predecessor.
+ *
+ * Returns 0 on success, -ENOMEM if the encoding buffer cannot be
+ * allocated, or the error from the journaler.
+ */
+static int rbd_journal_allocate_tag(struct rbd_journal *journal)
+{
+	struct ceph_journaler_tag tag = {};
+	struct rbd_journal_tag_data tag_data = {};
+	struct rbd_journal_tag_predecessor *predecessor = &tag_data.predecessor;
+	struct ceph_journaler *journaler = journal->journaler;
+	struct ceph_journaler_client *client;
+	struct ceph_journaler_object_pos *position;
+	void *buf, *p, *end;
+	uint32_t buf_len;
+	int ret;
+
+	ret = ceph_journaler_get_cached_client(journaler, LOCAL_CLIENT_ID,
+					       &client);
+	if (ret)
+		return ret;
+
+	/*
+	 * list_first_entry() on an empty list yields a bogus pointer, so
+	 * only fill in the commit position when one exists; otherwise the
+	 * zero-initialized predecessor (commit_valid == false) is kept.
+	 */
+	position = list_first_entry_or_null(&client->object_positions,
+					    struct ceph_journaler_object_pos,
+					    node);
+	if (position) {
+		predecessor->commit_valid = true;
+		predecessor->tag_tid = position->tag_tid;
+		predecessor->entry_tid = position->entry_tid;
+	}
+	predecessor->uuid_len = 0;
+	predecessor->mirror_uuid = LOCAL_MIRROR_UUID;
+
+	tag_data.uuid_len = 0;
+	tag_data.mirror_uuid = LOCAL_MIRROR_UUID;
+
+	buf_len = tag_data_encoding_size(&tag_data);
+	buf = kmalloc(buf_len, GFP_KERNEL);
+	if (!buf) {
+		pr_err("failed to allocate tag data");
+		return -ENOMEM;
+	}
+
+	p = buf;
+	end = buf + buf_len;
+	ret = rbd_journal_encode_tag_data(&p, end, &tag_data);
+	if (ret) {
+		pr_err("error in tag data");
+		goto free_buf;
+	}
+
+	ret = ceph_journaler_allocate_tag(journaler, 0, buf, buf_len, &tag);
+	if (ret)
+		goto free_data;
+
+	journal->tag_tid = tag.tid;
+free_data:
+	kfree(tag.data);	/* kfree(NULL) is a no-op */
+free_buf:
+	kfree(buf);
+	return ret;
+}
+
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
 	rbd_dev_unprobe(rbd_dev);
@@ -6074,6 +6343,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 	device_del(&rbd_dev->dev);
 
 	rbd_dev_image_unlock(rbd_dev);
+	rbd_dev_close_journal(rbd_dev);
 	rbd_dev_device_release(rbd_dev);
 	rbd_dev_image_release(rbd_dev);
 	rbd_dev_destroy(rbd_dev);
-- 
1.8.3.1