On Mon, Jul 29, 2019 at 11:43 AM Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx> wrote: > > This is a cls client module for journaler. > > Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx> > --- > include/linux/ceph/cls_journaler_client.h | 94 +++++ > net/ceph/cls_journaler_client.c | 558 ++++++++++++++++++++++++++++++ > 2 files changed, 652 insertions(+) > create mode 100644 include/linux/ceph/cls_journaler_client.h > create mode 100644 net/ceph/cls_journaler_client.c > > diff --git a/include/linux/ceph/cls_journaler_client.h b/include/linux/ceph/cls_journaler_client.h > new file mode 100644 > index 0000000..6245882 > --- /dev/null > +++ b/include/linux/ceph/cls_journaler_client.h > @@ -0,0 +1,94 @@ > +/* SPDX-License-Identifier: GPL-2.0 */ > +#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H > +#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H > + > +#include <linux/ceph/osd_client.h> > + > +struct ceph_journaler; > +struct ceph_journaler_client; > + > +struct ceph_journaler_object_pos { > + struct list_head node; > + u64 object_num; > + u64 tag_tid; > + u64 entry_tid; > + // ->in_using means this object_pos is initialized. > + // There would be some stub for it created in init step > + // to allocate memory as early as possible. > + bool in_using; > +}; > + > +struct ceph_journaler_client { > + struct list_head node; > + size_t id_len; > + char *id; > + size_t data_len; > + char *data; Is client->data ever used? If not, drop it and skip over this string when decoding. > + struct list_head object_positions; > + struct ceph_journaler_object_pos *object_positions_array; > +}; > + > +struct ceph_journaler_tag { > + uint64_t tid; > + uint64_t tag_class; > + size_t data_len; > + char *data; Same here, is tag->data ever used? 
> +}; > + > +void destroy_client(struct ceph_journaler_client *client); > + > +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint8_t *order, > + uint8_t *splay_width, > + int64_t *pool_id); > + > +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *minimum_set, uint64_t *active_set); > + > +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct list_head *clients, > + uint8_t splay_width); > + > +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *tag_tid); > + > +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, struct ceph_journaler_tag *tag); > + > +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, uint64_t tag_class, > + void *buf, uint32_t buf_len); > + > +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct ceph_journaler_client *client, > + struct list_head *object_positions); > + > +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t active_set); > + > +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t minimum_set); > + > +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t 
soft_limit); > +#endif > diff --git a/net/ceph/cls_journaler_client.c b/net/ceph/cls_journaler_client.c > new file mode 100644 > index 0000000..ac27589 > --- /dev/null > +++ b/net/ceph/cls_journaler_client.c > @@ -0,0 +1,558 @@ > +// SPDX-License-Identifier: GPL-2.0 > +#include <linux/ceph/ceph_debug.h> > +#include <linux/ceph/cls_journaler_client.h> > +#include <linux/ceph/journaler.h> > + > +#include <linux/types.h> > + > +//TODO get all metas in one single request > +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint8_t *order, > + uint8_t *splay_width, > + int64_t *pool_id) > +{ > + struct page *reply_page; > + size_t reply_len = sizeof(*order); > + int ret; > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(order, page_address(reply_page), reply_len); > + } else { > + goto out; > + } > + reply_len = sizeof(*splay_width); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(splay_width, page_address(reply_page), reply_len); > + } else { > + goto out; > + } > + reply_len = sizeof(*pool_id); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(pool_id, page_address(reply_page), reply_len); This memcpy() is not going to work on big endian machines. Use either ceph_decode_*() or le*_to_cpu() helpers. Also, check for errors instead of success: ret = ceph_osdc_call(...); if (ret) goto out; ... decode ... 
> + } else { > + goto out; > + } > +out: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_immutable_metas); > + > +//TODO get all metas in one single request > +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *minimum_set, uint64_t *active_set) > +{ > + struct page *reply_page; > + int ret; > + size_t reply_len = sizeof(*minimum_set); > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(minimum_set, page_address(reply_page), reply_len); Same here. > + } else { > + goto out; > + } > + reply_len = sizeof(active_set); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + if (!ret) { > + memcpy(active_set, page_address(reply_page), reply_len); Same here. > + } else { > + goto out; > + } > +out: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_mutable_metas); > + > +static int decode_object_position(void **p, void *end, struct ceph_journaler_object_pos *pos) > +{ > + u8 struct_v; > + u32 struct_len; > + int ret; > + u64 object_num; > + u64 tag_tid; > + u64 entry_tid; > + > + ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position", > + &struct_v, &struct_len); > + if (ret) > + return ret; > + > + object_num = ceph_decode_64(p); > + tag_tid = ceph_decode_64(p); > + entry_tid = ceph_decode_64(p); > + > + pos->object_num = object_num; > + pos->tag_tid = tag_tid; > + pos->entry_tid = entry_tid; Can decode directly into pos, without going through local variables. 
> + > + return ret; > +} > + > +void destroy_client(struct ceph_journaler_client *client) > +{ > + kfree(client->object_positions_array); > + kfree(client->id); > + kfree(client->data); > + > + kfree(client); > +} > + > +struct ceph_journaler_client *create_client(uint8_t splay_width) > +{ > + struct ceph_journaler_client *client; > + struct ceph_journaler_object_pos *pos; > + int i; > + > + client = kzalloc(sizeof(*client), GFP_NOIO); > + if (!client) > + return NULL; > + > + client->object_positions_array = kcalloc(splay_width, sizeof(*pos), GFP_NOIO); > + if (!client->object_positions_array) > + goto free_client; > + > + INIT_LIST_HEAD(&client->object_positions); > + for (i = 0; i < splay_width; i++) { > + pos = &client->object_positions_array[i]; > + INIT_LIST_HEAD(&pos->node); > + list_add_tail(&pos->node, &client->object_positions); > + } > + > + INIT_LIST_HEAD(&client->node); > + client->data = NULL; > + client->id = NULL; > + > + return client; > +free_client: > + kfree(client); > + return NULL; > +} > + > +static int decode_client(void **p, void *end, struct ceph_journaler_client *client) > +{ > + u8 struct_v; > + u8 state_raw; > + u32 struct_len; > + int ret, num, i; > + struct ceph_journaler_object_pos *pos; > + > + ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply", > + &struct_v, &struct_len); > + if (ret) > + return ret; > + > + client->id = ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO); > + if (IS_ERR(client->id)) { > + ret = PTR_ERR(client->id); > + client->id = NULL; > + goto err; > + } > + client->data = ceph_extract_encoded_string(p, end, &client->data_len, GFP_NOIO); > + if (IS_ERR(client->data)) { > + ret = PTR_ERR(client->data); > + client->data = NULL; > + goto free_id; > + } > + ret = ceph_start_decoding(p, end, 1, "cls_joural_client_object_set_position", > + &struct_v, &struct_len); > + if (ret) > + goto free_data; > + > + num = ceph_decode_32(p); > + i = 0; > + list_for_each_entry(pos, 
&client->object_positions, node) { > + if (i < num) { > + // we will use this position stub > + pos->in_using = true; > + ret = decode_object_position(p, end, pos); > + if (ret) { > + goto free_data; > + } > + } else { > + // This stub is not used anymore > + pos->in_using = false; > + } > + i++; > + } > + > + state_raw = ceph_decode_8(p); Just skip over it (i.e. increment p with a comment), don't add bogus local variables. > + return 0; > + > +free_data: > + kfree(client->data); > +free_id: > + kfree(client->id); > +err: > + return ret; > +} > + > +static int decode_clients(void **p, void *end, struct list_head *clients, uint8_t splay_width) > +{ > + int i = 0; > + int ret; > + uint32_t client_num; > + struct ceph_journaler_client *client, *next; > + > + client_num = ceph_decode_32(p); > + // Reuse the clients already exist in list. > + list_for_each_entry_safe(client, next, clients, node) { > + // Some clients unregistered. > + if (i < client_num) { > + kfree(client->id); > + kfree(client->data); > + ret = decode_client(p, end, client); > + if (ret) > + return ret; > + } else { > + list_del(&client->node); > + destroy_client(client); > + } > + i++; > + } > + > + // Some more clients registered. 
> + for (; i < client_num; i++) { > + client = create_client(splay_width); > + if (!client) > + return -ENOMEM; > + ret = decode_client(p, end, client); > + if (ret) { > + destroy_client(client); > + return ret; > + } > + list_add_tail(&client->node, clients); > + } > + return 0; > +} > + > +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct list_head *clients, > + uint8_t splay_width) > +{ > + struct page *reply_page; > + struct page *req_page; > + int ret; > + size_t reply_len = PAGE_SIZE; > + int buf_size; > + void *p, *end; > + char name[] = ""; > + > + buf_size = strlen(name) + sizeof(__le32) + sizeof(uint64_t); > + > + if (buf_size > PAGE_SIZE) > + return -E2BIG; If name is always an empty string, this can't ever be true. > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) { > + ret = -ENOMEM; > + goto free_reply_page; > + } > + > + p = page_address(req_page); > + end = p + buf_size; > + > + ceph_encode_string(&p, end, name, strlen(name)); I would drop name and encode 0 directly. > + ceph_encode_64(&p, (uint64_t)256); This constant needs a define or a comment. Unnecessary cast. 
> + > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_list", > + CEPH_OSD_FLAG_READ, req_page, > + buf_size, &reply_page, &reply_len); > + > + if (!ret) { > + p = page_address(reply_page); > + end = p + reply_len; > + > + ret = decode_clients(&p, end, clients, splay_width); > + } > + > + __free_page(req_page); > +free_reply_page: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_client_list); > + > +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t *tag_tid) > +{ > + struct page *reply_page; > + int ret; > + size_t reply_len = PAGE_SIZE; > + > + reply_page = alloc_page(GFP_NOIO); > + if (!reply_page) > + return -ENOMEM; > + > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid", > + CEPH_OSD_FLAG_READ, NULL, > + 0, &reply_page, &reply_len); > + > + if (!ret) { > + memcpy(tag_tid, page_address(reply_page), reply_len); Same here. 
> + } > + > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_next_tag_tid); > + > +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, uint64_t tag_class, > + void *buf, uint32_t buf_len) > +{ > + struct page *req_page; > + int ret; > + int buf_size; > + void *p, *end; > + > + buf_size = buf_len + sizeof(__le32) + sizeof(uint64_t) + sizeof(uint64_t); > + > + if (buf_size > PAGE_SIZE) > + return -E2BIG; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + end = p + buf_size; > + > + ceph_encode_64(&p, tag_tid); > + ceph_encode_64(&p, tag_class); > + ceph_encode_string(&p, end, buf, buf_len); > + > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create", > + CEPH_OSD_FLAG_WRITE, req_page, > + buf_size, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_tag_create); > + > +int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag) > +{ > + int ret; > + u8 struct_v; > + u32 struct_len; > + > + ret = ceph_start_decoding(p, end, 1, "cls_journaler_tag", > + &struct_v, &struct_len); > + if (ret) > + return ret; > + > + tag->tid = ceph_decode_64(p); > + tag->tag_class = ceph_decode_64(p); > + tag->data = ceph_extract_encoded_string(p, end, &tag->data_len, GFP_NOIO); > + if (IS_ERR(tag->data)) { > + ret = PTR_ERR(tag->data); > + tag->data = NULL; > + return ret; > + } > + > + return 0; > +} > + > +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t tag_tid, struct ceph_journaler_tag *tag) > +{ > + struct page *reply_page; > + struct page *req_page; > + int ret; > + size_t reply_len = PAGE_SIZE; > + int buf_size; > + void *p, *end; > + > + buf_size = sizeof(tag_tid); > + > + reply_page = alloc_page(GFP_NOIO); > + 
if (!reply_page) > + return -ENOMEM; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) { > + ret = -ENOMEM; > + goto free_reply_page; > + } > + > + p = page_address(req_page); > + end = p + buf_size; > + ceph_encode_64(&p, tag_tid); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag", > + CEPH_OSD_FLAG_READ, req_page, > + buf_size, &reply_page, &reply_len); > + > + if (!ret) { > + p = page_address(reply_page); > + end = p + reply_len; > + > + ret = decode_tag(&p, end, tag); > + } > + > + __free_page(req_page); > +free_reply_page: > + __free_page(reply_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_get_tag); > + > +static int version_len = 6; Use CEPH_ENCODING_START_BLK_LEN. > + > +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + struct ceph_journaler_client *client, > + struct list_head *object_positions) > +{ > + struct page *req_page; > + int ret; > + int buf_size; > + void *p, *end; > + struct ceph_journaler_object_pos *position = NULL; > + int object_position_len = version_len + 8 + 8 + 8; > + int pos_num = 0; > + > + buf_size = 4 + client->id_len + version_len + 4; > + list_for_each_entry(position, object_positions, node) { > + buf_size += object_position_len; > + pos_num++; > + } > + > + if (buf_size > PAGE_SIZE) > + return -E2BIG; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + end = p + buf_size; > + ceph_encode_string(&p, end, client->id, client->id_len); > + ceph_start_encoding(&p, 1, 1, buf_size - client->id_len - version_len - 4); > + ceph_encode_32(&p, pos_num); > + > + list_for_each_entry(position, object_positions, node) { > + ceph_start_encoding(&p, 1, 1, 24); > + ceph_encode_64(&p, position->object_num); > + ceph_encode_64(&p, position->tag_tid); > + ceph_encode_64(&p, position->entry_tid); > + } > + > + ret = ceph_osdc_call(osdc, oid, oloc, 
"journal", "client_commit", > + CEPH_OSD_FLAG_WRITE, req_page, > + buf_size, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_client_committed); > + > + > +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t minimum_set) > +{ > + struct page *req_page; > + int ret; > + void *p; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + ceph_encode_64(&p, minimum_set); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set", > + CEPH_OSD_FLAG_WRITE, req_page, > + 8, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_set_minimum_set); > + > +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t active_set) > +{ > + struct page *req_page; > + int ret; > + void *p; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + ceph_encode_64(&p, active_set); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set", > + CEPH_OSD_FLAG_WRITE, req_page, > + 8, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > +EXPORT_SYMBOL(ceph_cls_journaler_set_active_set); > + > +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc, > + struct ceph_object_id *oid, > + struct ceph_object_locator *oloc, > + uint64_t soft_limit) > +{ > + struct page *req_page; > + int ret; > + void *p; > + > + req_page = alloc_page(GFP_NOIO); > + if (!req_page) > + return -ENOMEM; > + > + p = page_address(req_page); > + ceph_encode_64(&p, soft_limit); > + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "guard_append", > + CEPH_OSD_FLAG_READ, req_page, > + 8, NULL, NULL); > + > + __free_page(req_page); > + return ret; > +} > 
+EXPORT_SYMBOL(ceph_cls_journaler_guard_append);

Looks like this function is unused.

Thanks,

                Ilya