rbd_journal_allocate_tag() get the client by client id and allocate an uniq tag for this client. All journal events from this client will be tagged by this tag. Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx> --- drivers/block/rbd.c | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6b387d9..6987259 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -34,6 +34,7 @@ #include <linux/ceph/cls_lock_client.h> #include <linux/ceph/striper.h> #include <linux/ceph/decode.h> +#include <linux/ceph/journaler.h> #include <linux/parser.h> #include <linux/bsearch.h> @@ -445,6 +446,8 @@ struct rbd_device { atomic_t parent_ref; struct rbd_device *parent; + struct rbd_journal *journal; + /* Block layer tags. */ struct blk_mq_tag_set tag_set; @@ -470,6 +473,11 @@ enum rbd_dev_flags { RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ }; +struct rbd_journal { + struct ceph_journaler *journaler; + u64 tag_tid; +}; + static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ static LIST_HEAD(rbd_dev_list); /* devices */ @@ -6916,6 +6924,94 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) return ret; } +struct rbd_journal_tag_predecessor { + bool commit_valid; + u64 tag_tid; + u64 entry_tid; +}; + +struct rbd_journal_tag_data { + struct rbd_journal_tag_predecessor predecessor; +}; + +static u32 tag_data_encoding_size(struct rbd_journal_tag_data *tag_data) +{ + /* + * sizeof(uuid_len) 4 + uuid_len(0) + 1 commit_valid + 8 tag_tid + + * 8 entry_tid + 4 sizeof(uuid_len) + uuid_len(0) + */ + return (4 + 0 + 1 + 8 + 8 + 4 + 0); +} + +static void predecessor_encode(void **p, void *end, + struct rbd_journal_tag_predecessor *predecessor) +{ + /* Encode mirror uuid, as it's "", let's just encode 0 */ + ceph_encode_32(p, 0); + ceph_encode_8(p, predecessor->commit_valid); + ceph_encode_64(p, predecessor->tag_tid); + ceph_encode_64(p, predecessor->entry_tid); +} + +static void rbd_journal_encode_tag_data(void **p, void *end, + struct rbd_journal_tag_data *tag_data) +{ + /* Encode mirror uuid, as it's "", let's just encode 0 */ + ceph_encode_32(p, 0); + predecessor_encode(p, end, &tag_data->predecessor); +} + +#define LOCAL_CLIENT_ID "" + +static int rbd_journal_allocate_tag(struct rbd_journal *journal) +{ + struct ceph_journaler_tag tag = {}; + struct rbd_journal_tag_data tag_data = {}; + struct ceph_journaler *journaler = journal->journaler; + struct ceph_journaler_client *client; + struct rbd_journal_tag_predecessor *predecessor; + struct ceph_journaler_object_pos *position; + void *orig_buf, *buf, *p, *end; + u32 buf_len; + int ret; + + ret = ceph_journaler_get_cached_client(journaler, LOCAL_CLIENT_ID, + &client); + if (ret) + goto out; + + if (!list_empty(&client->object_positions)) { + position = list_first_entry(&client->object_positions, + struct ceph_journaler_object_pos, + node); + predecessor = &tag_data.predecessor; + predecessor->commit_valid = true; + predecessor->tag_tid = position->tag_tid; + predecessor->entry_tid = position->entry_tid; + } + buf_len = tag_data_encoding_size(&tag_data); + p = kmalloc(buf_len, GFP_KERNEL); + if (!p) { + pr_err("failed to allocate tag data"); + ret = -ENOMEM; + goto out; + } + + end = p + buf_len; + orig_buf = buf = p; + rbd_journal_encode_tag_data(&p, end, &tag_data); + + ret = ceph_journaler_allocate_tag(journaler, 0, buf, buf_len, &tag); + if (ret) + goto free_buf; + + journal->tag_tid = tag.tid; +free_buf: + kfree(orig_buf); +out: + return ret; +} + static void rbd_dev_image_release(struct rbd_device *rbd_dev) { rbd_dev_unprobe(rbd_dev); -- 1.8.3.1