Re: [PATCH 2/4] libceph: introduce cls_journaler_client

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 08/16/2018 12:59 AM, Dongsheng Yang wrote:
> This is a cls client module for journaler.
> 
> Signed-off-by: Dongsheng Yang <dongsheng.yang@xxxxxxxxxxxx>

Trivial comments.  Sorry, I can't offer a "proper" review...

					-Alex

> ---
>  include/linux/ceph/cls_journaler_client.h |  87 ++++++
>  net/ceph/cls_journaler_client.c           | 501 ++++++++++++++++++++++++++++++
>  2 files changed, 588 insertions(+)
>  create mode 100644 include/linux/ceph/cls_journaler_client.h
>  create mode 100644 net/ceph/cls_journaler_client.c
> 
> diff --git a/include/linux/ceph/cls_journaler_client.h b/include/linux/ceph/cls_journaler_client.h
> new file mode 100644
> index 0000000..cc9be96
> --- /dev/null
> +++ b/include/linux/ceph/cls_journaler_client.h
> @@ -0,0 +1,87 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
> +#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
> +
> +#include <linux/ceph/osd_client.h>
> +
> +struct ceph_journaler;
> +struct ceph_journaler_client;
> +
> +struct ceph_journaler_object_pos {
> +	struct list_head	node;
> +	u64 			object_num;
> +	u64 			tag_tid;
> +	u64 			entry_tid;
> +};
> +
> +struct ceph_journaler_client {
> +	struct list_head	node;
> +	size_t 			id_len;
> +	char 			*id;
> +	size_t 			data_len;
> +	char 			*data;
> +	struct list_head	object_positions;
> +};
> +
> +struct ceph_journaler_tag {
> +	uint64_t tid;
> +	uint64_t tag_class;
> +	size_t data_len;
> +	char *data;
> +};
> +
> +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +					   uint8_t *order,
> +					   uint8_t *splay_width,
> +					   int64_t *pool_id);
> +
> +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc,
> +		       			 struct ceph_object_id *oid,
> +		       			 struct ceph_object_locator *oloc,
> +					 uint64_t *minimum_set, uint64_t *active_set);
> +
> +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc,
> +		       		   struct ceph_object_id *oid,
> +		       		   struct ceph_object_locator *oloc,
> +				   struct ceph_journaler_client **clients,
> +				   uint32_t *client_num);
> +
> +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc,
> +		       		   struct ceph_object_id *oid,
> +		       		   struct ceph_object_locator *oloc,
> +				   uint64_t *tag_tid);
> +
> +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc,
> +		       	       struct ceph_object_id *oid,
> +		       	       struct ceph_object_locator *oloc,
> +			       uint64_t tag_tid, struct ceph_journaler_tag *tag);
> +
> +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc,
> +		       		  struct ceph_object_id *oid,
> +		       		  struct ceph_object_locator *oloc,
> +				  uint64_t tag_tid, uint64_t tag_class,
> +				  void *buf, uint32_t buf_len);
> +
> +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   struct ceph_journaler_client *client,
> +					struct list_head *object_positions);
> +
> +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   uint64_t active_set);
> +
> +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   uint64_t minimum_set);
> +
> +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   uint64_t soft_limit);
> +#endif
> diff --git a/net/ceph/cls_journaler_client.c b/net/ceph/cls_journaler_client.c
> new file mode 100644
> index 0000000..971fc5d
> --- /dev/null
> +++ b/net/ceph/cls_journaler_client.c
> @@ -0,0 +1,501 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <linux/ceph/ceph_debug.h>
> +
> +#include <linux/types.h>
> +#include <linux/slab.h>
> +
> +#include <linux/ceph/cls_journaler_client.h>
> +#include <linux/ceph/decode.h>
> +#include <linux/ceph/journaler.h>
> +
> +//TODO get all metas in one single request

You should get rid of "TODO" comments; if it's a to-do item, describe the
work to be done elsewhere (like a tracker entry).

Also, do not use // style comments in kernel code (the SPDX identifier
line at the top of a .c file is the one exception); use /* ... */ instead.

> +int ceph_cls_journaler_get_immutable_metas(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +					   uint8_t *order,
> +					   uint8_t *splay_width,
> +					   int64_t *pool_id)
> +{
> +	struct page *reply_page;
> +	size_t reply_len = sizeof(*order);
> +	int ret;
> +
> +	reply_page = alloc_page(GFP_NOIO);
> +	if (!reply_page)
> +		return -ENOMEM;
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order",
> +			     CEPH_OSD_FLAG_READ, NULL,
> +			     0, reply_page, &reply_len);
> +
> +	if (!ret) {

Curly braces are not needed around a single-statement block (this applies
throughout the patch).

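If this call does not succeed, should you return an error here rather
than continuing?  As written, ret is overwritten by the next
ceph_osdc_call(), so a failure here is silently lost.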


Ok, that's it for now...








> +		memcpy(order, page_address(reply_page), reply_len);
> +	}
> +
> +	reply_len = sizeof(*splay_width);
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width",
> +			     CEPH_OSD_FLAG_READ, NULL,
> +			     0, reply_page, &reply_len);
> +
> +	if (!ret) {
> +		memcpy(splay_width, page_address(reply_page), reply_len);
> +	}
> +
> +	reply_len = sizeof(*pool_id);
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id",
> +			     CEPH_OSD_FLAG_READ, NULL,
> +			     0, reply_page, &reply_len);
> +
> +	if (!ret) {
> +		memcpy(pool_id, page_address(reply_page), reply_len);
> +	}
> +
> +	dout("%s: status %d, order: %d\n", __func__, ret, *order);
> +
> +	__free_page(reply_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_get_immutable_metas);
> +
> +//TODO get all metas in one single request
> +int ceph_cls_journaler_get_mutable_metas(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +					   uint64_t *minimum_set, uint64_t *active_set)
> +{
> +	struct page *reply_page;
> +	int ret;
> +	size_t reply_len = sizeof(*minimum_set);
> +
> +	reply_page = alloc_page(GFP_NOIO);
> +	if (!reply_page)
> +		return -ENOMEM;
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set",
> +			     CEPH_OSD_FLAG_READ, NULL,
> +			     0, reply_page, &reply_len);
> +
> +	if (!ret) {
> +		memcpy(minimum_set, page_address(reply_page), reply_len);
> +	}
> +
> +	reply_len = sizeof(active_set);
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set",
> +			     CEPH_OSD_FLAG_READ, NULL,
> +			     0, reply_page, &reply_len);
> +
> +	if (!ret) {
> +		memcpy(active_set, page_address(reply_page), reply_len);
> +	}
> +
> +	dout("%s: status %d, minimum_set: %llu, active_set: %llu\n", __func__, ret, *minimum_set, *active_set);
> +
> +	__free_page(reply_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_get_mutable_metas);
> +
> +static int decode_object_position(void **p, void *end, struct ceph_journaler_object_pos *pos)
> +{
> +	u8 struct_v;
> +	u32 struct_len;
> +	int ret = 0;
> +
> +	u64 object_num = 0;
> +	u64 tag_tid = 0;
> +	u64 entry_tid = 0;
> +
> +	ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position",
> +				  &struct_v, &struct_len);
> +	if (ret)
> +		return ret;
> +
> +	object_num = ceph_decode_64(p);

Maybe drop some of these extra blank lines.

> +
> +	tag_tid = ceph_decode_64(p);
> +
> +	entry_tid = ceph_decode_64(p);
> +
> +	dout("object_num: %llu, tag_tid: %llu, entry_tid: %llu", object_num, tag_tid, entry_tid);
> +
> +	pos->object_num = object_num;
> +	pos->tag_tid = tag_tid;
> +	pos->entry_tid = entry_tid;
> +
> +	return ret;
> +}
> +
> +static int decode_client(void **p, void *end, struct ceph_journaler_client *client)
> +{
> +	u8 struct_v;
> +	u32 struct_len;
> +	int ret = 0;
> +	int num, i;
> +	u8 state_raw;
> +	
> +	INIT_LIST_HEAD(&client->node);
> +	INIT_LIST_HEAD(&client->object_positions);
> +	ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply",
> +				  &struct_v, &struct_len);
> +	dout("%s, ret from ceph_start_decoding: %d", __func__, ret);
> +	if (ret)
> +		return ret;
> +
> +	client->id = ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO); 
> +
> +	client->data = ceph_extract_encoded_string(p, end, &client->data_len, GFP_NOIO); 
> +
> +	ret = ceph_start_decoding(p, end, 1, "cls_joural_client_object_set_position",
> +				  &struct_v, &struct_len);
> +	if (ret)
> +		return ret;
> +
> +	num = ceph_decode_32(p);
> +
> +	for (i = 0; i < num; i++) {
> +		struct ceph_journaler_object_pos *pos = kzalloc(sizeof(*pos), GFP_KERNEL);

Put a blank line after declarations (even when a declaration includes an
initializer).

> +		if (!pos)
> +			return -ENOMEM;
> +
> +		ret = decode_object_position(p, end, pos);
> +		if (ret)
> +			return ret;
> +		list_add_tail(&pos->node, &client->object_positions);
> +	}
> +
> +	state_raw = ceph_decode_8(p);
> +
> +	return ret;
> +}
> +
> +static int decode_clients(void **p, void *end, struct ceph_journaler_client **clients, uint32_t *client_num)
> +{
> +	int i;
> +	int ret = 0;
> +	
> +	*client_num = ceph_decode_32(p);
> +	if (ret)
> +		return ret;
> +
> +	*clients = kcalloc(*client_num, sizeof(**clients), GFP_NOIO);
> +	if (!*clients)
> +		return -ENOMEM;
> +
> +	for (i = 0; i < *client_num; i++) {
> +		ret = decode_client(p, end, *clients + i);
> +		if (ret)
> +			return ret;
> +	}
> +
> +	return ret;
> +}
> +
> +int ceph_cls_journaler_client_list(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   struct ceph_journaler_client **clients,
> +				   uint32_t *client_num)
> +{
> +	struct page *reply_page;
> +	struct page *req_page;
> +	int ret;
> +	size_t reply_len = PAGE_SIZE;
> +	int buf_size;
> +	void *p, *end;
> +	char name[] = "";
> +
> +	buf_size = strlen(name) + sizeof(__le32) + sizeof(uint64_t);
> +
> +	if (buf_size > PAGE_SIZE)
> +		return -E2BIG;
> +
> +	reply_page = alloc_page(GFP_NOIO);
> +	if (!reply_page)
> +		return -ENOMEM;
> +
> +	req_page = alloc_page(GFP_NOIO);
> +	if (!req_page)
> +		return -ENOMEM;
> +
> +	p = page_address(req_page);
> +	end = p + buf_size;
> +
> +	ceph_encode_string(&p, end, name, strlen(name));
> +	ceph_encode_64(&p, (uint64_t)256);
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_list",
> +			     CEPH_OSD_FLAG_READ, req_page,
> +			     buf_size, reply_page, &reply_len);
> +
> +	if (!ret) {
> +		p = page_address(reply_page);
> +		end = p + reply_len;
> +
> +		ret = decode_clients(&p, end, clients, client_num);
> +	}
> +
> +	__free_page(reply_page);
> +	__free_page(req_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_client_list);
> +
> +int ceph_cls_journaler_get_next_tag_tid(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   uint64_t *tag_tid)
> +{
> +	struct page *reply_page;
> +	int ret;
> +	size_t reply_len = PAGE_SIZE;
> +
> +	reply_page = alloc_page(GFP_NOIO);
> +	if (!reply_page)
> +		return -ENOMEM;
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid",
> +			     CEPH_OSD_FLAG_READ, NULL,
> +			     0, reply_page, &reply_len);
> +
> +	if (!ret) {
> +		memcpy(tag_tid, page_address(reply_page), reply_len);
> +	}
> +
> +	__free_page(reply_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_get_next_tag_tid);
> +
> +int ceph_cls_journaler_tag_create(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +					uint64_t tag_tid, uint64_t tag_class,
> +				   void *buf, uint32_t buf_len)
> +{
> +	struct page *req_page;
> +	int ret;
> +	int buf_size;
> +	void *p, *end;
> +
> +	buf_size = buf_len + sizeof(__le32) + sizeof(uint64_t) + sizeof(uint64_t);
> +
> +	if (buf_size > PAGE_SIZE)
> +		return -E2BIG;
> +
> +	req_page = alloc_page(GFP_NOIO);
> +	if (!req_page)
> +		return -ENOMEM;
> +
> +	p = page_address(req_page);
> +	end = p + buf_size;
> +
> +	ceph_encode_64(&p, tag_tid);
> +	ceph_encode_64(&p, tag_class);
> +	ceph_encode_string(&p, end, buf, buf_len);
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create",
> +			     CEPH_OSD_FLAG_WRITE, req_page,
> +			     buf_size, NULL, NULL);
> +
> +	__free_page(req_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_tag_create);
> +
> +int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag)
> +{
> +	int ret = 0;
> +	u8 struct_v;
> +	u32 struct_len;
> +
> +	ret = ceph_start_decoding(p, end, 1, "cls_journaler_tag",
> +				  &struct_v, &struct_len);
> +	if (ret)
> +		return ret;
> +
> +	tag->tid = ceph_decode_64(p);
> +	tag->tag_class = ceph_decode_64(p);
> +	tag->data = ceph_extract_encoded_string(p, end, &tag->data_len, GFP_NOIO); 
> +
> +	return 0;
> +}
> +
> +int ceph_cls_journaler_get_tag(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   uint64_t tag_tid, struct ceph_journaler_tag *tag)
> +{
> +	struct page *reply_page;
> +	struct page *req_page;
> +	int ret;
> +	size_t reply_len = PAGE_SIZE;
> +	int buf_size;
> +	void *p, *end;
> +
> +	buf_size = sizeof(tag_tid);
> +
> +	reply_page = alloc_page(GFP_NOIO);
> +	if (!reply_page)
> +		return -ENOMEM;
> +
> +	req_page = alloc_page(GFP_NOIO);
> +	if (!req_page)
> +		return -ENOMEM;
> +
> +	p = page_address(req_page);
> +	end = p + buf_size;
> +
> +	ceph_encode_64(&p, tag_tid);
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag",
> +			     CEPH_OSD_FLAG_READ, req_page,
> +			     buf_size, reply_page, &reply_len);
> +
> +	if (!ret) {
> +		p = page_address(reply_page);
> +		end = p + reply_len;
> +
> +		ret = decode_tag(&p, end, tag);
> +	}
> +
> +	__free_page(reply_page);
> +	__free_page(req_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_get_tag);
> +
> +static int version_len = 6;
> +
> +int ceph_cls_journaler_client_committed(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   struct ceph_journaler_client *client,
> +					struct list_head *object_positions)
> +{
> +	struct page *req_page;
> +	int ret;
> +	int buf_size;
> +	void *p, *end;
> +	struct ceph_journaler_object_pos *position = NULL;
> +
> +	int object_position_len = version_len + 8 + 8 + 8;
> +
> +	int pos_num = 0;
> +
> +	buf_size = 4 + client->id_len + version_len + 4;
> +
> +	list_for_each_entry(position, object_positions, node) {
> +		buf_size += object_position_len;
> +		pos_num++;
> +	}
> +
> +	if (buf_size > PAGE_SIZE)
> +		return -E2BIG;
> +
> +	req_page = alloc_page(GFP_NOIO);
> +	if (!req_page)
> +		return -ENOMEM;
> +
> +	p = page_address(req_page);
> +	end = p + buf_size;
> +
> +	ceph_encode_string(&p, end, client->id, client->id_len);
> +
> +	ceph_start_encoding(&p, 1, 1, buf_size - client->id_len - version_len - 4);
> +
> +	ceph_encode_32(&p, pos_num);
> +
> +	list_for_each_entry(position, object_positions, node) {
> +		ceph_start_encoding(&p, 1, 1, 24);
> +		ceph_encode_64(&p, position->object_num);
> +		ceph_encode_64(&p, position->tag_tid);
> +		ceph_encode_64(&p, position->entry_tid);
> +	}
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_commit",
> +			     CEPH_OSD_FLAG_WRITE, req_page,
> +			     buf_size, NULL, NULL);
> +
> +	__free_page(req_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_client_committed);
> +
> +
> +int ceph_cls_journaler_set_minimum_set(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   uint64_t minimum_set)
> +{
> +	struct page *req_page;
> +	int ret;
> +	void *p;
> +
> +	req_page = alloc_page(GFP_NOIO);
> +	if (!req_page)
> +		return -ENOMEM;
> +
> +	p = page_address(req_page);
> +
> +	ceph_encode_64(&p, minimum_set);
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set",
> +			     CEPH_OSD_FLAG_WRITE, req_page,
> +			     8, NULL, NULL);
> +
> +	__free_page(req_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_set_minimum_set);
> +
> +int ceph_cls_journaler_set_active_set(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   uint64_t active_set)
> +{
> +	struct page *req_page;
> +	int ret;
> +	void *p;
> +
> +	req_page = alloc_page(GFP_NOIO);
> +	if (!req_page)
> +		return -ENOMEM;
> +
> +	p = page_address(req_page);
> +
> +	ceph_encode_64(&p, active_set);
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set",
> +			     CEPH_OSD_FLAG_WRITE, req_page,
> +			     8, NULL, NULL);
> +
> +	__free_page(req_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_set_active_set);
> +
> +int ceph_cls_journaler_guard_append(struct ceph_osd_client *osdc,
> +		       			   struct ceph_object_id *oid,
> +		       			   struct ceph_object_locator *oloc,
> +				   	   uint64_t soft_limit)
> +{
> +	struct page *req_page;
> +	int ret;
> +	void *p;
> +
> +	req_page = alloc_page(GFP_NOIO);
> +	if (!req_page)
> +		return -ENOMEM;
> +
> +	p = page_address(req_page);
> +
> +	ceph_encode_64(&p, soft_limit);
> +
> +	ret = ceph_osdc_call(osdc, oid, oloc, "journal", "guard_append",
> +			     CEPH_OSD_FLAG_READ, req_page,
> +			     8, NULL, NULL);
> +
> +	__free_page(req_page);
> +	return ret;
> +}
> +EXPORT_SYMBOL(ceph_cls_journaler_guard_append);
> 




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux