When opening a journal, we need to make sure the data and the journal
are consistent. Call ceph_journaler_start_replay() to replay all events
that were recorded in the journal but not yet committed.

Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx>
---
 net/ceph/journaler.c | 693 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 693 insertions(+)

diff --git a/net/ceph/journaler.c b/net/ceph/journaler.c
index 1b04d3f..3e92e96 100644
--- a/net/ceph/journaler.c
+++ b/net/ceph/journaler.c
@@ -594,3 +594,696 @@ int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, char *cli
 	return ret;
 }
 EXPORT_SYMBOL(ceph_journaler_get_cached_client);
+
+// replaying
+
+// Synchronously read @buf_len bytes at offset @read_off of object @oid
+// (located by @oloc) into @buf.
+//
+// Returns the number of bytes read, which may be less than @buf_len, or a
+// negative error code.
+static int ceph_journaler_obj_read_sync(struct ceph_journaler *journaler,
+					struct ceph_object_id *oid,
+					struct ceph_object_locator *oloc,
+					void *buf, uint32_t read_off,
+					uint64_t buf_len)
+{
+	struct ceph_osd_client *osdc = journaler->osdc;
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int num_pages = calc_pages_for(0, buf_len);
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
+
+	pages = ceph_alloc_page_vector(num_pages, GFP_NOIO);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_req;
+	}
+
+	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, read_off, buf_len, 0, 0);
+	// @pages is not freed here: it is attached with own_pages == true, so
+	// it is released together with @req by ceph_osdc_put_request().
+	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
+					 true);
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_req;
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0)
+		ceph_copy_from_page_vector(pages, buf, 0, ret);
+
+out_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+
+// Check whether a complete, CRC-valid journal entry starts at @buf.
+//
+// Entry layout as parsed here and in journaler_entry_decode():
+//   preamble (u64) | version (u8) | entry_tid (u64) | tag_tid (u64) |
+//   data_len (u32) | data (data_len bytes) | crc32c (u32)
+//
+// Returns true if the entry is readable. Otherwise returns false and sets
+// *@bytes_needed to the number of additional bytes needed to complete the
+// entry, or to 0 when the entry is invalid (bad preamble or CRC mismatch).
+static bool entry_is_readable(struct ceph_journaler *journaler, void *buf,
+			      void *end, uint32_t *bytes_needed)
+{
+	// preamble, version, entry tid, tag tid
+	const uint32_t HEADER_FIXED_SIZE = sizeof(uint64_t) + sizeof(uint8_t) +
+					   sizeof(uint64_t) + sizeof(uint64_t);
+	uint32_t remaining = end - buf;
+	uint64_t preamble;
+	uint32_t data_size;
+	void *origin_buf = buf;
+	uint32_t crc, crc_encoded;
+
+	if (remaining < HEADER_FIXED_SIZE) {
+		*bytes_needed = HEADER_FIXED_SIZE - remaining;
+		return false;
+	}
+
+	preamble = ceph_decode_64(&buf);
+	if (preamble != PREAMBLE) {
+		*bytes_needed = 0;
+		return false;
+	}
+
+	// skip the rest of the fixed header (version, entry tid, tag tid)
+	buf += HEADER_FIXED_SIZE - sizeof(preamble);
+	remaining = end - buf;
+	if (remaining < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - remaining;
+		return false;
+	}
+
+	data_size = ceph_decode_32(&buf);
+	remaining = end - buf;
+	if (remaining < data_size) {
+		*bytes_needed = data_size - remaining;
+		return false;
+	}
+
+	buf += data_size;
+
+	remaining = end - buf;
+	if (remaining < sizeof(uint32_t)) {
+		*bytes_needed = sizeof(uint32_t) - remaining;
+		return false;
+	}
+
+	*bytes_needed = 0;
+	// the CRC covers everything from the preamble to the end of the data
+	crc = crc32c(0, origin_buf, buf - origin_buf);
+	crc_encoded = ceph_decode_32(&buf);
+	if (crc != crc_encoded) {
+		pr_err("crc corrupted: encoded %u, calculated %u\n",
+		       crc_encoded, crc);
+		return false;
+	}
+	return true;
+}
+
+// Hand @entry and its @commit_tid to the user's ->handle_entry callback.
+// Both ->handle_entry and ->entry_handler must be set (BUG otherwise).
+static int playback_entry(struct ceph_journaler *journaler,
+			  struct ceph_journaler_entry *entry,
+			  uint64_t commit_tid)
+{
+	BUG_ON(!journaler->handle_entry || !journaler->entry_handler);
+
+	return journaler->handle_entry(journaler->entry_handler,
+				       entry, commit_tid);
+}
+
+// Look up the last entry_tid recorded for @tag_tid.
+//
+// Returns true and stores it in *@entry_tid if @tag_tid is tracked,
+// false otherwise.
+static bool get_last_entry_tid(struct ceph_journaler *journaler,
+			       uint64_t tag_tid, uint64_t *entry_tid)
+{
+	struct entry_tid *pos;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			*entry_tid = pos->entry_tid;
+			spin_unlock(&journaler->entry_tid_lock);
+			return true;
+		}
+	}
+	spin_unlock(&journaler->entry_tid_lock);
+
+	return false;
+}
+
+// There should not be many entry_tids here: a single entry_tid is kept
+// for all entries sharing the same tag_tid.
+// Allocate a tracker for @tag_tid (starting at entry_tid 0) and append it
+// to journaler->entry_tids.
+//
+// Must be called with journaler->entry_tid_lock held, as reserve_entry_tid()
+// does. That lock is a spinlock, so this must not sleep: allocate with
+// GFP_ATOMIC rather than GFP_NOIO.
+static struct entry_tid *entry_tid_alloc(struct ceph_journaler *journaler,
+					 uint64_t tag_tid)
+{
+	struct entry_tid *entry_tid;
+
+	entry_tid = kzalloc(sizeof(*entry_tid), GFP_ATOMIC);
+	if (!entry_tid) {
+		pr_err("failed to allocate new entry.\n");
+		return NULL;
+	}
+
+	entry_tid->tag_tid = tag_tid;
+	entry_tid->entry_tid = 0;
+	INIT_LIST_HEAD(&entry_tid->node);
+
+	list_add_tail(&entry_tid->node, &journaler->entry_tids);
+	return entry_tid;
+}
+
+// Record that @entry_tid has been seen for @tag_tid. The tracked entry_tid
+// for @tag_tid is raised to @entry_tid if that is larger. If @tag_tid is not
+// tracked yet, a tracker is created.
+//
+// Returns 0 on success or -ENOMEM.
+static int reserve_entry_tid(struct ceph_journaler *journaler,
+			     uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct entry_tid *pos;
+	int ret = 0;
+
+	spin_lock(&journaler->entry_tid_lock);
+	list_for_each_entry(pos, &journaler->entry_tids, node) {
+		if (pos->tag_tid == tag_tid) {
+			if (pos->entry_tid < entry_tid)
+				pos->entry_tid = entry_tid;
+			goto out_unlock;
+		}
+	}
+
+	// entry_tid_alloc() runs under entry_tid_lock and does not sleep.
+	pos = entry_tid_alloc(journaler, tag_tid);
+	if (!pos) {
+		ret = -ENOMEM;
+		goto out_unlock;
+	}
+	pos->entry_tid = entry_tid;
+
+out_unlock:
+	spin_unlock(&journaler->entry_tid_lock);
+	return ret;
+}
+
+// Free an entry produced by journaler_entry_decode(), including its payload.
+static void journaler_entry_free(struct ceph_journaler_entry *entry)
+{
+	kvfree(entry->data);	// kvfree(NULL) is a no-op
+	kfree(entry);
+}
+
+// Decode one journal entry at *@p and advance *@p past it.
+//
+// The fixed-size fields are decoded without bounds checks. The caller must
+// first validate the entry with entry_is_readable(), as fetch() does.
+//
+// Returns a newly allocated entry (free it with journaler_entry_free()), or
+// NULL on a bad preamble, an unsupported version, an allocation or payload
+// extraction failure, or a CRC mismatch.
+static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end)
+{
+	struct ceph_journaler_entry *entry;
+	uint64_t preamble;
+	uint8_t version;
+	uint32_t crc, crc_encoded;
+	void *start = *p;
+
+	preamble = ceph_decode_64(p);
+	if (preamble != PREAMBLE)
+		return NULL;
+
+	version = ceph_decode_8(p);
+	if (version != 1)
+		return NULL;
+
+	entry = kzalloc(sizeof(*entry), GFP_NOIO);
+	if (!entry)
+		return NULL;
+
+	INIT_LIST_HEAD(&entry->node);
+	entry->entry_tid = ceph_decode_64(p);
+	entry->tag_tid = ceph_decode_64(p);
+	// use kvmalloc to extract the data
+	// NOTE(review): every other allocation on the replay path uses GFP_NOIO.
+	// Confirm whether GFP_KERNEL is safe here. Also confirm that the helper
+	// still falls back to vmalloc if GFP_NOIO is used instead.
+	entry->data = ceph_extract_encoded_string_kvmalloc(p, end,
+						&entry->data_len, GFP_KERNEL);
+	if (IS_ERR(entry->data)) {
+		entry->data = NULL;
+		goto free_entry;
+	}
+
+	crc = crc32c(0, start, *p - start);
+	crc_encoded = ceph_decode_32(p);
+	if (crc != crc_encoded)
+		goto free_entry;
+
+	return entry;
+
+free_entry:
+	journaler_entry_free(entry);
+	return NULL;
+}
+
+// Read journal object @object_num and append all of its entries to the
+// entry_list of the replayer for its splay (object_num % splay_width).
+//
+// The replayer's ->pos is set to the client's commit position in this
+// object, or NULL if there is none.
+//
+// Returns 0 on success (including an empty object), -ENOENT if the object
+// does not exist, or another negative error code.
+static int fetch(struct ceph_journaler *journaler, uint64_t object_num)
+{
+	struct ceph_object_id object_oid;
+	int ret;
+	void *read_buf, *end;
+	// must not exceed the size of journaler->fetch_buf
+	uint64_t read_len = 2 << journaler->order;
+	struct ceph_journaler_object_pos *pos;
+	struct object_replayer *obj_replayer;
+
+	// get the replayer for this splay and set the object number of it to object_num.
+	obj_replayer = &journaler->obj_replayers[object_num % journaler->splay_width];
+	obj_replayer->object_num = object_num;
+
+	// Find the commit position in this object_num, if any. Reset first so
+	// that a position belonging to a previously fetched object of this
+	// splay is not carried over.
+	obj_replayer->pos = NULL;
+	list_for_each_entry(pos, &journaler->client->object_positions, node) {
+		if (pos->in_using && pos->object_num == object_num) {
+			// Tell the replayer there is a commit position in this
+			// object. Entries up to and including it are already
+			// committed; preprocess_replay() drops them.
+			obj_replayer->pos = pos;
+			break;
+		}
+	}
+
+	// read the object data
+	ceph_oid_init(&object_oid);
+	ret = ceph_oid_aprintf(&object_oid, GFP_NOIO, "%s%llu",
+			       journaler->object_oid_prefix, object_num);
+	if (ret) {
+		pr_err("failed to initialize object_id : %d\n", ret);
+		return ret;
+	}
+
+	read_buf = journaler->fetch_buf;
+	ret = ceph_journaler_obj_read_sync(journaler, &object_oid,
+					   &journaler->data_oloc, read_buf,
+					   0, read_len);
+	if (ret == -ENOENT) {
+		dout("no such object, %s: %d\n", object_oid.name, ret);
+		goto out_free_object_oid;
+	} else if (ret < 0) {
+		pr_err("failed to read: %d\n", ret);
+		goto out_free_object_oid;
+	} else if (ret == 0) {
+		pr_err("no data: %d\n", ret);
+		goto out_free_object_oid;
+	}
+
+	// decode the entries in this object
+	end = read_buf + ret;
+	while (read_buf < end) {
+		uint32_t bytes_needed = 0;
+		struct ceph_journaler_entry *entry;
+
+		if (!entry_is_readable(journaler, read_buf, end, &bytes_needed)) {
+			ret = -EIO;
+			goto out_free_object_oid;
+		}
+
+		entry = journaler_entry_decode(&read_buf, end);
+		if (!entry) {
+			// ret still holds the positive byte count here, which
+			// callers would not recognize as an error.
+			ret = -EIO;
+			goto out_free_object_oid;
+		}
+
+		list_add_tail(&entry->node, &obj_replayer->entry_list);
+	}
+	ret = 0;
+
+out_free_object_oid:
+	ceph_oid_destroy(&object_oid);
+	return ret;
+}
+
+// Record that @commit_tid belongs to entry (@tag_tid, @entry_tid) read from
+// journal object @object_num, by inserting a commit entry into
+// journaler->commit_entries under commit_lock.
+//
+// Returns 0 on success or -ENOMEM.
+static int add_commit_entry(struct ceph_journaler *journaler, uint64_t commit_tid,
+			    uint64_t object_num, uint64_t tag_tid, uint64_t entry_tid)
+{
+	struct commit_entry *commit_entry;
+
+	commit_entry = kmem_cache_zalloc(journaler_commit_entry_cache, GFP_NOIO);
+	if (!commit_entry)
+		return -ENOMEM;
+
+	RB_CLEAR_NODE(&commit_entry->r_node);
+
+	commit_entry->commit_tid = commit_tid;
+	commit_entry->object_num = object_num;
+	commit_entry->tag_tid = tag_tid;
+	commit_entry->entry_tid = entry_tid;
+
+	mutex_lock(&journaler->commit_lock);
+	insert_commit_entry(&journaler->commit_entries, commit_entry);
+	mutex_unlock(&journaler->commit_lock);
+
+	return 0;
+}
+
+// Return the next commit tid. Caller must hold journaler->meta_lock.
+static uint64_t __allocate_commit_tid(struct ceph_journaler *journaler)
+{
+	return ++journaler->commit_tid;
+}
+
+// Return the next commit tid, taking journaler->meta_lock.
+static uint64_t allocate_commit_tid(struct ceph_journaler *journaler)
+{
+	uint64_t commit_tid;
+
+	mutex_lock(&journaler->meta_lock);
+	commit_tid = __allocate_commit_tid(journaler);
+	mutex_unlock(&journaler->meta_lock);
+
+	return commit_tid;
+}
+
+// Drop every queued entry with @tag_tid from all replayers. Also raise
+// journaler->prune_tag_tid to @tag_tid if it is unset (UINT_MAX) or smaller.
+//
+// NOTE(review): UINT_MAX is a 32-bit sentinel. Confirm it matches how
+// prune_tag_tid / active_tag_tid are initialized.
+static void prune_tag(struct ceph_journaler *journaler, uint64_t tag_tid)
+{
+	struct ceph_journaler_entry *entry, *next;
+	struct object_replayer *obj_replayer;
+	int i;
+
+	if (journaler->prune_tag_tid == UINT_MAX ||
+	    journaler->prune_tag_tid < tag_tid) {
+		journaler->prune_tag_tid = tag_tid;
+	}
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		obj_replayer = &journaler->obj_replayers[i];
+		list_for_each_entry_safe(entry, next,
+					 &obj_replayer->entry_list, node) {
+			if (entry->tag_tid == tag_tid) {
+				list_del(&entry->node);
+				journaler_entry_free(entry);
+			}
+		}
+	}
+}
+
+// Pop the next entry to replay. Replayers are walked in splay order,
+// starting at journaler->splay_offset.
+//
+// Entries whose tag is older than the active tag, or whose tag has already
+// been pruned, are dropped as stale. When a newer tag shows up, the active
+// tag is pruned and replay switches to the new tag. That tag's first entry
+// (entry_tid 0) is expected in splay 0.
+//
+// On success, stores the entry in *@entry and a newly allocated commit tid
+// in *@commit_tid, and returns 0; the caller frees the entry.
+// Returns -ENOENT when splay 0 has nothing left, -ENOMSG on a gap in the
+// journal, or another negative error code.
+static int get_first_entry(struct ceph_journaler *journaler,
+			   struct ceph_journaler_entry **entry,
+			   uint64_t *commit_tid)
+{
+	struct object_replayer *obj_replayer;
+	struct ceph_journaler_entry *tmp_entry;
+	uint64_t last_entry_tid;
+	uint64_t entry_object_num;
+	bool expect_first_entry = false;
+	int ret;
+
+next:
+	// find the current replayer.
+	obj_replayer = &journaler->obj_replayers[journaler->splay_offset];
+	if (list_empty(&obj_replayer->entry_list)) {
+		// An empty splay 0 ends the replay. Any other empty splay
+		// restarts the walk from splay 0.
+		if (journaler->splay_offset == 0)
+			return -ENOENT;
+		journaler->splay_offset = 0;
+		goto next;
+	}
+
+	// get the first entry in current replayer
+	tmp_entry = list_first_entry(&obj_replayer->entry_list,
+				     struct ceph_journaler_entry, node);
+
+	// advance the splay_offset
+	journaler->splay_offset = (journaler->splay_offset + 1) % journaler->splay_width;
+	if (journaler->active_tag_tid == UINT_MAX) {
+		// There is no active tag tid. This happens when nothing has
+		// been committed to this journal yet but there are
+		// uncommitted entries, so adopt this entry's tag_tid as the
+		// active_tag_tid.
+		journaler->active_tag_tid = tmp_entry->tag_tid;
+	} else if (tmp_entry->tag_tid < journaler->active_tag_tid ||
+		   (journaler->prune_tag_tid != UINT_MAX &&
+		    tmp_entry->tag_tid <= journaler->prune_tag_tid)) {
+		pr_err("detected stale entry: object_num=%llu, tag_tid=%llu, entry_tid=%llu.\n",
+		       obj_replayer->object_num, tmp_entry->tag_tid,
+		       tmp_entry->entry_tid);
+		list_del(&tmp_entry->node);
+		prune_tag(journaler, tmp_entry->tag_tid);
+		journaler_entry_free(tmp_entry);
+		// NOTE(review): unlike the normal path below, this does not
+		// fetch the next object when a replayer's list becomes empty.
+		goto next;
+	} else if (tmp_entry->tag_tid > journaler->active_tag_tid) {
+		// Found a new tag_tid, which means a new client started to
+		// append journal events. Prune the old tag.
+		prune_tag(journaler, journaler->active_tag_tid);
+		if (tmp_entry->entry_tid == 0) {
+			// This is the first entry of the new tag's client:
+			// advance active_tag_tid to the new tag_tid.
+			journaler->active_tag_tid = tmp_entry->tag_tid;
+		} else {
+			if (expect_first_entry) {
+				pr_err("We expect this is the first entry for next tag. but the entry_tid is: %llu.\n",
+				       tmp_entry->entry_tid);
+				return -ENOMSG;
+			}
+			// Each client starts appending events at the first
+			// object (splay_offset 0). We found a new tag, but
+			// this is not its first entry (entry_tid 0), so jump
+			// to splay_offset 0 to get the new tag's first entry.
+			journaler->splay_offset = 0;
+
+			// After jumping to splay_offset 0, the next entry
+			// must be the first entry of the new tag.
+			expect_first_entry = true;
+			goto next;
+		}
+	}
+
+	// Pop this entry from journal
+	list_del(&tmp_entry->node);
+	// Remember which object this entry came from. fetch() below moves
+	// obj_replayer->object_num on to the next object of this splay, but
+	// the commit entry must point at the object holding this entry.
+	entry_object_num = obj_replayer->object_num;
+
+	// check entry_tid to make sure this entry_tid is after last_entry_tid
+	// for the same tag.
+	if (get_last_entry_tid(journaler, tmp_entry->tag_tid, &last_entry_tid) &&
+	    tmp_entry->entry_tid != last_entry_tid + 1) {
+		pr_err("missing prior journal entry, last_entry_tid: %llu\n",
+		       last_entry_tid);
+		ret = -ENOMSG;
+		goto free_entry;
+	}
+
+	// fetch next object if this object is done.
+	if (list_empty(&obj_replayer->entry_list)) {
+		ret = fetch(journaler, obj_replayer->object_num + journaler->splay_width);
+		if (ret && ret != -ENOENT)
+			goto free_entry;
+	}
+
+	ret = reserve_entry_tid(journaler, tmp_entry->tag_tid, tmp_entry->entry_tid);
+	if (ret)
+		goto free_entry;
+
+	// allocate commit_tid for this entry
+	*commit_tid = allocate_commit_tid(journaler);
+	ret = add_commit_entry(journaler, *commit_tid, entry_object_num,
+			       tmp_entry->tag_tid, tmp_entry->entry_tid);
+	if (ret)
+		goto free_entry;
+
+	*entry = tmp_entry;
+	return 0;
+
+free_entry:
+	journaler_entry_free(tmp_entry);
+	return ret;
+}
+
+// Play back every remaining entry through playback_entry().
+//
+// Running out of entries (-ENOENT from get_first_entry()) counts as success;
+// the active tag is pruned at that point. Returns 0 or the first error from
+// get_first_entry() or the entry handler.
+static int process_replay(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_entry *entry;
+	uint64_t commit_tid;
+	int r;
+
+	for (;;) {
+		r = get_first_entry(journaler, &entry, &commit_tid);
+		if (r == -ENOENT) {
+			prune_tag(journaler, journaler->active_tag_tid);
+			return 0;
+		}
+		if (r)
+			return r;
+
+		r = playback_entry(journaler, entry, commit_tid);
+		journaler_entry_free(entry);
+		if (r)
+			return r;
+	}
+}
+
+// Reserve the entry tids of already committed entries, and delete those
+// entries from the replayers.
+//
+// For each replayer with a commit position, entries are removed from the
+// head of its list up to and including the commit position.
+// NOTE(review): if the commit position is not in the list, every entry of
+// that replayer is removed.
+static int preprocess_replay(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_entry *entry, *next;
+	bool found_commit = false;
+	struct object_replayer *obj_replayer;
+	int i, ret;
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		obj_replayer = &journaler->obj_replayers[i];
+		// A NULL pos means there is no commit position in this
+		// object.
+		if (!obj_replayer->pos)
+			continue;
+		found_commit = false;
+		list_for_each_entry_safe(entry, next,
+					 &obj_replayer->entry_list, node) {
+			if (entry->tag_tid == obj_replayer->pos->tag_tid &&
+			    entry->entry_tid == obj_replayer->pos->entry_tid) {
+				found_commit = true;
+			} else if (found_commit) {
+				break;
+			}
+
+			// This entry is at or before the commit position:
+			// skip it in replaying.
+			ret = reserve_entry_tid(journaler, entry->tag_tid, entry->entry_tid);
+			if (ret)
+				return ret;
+			list_del(&entry->node);
+			journaler_entry_free(entry);
+		}
+	}
+	return 0;
+}
+
+/*
+ * Why do we need to replay?
+ *
+ * Because we append to the journal before writing the data objects. If a
+ * crash happens in the middle of a write, the journal may contain entries
+ * whose data is safe in the journal but has not yet been committed to the
+ * data objects. So when we open the journal again, we need to check it and
+ * play back the uncommitted entries.
+ *
+ * Example:
+ *
+ * splay_width: 4
+ *
+ * commit positions:
+ * [[object_number=2, tag_tid=1, entry_tid=10],
+ *  [object_number=1, tag_tid=1, entry_tid=9],
+ *  [object_number=0, tag_tid=1, entry_tid=8],
+ *  [object_number=3, tag_tid=1, entry_tid=7]]
+ *
+ * journal entries (number means the entry_tid; * means commit position):
+ * object 0: |0|4|8*|12|16|20|
+ * object 1: |1|5|9*|13|17|21|
+ * object 2: |2|6|10*|14|18|
+ * object 3: |3|7*|11|15|19|
+ *
+ * In this case, we need to replay the entries from 11 to 21.
+ *
+ * Step 1: Get the active position and the fetch objects.
+ * The "active position" is the last committed position, i.e. the head of
+ * the commit positions. We need to replay the entries after this position.
+ *
+ * The "fetch objects" are the last committed object in each splay. We don't
+ * need to replay committed entries, so we start fetching the journal from
+ * the last committed objects.
+ *
+ * The active position in the above example is [object_number=2, tag_tid=1, entry_tid=10].
+ * The fetch objects in the above example are [0, 1, 2, 3].
+ *
+ * Step 2: Fetch the journal objects.
+ * Read the data of the "fetch objects" ([0, 1, 2, 3]), decode the entries
+ * and put them on each replayer's entry_list.
+ *
+ * In the above example, the entry lists would look like this:
+ * replayer_0->entry_list: 0->4->8*->12->16->20.
+ * replayer_1->entry_list: 1->5->9*->13->17->21.
+ * replayer_2->entry_list: 2->6->10*->14->18.
+ * replayer_3->entry_list: 3->7*->11->15->19.
+ *
+ * Step 3: Preprocess the journal entries.
+ * Delete the entries up to and including the commit position. We don't need
+ * to replay them because they are already committed. After preprocessing,
+ * the entry lists would be:
+ *
+ * replayer_0->entry_list: 12->16->20.
+ * replayer_1->entry_list: 13->17->21.
+ * replayer_2->entry_list: 14->18.
+ * replayer_3->entry_list: 11->15->19.
+ *
+ * Step 4: Process the journal entries.
+ * Replay the entries one by one, starting from the entry after the last
+ * commit position.
+ *
+ * As we know from Step 1, the last commit position is 10, so the replay
+ * begins at 11 and ends after the last entry, 21.
+ *
+ * ->handle_entry and ->entry_handler must be set before calling this.
+ * Returns 0 on success or a negative error code.
+ */
+int ceph_journaler_start_replay(struct ceph_journaler *journaler)
+{
+	struct ceph_journaler_object_pos *active_pos;
+	uint64_t *fetch_objects;
+	// must match the read length used by fetch()
+	uint64_t buf_len = (2 << journaler->order);
+	uint64_t object_num;
+	int i;
+	int ret = 0;
+
+	if (!journaler->handle_entry || !journaler->entry_handler) {
+		pr_err("Please initialize the ->handle_entry and ->entry_handler\n");
+		return -EINVAL;
+	}
+
+	// kcalloc() zeroes the array and checks count * size for overflow.
+	fetch_objects = kcalloc(journaler->splay_width, sizeof(*fetch_objects),
+				GFP_NOIO);
+	if (!fetch_objects)
+		return -ENOMEM;
+
+	// Step 1: Get the active position.
+	mutex_lock(&journaler->meta_lock);
+	// The HEAD of the commit positions is the last committed object
+	// position. Use list_first_entry_or_null() so that an empty list
+	// cannot produce a bogus pointer.
+	active_pos = list_first_entry_or_null(&journaler->client->object_positions,
+					      struct ceph_journaler_object_pos, node);
+	// With no commit position in this journal, active_pos is not in use,
+	// so skip getting the active information.
+	if (active_pos && active_pos->in_using) {
+		journaler->splay_offset = (active_pos->object_num + 1) % journaler->splay_width;
+		journaler->active_tag_tid = active_pos->tag_tid;
+
+		list_for_each_entry(active_pos, &journaler->client->object_positions, node) {
+			if (active_pos->in_using) {
+				fetch_objects[active_pos->object_num %
+					      journaler->splay_width] = active_pos->object_num;
+			}
+		}
+	}
+	mutex_unlock(&journaler->meta_lock);
+
+	// Step 2: fetch journal objects.
+	// fetch_buf will be used to read every journal object
+	journaler->fetch_buf = ceph_kvmalloc(buf_len, GFP_NOIO);
+	if (!journaler->fetch_buf) {
+		pr_err("failed to alloc fetch buf: %llu\n", buf_len);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		// 0 means this splay has no commit position, so start at its
+		// first object, i. Object 0 belongs to splay 0, where i is
+		// also 0, so this is unambiguous.
+		object_num = fetch_objects[i] ? fetch_objects[i] : i;
+		ret = fetch(journaler, object_num);
+		if (ret && ret != -ENOENT)
+			goto free_fetch_buf;
+	}
+
+	// Step 3: preprocess the journal entries
+	ret = preprocess_replay(journaler);
+	if (ret)
+		goto free_fetch_buf;
+
+	// Step 4: process the journal entries
+	ret = process_replay(journaler);
+
+free_fetch_buf:
+	kvfree(journaler->fetch_buf);
+	// don't leave a dangling pointer in the journaler
+	journaler->fetch_buf = NULL;
+out:
+	// Clean up the replayers. Detach each list under its spinlock, but
+	// free the entries outside of it: kvfree() of vmalloc'ed payloads may
+	// sleep.
+	for (i = 0; i < journaler->splay_width; i++) {
+		struct object_replayer *obj_replayer = &journaler->obj_replayers[i];
+		struct ceph_journaler_entry *entry, *next_entry;
+		LIST_HEAD(entries);
+
+		spin_lock(&obj_replayer->lock);
+		list_splice_init(&obj_replayer->entry_list, &entries);
+		spin_unlock(&obj_replayer->lock);
+
+		list_for_each_entry_safe(entry, next_entry, &entries, node) {
+			list_del(&entry->node);
+			journaler_entry_free(entry);
+		}
+	}
+	kfree(fetch_objects);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_journaler_start_replay);
-- 
1.8.3.1