On Sun, Mar 18, 2018 at 10:23 AM, Alex Elder <elder@xxxxxxxxxx> wrote:
> On 03/16/2018 07:37 AM, Alex Elder wrote:
>> From: Ilya Dryomov <idryomov@xxxxxxxxx>
>>
>> The notable changes are:
>>
>> - instead of explicitly stat'ing the object to see if it exists before
>>   issuing the write, send the write optimistically along with the stat
>>   in a single OSD request
>
> This was a suggestion long ago but I don't think we knew whether it would
> be wasteful or helpful.  You guys have apparently decided it was worthwhile
> in practice to do it this way.

Yes, a long time ago.

>
>> - zero copyup optimization
>
> This is "a zero length copyup means full zero-filled object" or something.

Yes, pretty much.

>
>> - all object requests are associated with an image request and have
>>   a valid ->img_request pointer; there are no standalone (!IMG_DATA)
>>   object requests anymore
>
> It seems the above statement was true before?  As I recall the reason
> for standalone object requests was for reading image header objects or
> something.  It looks like that's done by synchronous requests, so we
> don't need to have non-image requests?  Anyway, if this was the case
> before it probably should have been cleaned up separately.

No, it was used for watch requests, method calls, etc.  I got rid of all
of that gradually over time.  The last holdup was explicit stat
requests, removed in this commit.

>
>> - code is structured as a state machine (vs a bunch of callbacks with
>>   implicit state)
>>
>> Signed-off-by: Ilya Dryomov <idryomov@xxxxxxxxx>
>
> This patch is much too large for a very good review.  I know you said
> it was hard not to do all these interconnected things together.  Still
> I'm sure you could have broken it down a bit more and made the review
> task easier.
>
> I didn't complete my review.  But I offer a variety of comments
> below.  I unfortunately don't know how much time I'll have this week
> to continue this, but I'll do them as I can--at least the easy ones.
> > -Alex > >> --- >> drivers/block/rbd.c | 659 ++++++++++++++++++++++++++++++++++++++++++++++------ >> 1 file changed, 582 insertions(+), 77 deletions(-) >> >> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c >> index 5ca190b15e1d..29d8d8ce50eb 100644 >> --- a/drivers/block/rbd.c >> +++ b/drivers/block/rbd.c >> @@ -235,11 +235,21 @@ enum obj_req_flags { >> OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ >> }; > > It would be good to explain what the state machine is doing, and when > it is used, here. Added a picture. > >> +enum rbd_obj_write_state { >> + RBD_OBJ_WRITE_FLAT = 1, >> + RBD_OBJ_WRITE_GUARD, > > So looking at the code later, it looks like you have a stat op, and that is > followed by the setallochint and write request. And if I remember right, > this will automatically handle the opportunistic write: if the object > already exists it'll be written (because the stat op won't fail), but if > it does not, it won't be written (because the stat op failure will prevent > the subsequent ops from executing). Correct. > >> + RBD_OBJ_WRITE_COPYUP, >> +}; >> + >> struct rbd_obj_request { >> u64 object_no; >> u64 offset; /* object start byte */ >> u64 length; /* bytes from offset */ >> unsigned long flags; >> + union { >> + bool tried_parent; /* for reads */ >> + enum rbd_obj_write_state write_state; /* for writes */ >> + }; >> >> /* >> * An object request associated with an image will have its >> @@ -1282,6 +1292,27 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) >> })); >> } >> >> +/* >> + * Zero a range in @obj_req data buffer defined by a bio (list) or >> + * bio_vec array. >> + * >> + * @off is relative to the start of the data buffer. 
>> + */
>> +static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
>> +			       u32 bytes)
>> +{
>> +	switch (obj_req->type) {
>> +	case OBJ_REQUEST_BIO:
>> +		zero_bios(&obj_req->bio_pos, off, bytes);
>> +		break;
>> +	case OBJ_REQUEST_BVECS:
>> +		zero_bvecs(&obj_req->bvec_pos, off, bytes);
>> +		break;
>> +	default:
>> +		rbd_assert(0);
>> +	}
>> +}
>> +
>>  /*
>>   * The default/initial value for all object request flags is 0.  For
>>   * each flag, once its value is set to 1 it is never reset to 0
>> @@ -1567,6 +1598,35 @@ rbd_img_request_op_type(struct rbd_img_request *img_request)
>>  		return OBJ_OP_READ;
>>  }
>>
>> +static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
>> +{
>> +	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>> +
>> +	return !obj_req->offset &&
>> +	       obj_req->length == rbd_dev->layout.object_size;
>> +}
>> +
>> +static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
>> +{
>> +	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>> +
>> +	return obj_req->offset + obj_req->length ==
>> +					rbd_dev->layout.object_size;
>> +}
>> +
>> +static bool rbd_img_is_write(struct rbd_img_request *img_req)
>> +{
>> +	switch (rbd_img_request_op_type(img_req)) {
>> +	case OBJ_OP_READ:
>> +		return false;
>> +	case OBJ_OP_WRITE:
>> +	case OBJ_OP_DISCARD:
>> +		return true;
>> +	default:
>> +		rbd_assert(0);
>
> There are only three, so maybe:
> 	op_type = rbd_img_request_op_type(img_req);
> 	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
> 		return true;
> 	rbd_assert(op_type == OBJ_OP_READ);
> 	return false;
>
> (But I think you like switch statements better...)

I do ;) > >> + } >> +} >> + >> static void >> rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) >> { >> @@ -1697,63 +1757,28 @@ static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) >> obj_request_done_set(obj_request); >> } >> >> +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); >> + >> static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) >> { >> - struct rbd_obj_request *obj_request = osd_req->r_priv; >> - u16 opcode; >> + struct rbd_obj_request *obj_req = osd_req->r_priv; >> >> - dout("%s: osd_req %p\n", __func__, osd_req); >> - rbd_assert(osd_req == obj_request->osd_req); >> - if (obj_request_img_data_test(obj_request)) { >> - rbd_assert(obj_request->img_request); >> - rbd_assert(obj_request->which != BAD_WHICH); >> - } else { >> - rbd_assert(obj_request->which == BAD_WHICH); >> - } >> + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, >> + osd_req->r_result, obj_req); >> + rbd_assert(osd_req == obj_req->osd_req); >> >> - if (osd_req->r_result < 0) >> - obj_request->result = osd_req->r_result; >> + obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; > > Was the obj_req->result previously just implicitly kept at 0 if there was no error? I think so. > >> + if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) >> + obj_req->xferred = osd_req->r_result; >> + else >> + /* >> + * Writes aren't allowed to return a data payload. In some >> + * guarded write cases (e.g. stat + zero on an empty object) >> + * a stat response makes it through, but we don't care. >> + */ >> + obj_req->xferred = 0; >> >> - /* >> - * We support a 64-bit length, but ultimately it has to be >> - * passed to the block layer, which just supports a 32-bit >> - * length field. >> - */ >> - obj_request->xferred = osd_req->r_ops[0].outdata_len; > > Apparently the value of osd_req->r_result is the same as osd_req->r_ops[0].outdata_len > (if it's non-negative)? 
No, it's actually the sum of outdata_len over all OSD ops. Using ->r_result is more correct, but we look at ->xferred only when handling OBJ_OP_READ which always translates to a single OSD op, so it doesn't really matter in this case. > >> - rbd_assert(obj_request->xferred < (u64)UINT_MAX); >> - >> - opcode = osd_req->r_ops[0].op; > > Since we're only looking at the first op in the array here, your use > of the image request type in __rbd_obj_handle_request() makes more > sense. > >> - switch (opcode) { >> - case CEPH_OSD_OP_READ: >> - rbd_osd_read_callback(obj_request); >> - break; >> - case CEPH_OSD_OP_SETALLOCHINT: >> - rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || >> - osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); >> - /* fall through */ >> - case CEPH_OSD_OP_WRITE: >> - case CEPH_OSD_OP_WRITEFULL: >> - rbd_osd_write_callback(obj_request); >> - break; >> - case CEPH_OSD_OP_STAT: >> - rbd_osd_stat_callback(obj_request); >> - break; >> - case CEPH_OSD_OP_DELETE: >> - case CEPH_OSD_OP_TRUNCATE: >> - case CEPH_OSD_OP_ZERO: >> - rbd_osd_discard_callback(obj_request); >> - break; >> - case CEPH_OSD_OP_CALL: >> - rbd_osd_call_callback(obj_request); >> - break; >> - default: >> - rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", >> - obj_request->object_no, opcode); >> - break; >> - } >> - >> - if (obj_request_done_test(obj_request)) >> - rbd_obj_request_complete(obj_request); >> + rbd_obj_handle_request(obj_req); >> } >> >> static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) >> @@ -1806,12 +1831,6 @@ __rbd_osd_req_create(struct rbd_device *rbd_dev, >> return NULL; >> } >> >> -/* >> - * Create an osd request. A read request has one osd op (read). >> - * A write request has either one (watch) or two (hint+write) osd ops. >> - * (All rbd data writes are prefixed with an allocation hint op, but >> - * technically osd watch is a write request, hence this distinction.) 
>> - */ >> static struct ceph_osd_request *rbd_osd_req_create( >> struct rbd_device *rbd_dev, >> enum obj_operation_type op_type, >> @@ -1831,8 +1850,6 @@ static struct ceph_osd_request *rbd_osd_req_create( >> snapc = img_request->snapc; >> } >> >> - rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); >> - >> return __rbd_osd_req_create(rbd_dev, snapc, num_ops, >> (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? >> CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); >> @@ -2251,6 +2268,211 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, >> rbd_osd_req_format_read(obj_request); >> } >> >> +static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) >> +{ >> + switch (obj_req->type) { >> + case OBJ_REQUEST_BIO: >> + osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, >> + &obj_req->bio_pos, >> + obj_req->length); >> + break; >> + case OBJ_REQUEST_BVECS: >> + rbd_assert(obj_req->bvec_pos.iter.bi_size == >> + obj_req->length); >> + osd_req_op_extent_osd_data_bvecs(obj_req->osd_req, which, >> + &obj_req->bvec_pos); >> + break; >> + default: >> + rbd_assert(0); >> + } >> +} >> + >> +static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) >> +{ >> + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; >> + >> + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, obj_req); >> + if (!obj_req->osd_req) >> + return -ENOMEM; >> + >> + osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, >> + obj_req->offset, obj_req->length, 0, 0); >> + rbd_osd_req_setup_data(obj_req, 0); >> + >> + rbd_osd_req_format_read(obj_req); >> + return 0; >> +} >> + >> +static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, >> + unsigned int which) >> +{ >> + struct page **pages; >> + >> + /* >> + * The response data for a STAT call consists of: >> + * le64 length; >> + * struct { >> + * le32 tv_sec; >> + * le32 tv_nsec; >> + * } mtime; >> + */ >> + pages = 
ceph_alloc_page_vector(1, GFP_NOIO); >> + if (IS_ERR(pages)) >> + return PTR_ERR(pages); >> + >> + osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); >> + osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, >> + 8 + sizeof(struct ceph_timespec), >> + 0, false, true); >> + return 0; >> +} >> + >> +static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, >> + unsigned int which) >> +{ >> + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; >> + u16 opcode; >> + >> + osd_req_op_alloc_hint_init(obj_req->osd_req, which++, >> + rbd_dev->layout.object_size, >> + rbd_dev->layout.object_size); >> + >> + if (rbd_obj_is_entire(obj_req)) >> + opcode = CEPH_OSD_OP_WRITEFULL; >> + else >> + opcode = CEPH_OSD_OP_WRITE; >> + >> + osd_req_op_extent_init(obj_req->osd_req, which, opcode, >> + obj_req->offset, obj_req->length, 0, 0); >> + rbd_osd_req_setup_data(obj_req, which++); >> + >> + rbd_assert(which == obj_req->osd_req->r_num_ops); >> + rbd_osd_req_format_write(obj_req); >> +} >> + >> +static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) >> +{ >> + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; >> + unsigned int num_osd_ops, which = 0; >> + int ret; >> + >> + if (obj_request_overlaps_parent(obj_req)) { >> + obj_req->write_state = RBD_OBJ_WRITE_GUARD; >> + num_osd_ops = 3; /* stat + setallochint + write/writefull */ >> + } else { >> + obj_req->write_state = RBD_OBJ_WRITE_FLAT; >> + num_osd_ops = 2; /* setallochint + write/writefull */ >> + } >> + >> + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, >> + num_osd_ops, obj_req); >> + if (!obj_req->osd_req) >> + return -ENOMEM; >> + >> + if (obj_request_overlaps_parent(obj_req)) { >> + ret = __rbd_obj_setup_stat(obj_req, which++); >> + if (ret) >> + return ret; >> + } >> + >> + __rbd_obj_setup_write(obj_req, which); >> + return 0; >> +} >> + >> +static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req, >> + unsigned int which) >> +{ >> + u16 
opcode; >> + >> + if (rbd_obj_is_entire(obj_req)) { >> + if (obj_request_overlaps_parent(obj_req)) { >> + opcode = CEPH_OSD_OP_TRUNCATE; >> + } else { >> + osd_req_op_init(obj_req->osd_req, which++, >> + CEPH_OSD_OP_DELETE, 0); >> + opcode = 0; >> + } >> + } else if (rbd_obj_is_tail(obj_req)) { >> + opcode = CEPH_OSD_OP_TRUNCATE; >> + } else { >> + opcode = CEPH_OSD_OP_ZERO; >> + } >> + >> + if (opcode) >> + osd_req_op_extent_init(obj_req->osd_req, which++, opcode, >> + obj_req->offset, obj_req->length, >> + 0, 0); >> + >> + rbd_assert(which == obj_req->osd_req->r_num_ops); >> + rbd_osd_req_format_write(obj_req); >> +} >> + >> +static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) >> +{ >> + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; >> + unsigned int num_osd_ops, which = 0; >> + int ret; >> + >> + if (rbd_obj_is_entire(obj_req)) { >> + obj_req->write_state = RBD_OBJ_WRITE_FLAT; >> + num_osd_ops = 1; /* truncate/delete */ >> + } else { >> + if (obj_request_overlaps_parent(obj_req)) { >> + obj_req->write_state = RBD_OBJ_WRITE_GUARD; >> + num_osd_ops = 2; /* stat + truncate/zero */ >> + } else { >> + obj_req->write_state = RBD_OBJ_WRITE_FLAT; >> + num_osd_ops = 1; /* truncate/zero */ >> + } >> + } >> + >> + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_DISCARD, >> + num_osd_ops, obj_req); >> + if (!obj_req->osd_req) >> + return -ENOMEM; >> + >> + if (!rbd_obj_is_entire(obj_req) && >> + obj_request_overlaps_parent(obj_req)) { >> + ret = __rbd_obj_setup_stat(obj_req, which++); >> + if (ret) >> + return ret; >> + } >> + >> + __rbd_obj_setup_discard(obj_req, which); >> + return 0; >> +} >> + >> +/* >> + * For each object request in @img_req, allocate an OSD request, add >> + * individual OSD ops and prepare them for submission. The number of >> + * OSD ops depends on op_type and the overlap point (if any). 
>> + */ >> +static int __rbd_img_fill_request(struct rbd_img_request *img_req) >> +{ >> + struct rbd_obj_request *obj_req; >> + int ret; >> + >> + for_each_obj_request(img_req, obj_req) { >> + switch (rbd_img_request_op_type(img_req)) { >> + case OBJ_OP_READ: >> + ret = rbd_obj_setup_read(obj_req); >> + break; >> + case OBJ_OP_WRITE: >> + ret = rbd_obj_setup_write(obj_req); >> + break; >> + case OBJ_OP_DISCARD: >> + ret = rbd_obj_setup_discard(obj_req); >> + break; >> + default: >> + rbd_assert(0); >> + } >> + if (ret) >> + return ret; >> + } >> + >> + return 0; >> +} >> + >> /* >> * Split up an image request into one or more object requests, each >> * to a different object. The "type" parameter indicates whether >> @@ -2268,7 +2490,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, >> struct rbd_obj_request *next_obj_request; >> struct ceph_bio_iter bio_it; >> struct ceph_bvec_iter bvec_it; >> - enum obj_operation_type op_type; >> u64 img_offset; >> u64 resid; >> >> @@ -2278,7 +2499,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, >> img_offset = img_request->offset; >> resid = img_request->length; >> rbd_assert(resid > 0); >> - op_type = rbd_img_request_op_type(img_request); >> >> if (type == OBJ_REQUEST_BIO) { >> bio_it = *(struct ceph_bio_iter *)data_desc; >> @@ -2289,7 +2509,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, >> } >> >> while (resid) { >> - struct ceph_osd_request *osd_req; >> u64 object_no = img_offset >> rbd_dev->header.obj_order; >> u64 offset = rbd_segment_offset(rbd_dev, img_offset); >> u64 length = rbd_segment_length(rbd_dev, img_offset, resid); >> @@ -2317,23 +2536,14 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, >> ceph_bvec_iter_advance(&bvec_it, length); >> } >> >> - osd_req = rbd_osd_req_create(rbd_dev, op_type, >> - (op_type == OBJ_OP_WRITE) ? 
2 : 1, >> - obj_request); >> - if (!osd_req) >> - goto out_unwind; >> - >> - obj_request->osd_req = osd_req; >> obj_request->callback = rbd_img_obj_callback; >> obj_request->img_offset = img_offset; >> >> - rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); >> - >> img_offset += length; >> resid -= length; >> } >> >> - return 0; >> + return __rbd_img_fill_request(img_request); >> >> out_unwind: >> for_each_obj_request_safe(img_request, obj_request, next_obj_request) >> @@ -2712,16 +2922,171 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request) >> >> rbd_img_request_get(img_request); >> for_each_obj_request_safe(img_request, obj_request, next_obj_request) { >> - ret = rbd_img_obj_request_submit(obj_request); >> - if (ret) >> - goto out_put_ireq; >> + rbd_obj_request_submit(obj_request); >> } >> >> -out_put_ireq: >> rbd_img_request_put(img_request); >> return ret; >> } >> >> +static void rbd_img_end_child_request(struct rbd_img_request *img_req); >> + >> +static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req, >> + u64 img_offset, u32 bytes) >> +{ >> + struct rbd_img_request *img_req = obj_req->img_request; >> + struct rbd_img_request *child_img_req; >> + int ret; >> + >> + child_img_req = rbd_parent_request_create(obj_req, img_offset, bytes); >> + if (!child_img_req) >> + return -ENOMEM; >> + >> + child_img_req->callback = rbd_img_end_child_request; >> + >> + if (!rbd_img_is_write(img_req)) { >> + switch (obj_req->type) { >> + case OBJ_REQUEST_BIO: >> + ret = rbd_img_request_fill(child_img_req, >> + OBJ_REQUEST_BIO, >> + &obj_req->bio_pos); >> + break; >> + case OBJ_REQUEST_BVECS: >> + ret = rbd_img_request_fill(child_img_req, >> + OBJ_REQUEST_BVECS, >> + &obj_req->bvec_pos); >> + break; >> + default: >> + rbd_assert(0); >> + } >> + } else { >> + struct ceph_bvec_iter it = { >> + .bvecs = obj_req->copyup_bvecs, >> + .iter = { .bi_size = bytes }, >> + }; >> + >> + ret = rbd_img_request_fill(child_img_req, OBJ_REQUEST_BVECS, 
>> +					   &it);
>> +	}
>> +	if (ret) {
>> +		rbd_img_request_put(child_img_req);
>> +		return ret;
>> +	}
>> +
>> +	rbd_img_request_submit(child_img_req);
>> +	return 0;
>> +}
>> +
>> +static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
>> +{
>> +	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>> +	int ret;
>> +
>> +	if (obj_req->result == -ENOENT &&
>
> Previously there was a test for "image data" here; is it no longer
> checked because we know all object requests have image data now?

Correct.

> Some more cleanup is probably in order to convert "img_obj" in names
> to just "obj", since there is no longer the distinction.

I think I got most of them in later patches?

>
> What takes the place of indicating that a request is layered (i.e.,
> one with a parent image associated with it)?  Is a zero parent_overlap
> sufficient to mean there is no parent?

Yes.

>
>> +	    obj_req->img_offset < rbd_dev->parent_overlap &&
>> +	    !obj_req->tried_parent) {
>> +		u64 obj_overlap = min(obj_req->length,
>> +			rbd_dev->parent_overlap - obj_req->img_offset);
>> +
>> +		obj_req->tried_parent = true;
>
> Is there any chance a read could come in at this point (after tried_parent
> is set), but before the read from parent has completed?  If it could, that
> path would erroneously treat it as a hole.  (Maybe reads need a state machine
> too.)  Maybe this isn't possible; I'm pretty rusty on a lot of the more
> subtle things...

Can you elaborate?  If a new read comes in, it'll get a new object
request with ->tried_parent initialized to false.

>
>> +		ret = rbd_obj_read_from_parent(obj_req, obj_req->img_offset,
>> +					       obj_overlap);
>> +		if (ret) {
>> +			obj_req->result = ret;
>> +			return true;
>> +		}
>> +		return false;
>> +	}
>> +
>> +	/*
>> +	 * -ENOENT means a hole in the image -- zero-fill the entire
>> +	 * length of the request.  A short read also implies zero-fill
>> +	 * to the end of the request.
In both cases we update xferred >> + * count to indicate the whole request was satisfied. >> + */ >> + if (obj_req->result == -ENOENT || >> + (!obj_req->result && obj_req->xferred < obj_req->length)) { >> + rbd_assert(!obj_req->xferred || !obj_req->result); >> + rbd_obj_zero_range(obj_req, obj_req->xferred, >> + obj_req->length - obj_req->xferred); >> + obj_req->result = 0; >> + obj_req->xferred = obj_req->length; >> + } >> + >> + return true; >> +} >> + >> +/* >> + * copyup_bvecs pages are never highmem pages >> + */ >> +static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) >> +{ >> + struct ceph_bvec_iter it = { >> + .bvecs = bvecs, >> + .iter = { .bi_size = bytes }, >> + }; >> + >> + ceph_bvec_iter_advance_step(&it, bytes, ({ >> + if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, >> + bv.bv_len)) >> + return false; >> + })); >> + return true; >> +} >> + >> +static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) >> +{ >> + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; >> + unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; >> + >> + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); >> + rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); >> + rbd_osd_req_destroy(obj_req->osd_req); >> + >> + /* >> + * Create a copyup request with the same number of OSD ops as >> + * the original request. The original request was stat + op(s), >> + * the new copyup request will be copyup + the same op(s). >> + */ >> + obj_req->osd_req = rbd_osd_req_create(rbd_dev, >> + rbd_img_request_op_type(obj_req->img_request), >> + num_osd_ops, obj_req); >> + if (!obj_req->osd_req) >> + return -ENOMEM; >> + >> + /* >> + * Only send non-zero copyup data to save some I/O and network >> + * bandwidth -- zero copyup data is equivalent to the object not >> + * existing. 
>> + */ >> + if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { >> + dout("%s obj_req %p detected zeroes\n", __func__, obj_req); >> + bytes = 0; >> + } >> + >> + osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", >> + "copyup"); >> + osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, >> + obj_req->copyup_bvecs, bytes); >> + >> + switch (rbd_img_request_op_type(obj_req->img_request)) { >> + case OBJ_OP_WRITE: >> + __rbd_obj_setup_write(obj_req, 1); >> + break; >> + case OBJ_OP_DISCARD: >> + rbd_assert(!rbd_obj_is_entire(obj_req)); >> + __rbd_obj_setup_discard(obj_req, 1); >> + break; >> + default: >> + rbd_assert(0); >> + } >> + >> + rbd_obj_request_submit(obj_req); >> + /* FIXME: in lieu of rbd_img_obj_callback() */ >> + rbd_img_request_put(obj_req->img_request); >> + return 0; >> +} >> + >> static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) >> { >> u32 i; >> @@ -2850,6 +3215,146 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request) >> obj_request_done_set(obj_request); >> } >> >> +static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) >> +{ >> + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; >> + u64 img_offset; >> + u64 obj_overlap; >> + int ret; >> + >> + if (!obj_request_overlaps_parent(obj_req)) { >> + /* >> + * The overlap has become 0 (most likely because the >> + * image has been flattened). Use rbd_obj_issue_copyup() >> + * to re-submit the original write request -- the copyup >> + * operation itself will be a no-op, since someone must >> + * have populated the child object while we weren't >> + * looking. Move to WRITE_FLAT state as we'll be done >> + * with the operation once the null copyup completes. >> + */ >> + obj_req->write_state = RBD_OBJ_WRITE_FLAT; >> + return rbd_obj_issue_copyup(obj_req, 0); >> + } >> + >> + /* >> + * Determine the byte range covered by the object in the >> + * child image to which the original request was to be sent. 
>> + */ >> + img_offset = obj_req->img_offset - obj_req->offset; >> + obj_overlap = rbd_dev->layout.object_size; >> + >> + /* >> + * There is no defined parent data beyond the parent >> + * overlap, so limit what we read at that boundary if >> + * necessary. >> + */ >> + if (img_offset + obj_overlap > rbd_dev->parent_overlap) { >> + rbd_assert(img_offset < rbd_dev->parent_overlap); >> + obj_overlap = rbd_dev->parent_overlap - img_offset; >> + } >> + >> + ret = setup_copyup_bvecs(obj_req, obj_overlap); >> + if (ret) >> + return ret; >> + >> + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; >> + return rbd_obj_read_from_parent(obj_req, img_offset, obj_overlap); >> +} >> + >> +static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) >> +{ >> + int ret; >> + >> +again: >> + switch (obj_req->write_state) { >> + case RBD_OBJ_WRITE_GUARD: >> + rbd_assert(!obj_req->xferred); >> + if (obj_req->result == -ENOENT) { >> + /* >> + * The target object doesn't exist. Read the data for >> + * the entire target object up to the overlap point (if >> + * any) from the parent, so we can use it for a copyup. >> + */ >> + ret = rbd_obj_handle_write_guard(obj_req); >> + if (ret) { >> + obj_req->result = ret; >> + return true; >> + } >> + return false; >> + } >> + /* fall through */ >> + case RBD_OBJ_WRITE_FLAT: >> + if (!obj_req->result) >> + /* >> + * There is no such thing as a successful short >> + * write -- indicate the whole request was satisfied. >> + */ >> + obj_req->xferred = obj_req->length; >> + return true; >> + case RBD_OBJ_WRITE_COPYUP: >> + obj_req->write_state = RBD_OBJ_WRITE_GUARD; >> + if (obj_req->result) >> + goto again; >> + >> + rbd_assert(obj_req->xferred); >> + ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); >> + if (ret) { >> + obj_req->result = ret; >> + return true; >> + } >> + return false; >> + default: >> + rbd_assert(0); >> + } >> +} >> + > > /* Returns true if the request is complete, false otherwise */ Added. 
Thanks, Ilya
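
For readers following the thread, the write-side transitions discussed above (FLAT, GUARD, COPYUP, and the "returns true if the request is complete" convention) can be sketched in user space. This is only an illustration under stated assumptions, not the kernel code: sim_obj_req, sim_handle_write() and SIM_ENOENT are hypothetical names invented here, OSD replies are passed in as plain integers, submitting the parent read and the copyup is reduced to bumping a counter, and the "overlap has become 0" branch of rbd_obj_handle_write_guard() is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SIM_ENOENT 2	/* hypothetical stand-in for ENOENT */

enum sim_write_state {
	SIM_WRITE_FLAT = 1,	/* plain write, no guard needed */
	SIM_WRITE_GUARD,	/* stat + write (or copyup + write) in flight */
	SIM_WRITE_COPYUP,	/* read from the parent in flight */
};

/* Hypothetical, heavily reduced stand-in for struct rbd_obj_request. */
struct sim_obj_req {
	enum sim_write_state write_state;
	int result;			/* 0 or a negative error code */
	uint64_t length;		/* bytes requested */
	uint64_t xferred;		/* bytes reported as transferred */
	unsigned int parent_reads;	/* simulated parent reads issued */
	unsigned int copyups;		/* simulated copyup requests issued */
};

/*
 * Feed one reply into the state machine.
 * Returns true if the request is complete, false if another
 * (simulated) request was issued and a further reply is expected.
 */
static bool sim_handle_write(struct sim_obj_req *req, int result,
			     uint64_t xferred)
{
	req->result = result;
	req->xferred = xferred;

again:
	switch (req->write_state) {
	case SIM_WRITE_GUARD:
		if (req->result == -SIM_ENOENT) {
			/* Target object is missing: fetch parent data first. */
			req->write_state = SIM_WRITE_COPYUP;
			req->parent_reads++;
			return false;
		}
		/* fall through */
	case SIM_WRITE_FLAT:
		if (!req->result)
			req->xferred = req->length; /* no short writes */
		return true;
	case SIM_WRITE_COPYUP:
		/* The reply to the copyup request is handled as GUARD. */
		req->write_state = SIM_WRITE_GUARD;
		if (req->result)
			goto again;	/* parent read failed */
		req->copyups++;
		return false;
	default:
		assert(0);
		return true;
	}
}
```

In this sketch a guarded write to a missing object walks GUARD -> COPYUP -> GUARD -> done: the first reply (-SIM_ENOENT) triggers the parent read, the second (parent data) triggers the copyup, and the third completes the request with xferred set to the full length. A FLAT write completes on its first reply, whether it succeeded or failed.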