Change the type of ceph_osdc_call()'s reply to a ceph_databuf struct rather than a list of pages. Signed-off-by: David Howells <dhowells@xxxxxxxxxx> --- drivers/block/rbd.c | 134 ++++++++++++++++++-------------- include/linux/ceph/osd_client.h | 5 +- net/ceph/cls_lock_client.c | 40 +++++----- net/ceph/osd_client.c | 64 +++++++++++++-- 4 files changed, 158 insertions(+), 85 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 2a161b03dd7a..971fa4a581cf 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1823,9 +1823,8 @@ static int __rbd_object_map_load(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; CEPH_DEFINE_OID_ONSTACK(oid); - struct page **pages; - void *p, *end; - size_t reply_len; + struct ceph_databuf *reply; + void *p, *q, *end; u64 num_objects; u64 object_map_bytes; u64 object_map_size; @@ -1839,48 +1838,57 @@ static int __rbd_object_map_load(struct rbd_device *rbd_dev) object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ, BITS_PER_BYTE); num_pages = calc_pages_for(0, object_map_bytes) + 1; - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - reply_len = num_pages * PAGE_SIZE; + reply = ceph_databuf_alloc(num_pages, num_pages * PAGE_SIZE, + GFP_KERNEL); + if (!reply) + return -ENOMEM; + rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid); ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc, "rbd", "object_map_load", CEPH_OSD_FLAG_READ, - NULL, 0, pages, &reply_len); + NULL, 0, reply); if (ret) goto out; - p = page_address(pages[0]); - end = p + min(reply_len, (size_t)PAGE_SIZE); - ret = decode_object_map_header(&p, end, &object_map_size); + p = kmap_ceph_databuf_page(reply, 0); + end = p + min(reply->iter.count, (size_t)PAGE_SIZE); + q = p; + ret = decode_object_map_header(&q, end, &object_map_size); if (ret) - goto out; + goto out_unmap; if (object_map_size != num_objects) { rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu", object_map_size, num_objects); ret = -EINVAL; - goto out; + goto out_unmap; } + iov_iter_advance(&reply->iter, q - p); - if (offset_in_page(p) + object_map_bytes > reply_len) { + if (object_map_bytes > reply->iter.count) { ret = -EINVAL; - goto out; + goto out_unmap; } rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL); if (!rbd_dev->object_map) { ret = -ENOMEM; - goto out; + goto out_unmap; } rbd_dev->object_map_size = object_map_size; - ceph_copy_from_page_vector(pages, rbd_dev->object_map, - offset_in_page(p), object_map_bytes); + ret = -EIO; + if (copy_from_iter(rbd_dev->object_map, object_map_bytes, + &reply->iter) != object_map_bytes) + goto out_unmap; + + ret = 0; +out_unmap: + kunmap_local(p); out: - ceph_release_page_vector(pages, num_pages); + ceph_databuf_release(reply); return ret; } @@ -1949,6 +1957,7 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, { struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; struct ceph_osd_data *osd_data; + struct ceph_databuf *dbuf; u64 objno; u8 state, new_state, current_state; bool has_current_state; @@ -1968,9 +1977,10 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, */ rbd_assert(osd_req->r_num_ops == 2); osd_data = osd_req_op_data(osd_req, 1, cls, request_data); - rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES); + rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_DATABUF); + dbuf = osd_data->dbuf; - p = page_address(osd_data->pages[0]); + p = kmap_ceph_databuf_page(dbuf, 0); objno = ceph_decode_64(&p); rbd_assert(objno == obj_req->ex.oe_objno); rbd_assert(ceph_decode_64(&p) == objno + 1); @@ -1978,6 +1988,7 @@ static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req, has_current_state = ceph_decode_8(&p); if (has_current_state) current_state = ceph_decode_8(&p); + kunmap_local(p); spin_lock(&rbd_dev->object_map_lock); state = __rbd_object_map_get(rbd_dev, objno); @@ -2017,7 +2028,7 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req, int which, u64 objno, u8 new_state, const u8 *current_state) { - struct page **pages; + struct ceph_databuf *dbuf; void *p, *start; int ret; @@ -2025,11 +2036,11 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req, if (ret) return ret; - pages = ceph_alloc_page_vector(1, GFP_NOIO); - if (IS_ERR(pages)) - return PTR_ERR(pages); + dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO); + if (!dbuf) + return -ENOMEM; - p = start = page_address(pages[0]); + p = start = kmap_ceph_databuf_page(dbuf, 0); ceph_encode_64(&p, objno); ceph_encode_64(&p, objno + 1); ceph_encode_8(&p, new_state); @@ -2039,9 +2050,11 @@ static int rbd_cls_object_map_update(struct ceph_osd_request *req, } else { ceph_encode_8(&p, 0); } + kunmap_local(p); + dbuf->length = p - start; - osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0, - false, true); + osd_req_op_cls_request_databuf(req, which, dbuf); + ceph_databuf_release(dbuf); return 0; } @@ -4613,8 +4626,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, size_t inbound_size) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_databuf *reply; struct page *req_page = NULL; - struct page *reply_page; int ret; /* @@ -4635,8 +4648,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, memcpy(page_address(req_page), outbound, outbound_size); } - reply_page = alloc_page(GFP_KERNEL); - if (!reply_page) { + reply = ceph_databuf_alloc(1, inbound_size, GFP_KERNEL); + if (!reply) { if (req_page) __free_page(req_page); return -ENOMEM; @@ -4644,15 +4657,16 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, CEPH_OSD_FLAG_READ, req_page, outbound_size, - &reply_page, &inbound_size); + reply); if (!ret) { - memcpy(inbound, page_address(reply_page), inbound_size); - ret = inbound_size; + ret = reply->length; + if (copy_from_iter(inbound, reply->length, &reply->iter) != ret) + ret = -EIO; } if (req_page) __free_page(req_page); - __free_page(reply_page); + ceph_databuf_release(reply); return ret; } @@ -5615,7 +5629,7 @@ static int decode_parent_image_spec(void **p, void *end, static int __get_parent_info(struct rbd_device *rbd_dev, struct page *req_page, - struct page *reply_page, + struct ceph_databuf *reply, struct parent_image_info *pii) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; @@ -5625,27 +5639,29 @@ static int __get_parent_info(struct rbd_device *rbd_dev, ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), &reply_page, &reply_len); + req_page, sizeof(u64), reply); if (ret) return ret == -EOPNOTSUPP ? 1 : ret; - p = page_address(reply_page); + p = kmap_ceph_databuf_page(reply, 0); end = p + reply_len; ret = decode_parent_image_spec(&p, end, pii); + kunmap_local(p); if (ret) return ret; ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), &reply_page, &reply_len); + req_page, sizeof(u64), reply); if (ret) return ret; - p = page_address(reply_page); + p = kmap_ceph_databuf_page(reply, 0); end = p + reply_len; ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval); if (pii->has_overlap) ceph_decode_64_safe(&p, end, pii->overlap, e_inval); + kunmap_local(p); return 0; @@ -5658,25 +5674,25 @@ static int __get_parent_info(struct rbd_device *rbd_dev, */ static int __get_parent_info_legacy(struct rbd_device *rbd_dev, struct page *req_page, - struct page *reply_page, + struct ceph_databuf *reply, struct parent_image_info *pii) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - size_t reply_len = PAGE_SIZE; void *p, *end; int ret; ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, "rbd", "get_parent", CEPH_OSD_FLAG_READ, - req_page, sizeof(u64), &reply_page, &reply_len); + req_page, sizeof(u64), reply); if (ret) return ret; - p = page_address(reply_page); - end = p + reply_len; + p = kmap_ceph_databuf_page(reply, 0); + end = p + reply->length; ceph_decode_64_safe(&p, end, pii->pool_id, e_inval); pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); if (IS_ERR(pii->image_id)) { + kunmap_local(p); ret = PTR_ERR(pii->image_id); pii->image_id = NULL; return ret; @@ -5684,6 +5700,7 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev, ceph_decode_64_safe(&p, end, pii->snap_id, e_inval); pii->has_overlap = true; ceph_decode_64_safe(&p, end, pii->overlap, e_inval); + kunmap_local(p); return 0; @@ -5694,29 +5711,30 @@ static int __get_parent_info_legacy(struct rbd_device *rbd_dev, static int get_parent_info(struct rbd_device *rbd_dev, struct parent_image_info *pii) { - struct page *req_page, *reply_page; + struct ceph_databuf *reply; + struct page *req_page; void *p; - int ret; + int ret = -ENOMEM; req_page = alloc_page(GFP_KERNEL); if (!req_page) - return -ENOMEM; + goto out; - reply_page = alloc_page(GFP_KERNEL); - if (!reply_page) { - __free_page(req_page); - return -ENOMEM; - } + reply = ceph_databuf_alloc(1, PAGE_SIZE, GFP_KERNEL); + if (!reply) + goto out_free; - p = page_address(req_page); + p = kmap_local_page(req_page); ceph_encode_64(&p, rbd_dev->spec->snap_id); - ret = __get_parent_info(rbd_dev, req_page, reply_page, pii); + kunmap_local(p); + ret = __get_parent_info(rbd_dev, req_page, reply, pii); if (ret > 0) - ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page, - pii); + ret = __get_parent_info_legacy(rbd_dev, req_page, reply, pii); + ceph_databuf_release(reply); +out_free: __free_page(req_page); - __free_page(reply_page); +out: return ret; } diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 2d8cd45f1c34..0e008837dac1 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -531,6 +531,9 @@ void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req, unsigned int which, struct bio_vec *bvecs, u32 num_bvecs, u32 bytes); +void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_databuf *dbuf); extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *, unsigned int which, struct page **pages, u64 length, @@ -605,7 +608,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, const char *class, const char *method, unsigned int flags, struct page *req_page, size_t req_len, - struct page **resp_pages, size_t *resp_len); + struct ceph_databuf *response); /* watch/notify */ struct ceph_osd_linger_request * diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index 66136a4c1ce7..e2f508704c29 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -74,7 +74,7 @@ int ceph_cls_lock(struct ceph_osd_client *osdc, __func__, lock_name, type, cookie, tag, desc, flags); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock", CEPH_OSD_FLAG_WRITE, lock_op_page, - lock_op_buf_size, NULL, NULL); + lock_op_buf_size, NULL); dout("%s: status %d\n", __func__, ret); __free_page(lock_op_page); @@ -124,7 +124,7 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc, dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock", CEPH_OSD_FLAG_WRITE, unlock_op_page, - unlock_op_buf_size, NULL, NULL); + unlock_op_buf_size, NULL); dout("%s: status %d\n", __func__, ret); __free_page(unlock_op_page); @@ -179,7 +179,7 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc, cookie, ENTITY_NAME(*locker)); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock", CEPH_OSD_FLAG_WRITE, break_op_page, - break_op_buf_size, NULL, NULL); + break_op_buf_size, NULL); dout("%s: status %d\n", __func__, ret); __free_page(break_op_page); @@ -230,7 +230,7 @@ int ceph_cls_set_cookie(struct ceph_osd_client *osdc, __func__, lock_name, type, old_cookie, tag, new_cookie); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie", CEPH_OSD_FLAG_WRITE, cookie_op_page, - cookie_op_buf_size, NULL, NULL); + cookie_op_buf_size, NULL); dout("%s: status %d\n", __func__, ret); __free_page(cookie_op_page); @@ -337,10 +337,10 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, char *lock_name, u8 *type, char **tag, struct ceph_locker **lockers, u32 *num_lockers) { + struct ceph_databuf *reply; int get_info_op_buf_size; int name_len = strlen(lock_name); - struct page *get_info_op_page, *reply_page; - size_t reply_len = PAGE_SIZE; + struct page *get_info_op_page; void *p, *end; int ret; @@ -353,8 +353,8 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, if (!get_info_op_page) return -ENOMEM; - reply_page = alloc_page(GFP_NOIO); - if (!reply_page) { + reply = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO); + if (!reply) { __free_page(get_info_op_page); return -ENOMEM; } @@ -370,18 +370,19 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, dout("%s lock_name %s\n", __func__, lock_name); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info", CEPH_OSD_FLAG_READ, get_info_op_page, - get_info_op_buf_size, &reply_page, &reply_len); + get_info_op_buf_size, reply); dout("%s: status %d\n", __func__, ret); if (ret >= 0) { - p = page_address(reply_page); - end = p + reply_len; + p = kmap_ceph_databuf_page(reply, 0); + end = p + reply->length; ret = decode_lockers(&p, end, type, tag, lockers, num_lockers); + kunmap_local(p); } __free_page(get_info_op_page); - __free_page(reply_page); + ceph_databuf_release(reply); return ret; } EXPORT_SYMBOL(ceph_cls_lock_info); @@ -389,11 +390,11 @@ EXPORT_SYMBOL(ceph_cls_lock_info); int ceph_cls_assert_locked(struct ceph_osd_request *req, int which, char *lock_name, u8 type, char *cookie, char *tag) { + struct ceph_databuf *dbuf; int assert_op_buf_size; int name_len = strlen(lock_name); int cookie_len = strlen(cookie); int tag_len = strlen(tag); - struct page **pages; void *p, *end; int ret; @@ -408,11 +409,11 @@ int ceph_cls_assert_locked(struct ceph_osd_request *req, int which, if (ret) return ret; - pages = ceph_alloc_page_vector(1, GFP_NOIO); - if (IS_ERR(pages)) - return PTR_ERR(pages); + dbuf = ceph_databuf_alloc(1, PAGE_SIZE, GFP_NOIO); + if (!dbuf) + return -ENOMEM; - p = page_address(pages[0]); + p = kmap_ceph_databuf_page(dbuf, 0); end = p + assert_op_buf_size; /* encode cls_lock_assert_op struct */ @@ -422,10 +423,11 @@ int ceph_cls_assert_locked(struct ceph_osd_request *req, int which, ceph_encode_8(&p, type); ceph_encode_string(&p, end, cookie, cookie_len); ceph_encode_string(&p, end, tag, tag_len); + kunmap(p); WARN_ON(p != end); + dbuf->length = assert_op_buf_size; - osd_req_op_cls_request_data_pages(req, which, pages, assert_op_buf_size, - 0, false, true); + osd_req_op_cls_request_databuf(req, which, dbuf); return 0; } EXPORT_SYMBOL(ceph_cls_assert_locked); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e3152e21418f..7ce3aef55755 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -178,6 +178,16 @@ static void ceph_osd_iter_init(struct ceph_osd_data *osd_data, osd_data->iter = *iter; } +/* + * Consumes a ref on @dbuf. + */ +static void ceph_osd_databuf_init(struct ceph_osd_data *osd_data, + struct ceph_databuf *dbuf) +{ + ceph_osd_iter_init(osd_data, &dbuf->iter); + osd_data->dbuf = dbuf; +} + static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) { @@ -207,6 +217,17 @@ void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_raw_data_in_pages); +void osd_req_op_extent_osd_databuf(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_databuf *dbuf) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_data(osd_req, which, extent, osd_data); + ceph_osd_databuf_init(osd_data, dbuf); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_databuf); + void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, unsigned int which, struct page **pages, u64 length, u32 offset, @@ -297,6 +318,19 @@ static void osd_req_op_cls_request_info_pagelist( ceph_osd_data_pagelist_init(osd_data, pagelist); } +void osd_req_op_cls_request_databuf(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_databuf *dbuf) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_data(osd_req, which, cls, request_data); + ceph_osd_databuf_init(osd_data, dbuf); + osd_req->r_ops[which].cls.indata_len += dbuf->length; + osd_req->r_ops[which].indata_len += dbuf->length; +} +EXPORT_SYMBOL(osd_req_op_cls_request_databuf); + void osd_req_op_cls_request_data_pagelist( struct ceph_osd_request *osd_req, unsigned int which, struct ceph_pagelist *pagelist) @@ -342,6 +376,17 @@ void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs); +void osd_req_op_cls_response_databuf(struct ceph_osd_request *osd_req, + unsigned int which, + struct ceph_databuf *dbuf) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_data(osd_req, which, cls, response_data); + ceph_osd_databuf_init(osd_data, dbuf); +} +EXPORT_SYMBOL(osd_req_op_cls_response_databuf); + void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, unsigned int which, struct page **pages, u64 length, u32 offset, bool pages_from_pool, bool own_pages) @@ -5162,7 +5207,11 @@ EXPORT_SYMBOL(ceph_osdc_maybe_request_map); * Execute an OSD class method on an object. * * @flags: CEPH_OSD_FLAG_* - * @resp_len: in/out param for reply length + * @response: Pointer to the storage descriptor for the reply or NULL. + * + * The size of the response buffer is set by the caller in @response->limit and + * the size of the response obtained is set in @response->length and + * @response->iter.count. */ int ceph_osdc_call(struct ceph_osd_client *osdc, struct ceph_object_id *oid, @@ -5170,7 +5219,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, const char *class, const char *method, unsigned int flags, struct page *req_page, size_t req_len, - struct page **resp_pages, size_t *resp_len) + struct ceph_databuf *response) { struct ceph_osd_request *req; int ret; @@ -5193,9 +5242,8 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, if (req_page) osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 0, false, false); - if (resp_pages) - osd_req_op_cls_response_data_pages(req, 0, resp_pages, - *resp_len, 0, false, false); + if (response) + osd_req_op_cls_response_databuf(req, 0, response); ret = ceph_osdc_alloc_messages(req, GFP_NOIO); if (ret) @@ -5205,8 +5253,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { ret = req->r_ops[0].rval; - if (resp_pages) - *resp_len = req->r_ops[0].outdata_len; + if (response) { + response->length = req->r_ops[0].outdata_len; + response->iter.count = response->length; + } } out_put_req: