On 03/16/2018 07:37 AM, Alex Elder wrote: > From: Ilya Dryomov <idryomov@xxxxxxxxx> > > The notable changes are: > > - instead of explicitly stat'ing the object to see if it exists before > issuing the write, send the write optimistically along with the stat > in a single OSD request This was a suggestion long ago but I don't think we knew whether it would be wasteful or helpful. You guys have apparently decided it was worthwhile in practice to do it this way. > - zero copyup optimization This is "a zero length copyup means full zero-filled object" or something. > - all object requests are associated with an image request and have > a valid ->img_request pointer; there are no standalone (!IMG_DATA) > object requests anymore It seems the above statement was true before? As I recall the reason for standalone object requests was for reading image header objects or something. It looks like that's done by synchronous requests, so we don't need to have non-image requests? Anyway, if this was the case before it probably should have been cleaned up separately. > - code is structured as a state machine (vs a bunch of callbacks with > implicit state) > > Signed-off-by: Ilya Dryomov <idryomov@xxxxxxxxx> This patch is much too large for a very good review. I know you said it was hard not to do all these interconnected things together. Still I'm sure you could have broken it down a bit more and made the review task easier. I didn't complete my review. But I offer a variety of comments below. I unfortunately don't know how much time I'll have this week to continue this, but I'll do them as I can--at least the easy ones. 
-Alex > --- > drivers/block/rbd.c | 659 ++++++++++++++++++++++++++++++++++++++++++++++------ > 1 file changed, 582 insertions(+), 77 deletions(-) > > diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c > index 5ca190b15e1d..29d8d8ce50eb 100644 > --- a/drivers/block/rbd.c > +++ b/drivers/block/rbd.c > @@ -235,11 +235,21 @@ enum obj_req_flags { > OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */ > }; It would be good to explain what the state machine is doing, and when it is used, here. > +enum rbd_obj_write_state { > + RBD_OBJ_WRITE_FLAT = 1, > + RBD_OBJ_WRITE_GUARD, So looking at the code later, it looks like you have a stat op, and that is followed by the setallochint and write request. And if I remember right, this will automatically handle the opportunistic write: if the object already exists it'll be written (because the stat op won't fail), but if it does not, it won't be written (because the stat op failure will prevent the subsequent ops from executing). > + RBD_OBJ_WRITE_COPYUP, > +}; > + > struct rbd_obj_request { > u64 object_no; > u64 offset; /* object start byte */ > u64 length; /* bytes from offset */ > unsigned long flags; > + union { > + bool tried_parent; /* for reads */ > + enum rbd_obj_write_state write_state; /* for writes */ > + }; > > /* > * An object request associated with an image will have its > @@ -1282,6 +1292,27 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes) > })); > } > > +/* > + * Zero a range in @obj_req data buffer defined by a bio (list) or > + * bio_vec array. > + * > + * @off is relative to the start of the data buffer. 
> + */ > +static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off, > + u32 bytes) > +{ > + switch (obj_req->type) { > + case OBJ_REQUEST_BIO: > + zero_bios(&obj_req->bio_pos, off, bytes); > + break; > + case OBJ_REQUEST_BVECS: > + zero_bvecs(&obj_req->bvec_pos, off, bytes); > + break; > + default: > + rbd_assert(0); > + } > +} > + > /* > * The default/initial value for all object request flags is 0. For > * each flag, once its value is set to 1 it is never reset to 0 > @@ -1567,6 +1598,35 @@ rbd_img_request_op_type(struct rbd_img_request *img_request) > return OBJ_OP_READ; > } > > +static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + > + return !obj_req->offset && > + obj_req->length == rbd_dev->layout.object_size; > +} > + > +static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + > + return obj_req->offset + obj_req->length == > + rbd_dev->layout.object_size; > +} > + > +static bool rbd_img_is_write(struct rbd_img_request *img_req) > +{ > + switch (rbd_img_request_op_type(img_req)) { > + case OBJ_OP_READ: > + return false; > + case OBJ_OP_WRITE: > + case OBJ_OP_DISCARD: > + return true; > + default: > + rbd_assert(0); There are only three, so maybe: op_type = rbd_img_request_op_type(img_req); if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) return true; rbd_assert(op_type == OBJ_OP_READ); return false; (But I think you like switch statements better...) 
> + } > +} > + > static void > rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) > { > @@ -1697,63 +1757,28 @@ static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) > obj_request_done_set(obj_request); > } > > +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req); > + > static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) > { > - struct rbd_obj_request *obj_request = osd_req->r_priv; > - u16 opcode; > + struct rbd_obj_request *obj_req = osd_req->r_priv; > > - dout("%s: osd_req %p\n", __func__, osd_req); > - rbd_assert(osd_req == obj_request->osd_req); > - if (obj_request_img_data_test(obj_request)) { > - rbd_assert(obj_request->img_request); > - rbd_assert(obj_request->which != BAD_WHICH); > - } else { > - rbd_assert(obj_request->which == BAD_WHICH); > - } > + dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req, > + osd_req->r_result, obj_req); > + rbd_assert(osd_req == obj_req->osd_req); > > - if (osd_req->r_result < 0) > - obj_request->result = osd_req->r_result; > + obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0; Was the obj_req->result previously just implicitly kept at 0 if there was no error? > + if (!obj_req->result && !rbd_img_is_write(obj_req->img_request)) > + obj_req->xferred = osd_req->r_result; > + else > + /* > + * Writes aren't allowed to return a data payload. In some > + * guarded write cases (e.g. stat + zero on an empty object) > + * a stat response makes it through, but we don't care. > + */ > + obj_req->xferred = 0; > > - /* > - * We support a 64-bit length, but ultimately it has to be > - * passed to the block layer, which just supports a 32-bit > - * length field. > - */ > - obj_request->xferred = osd_req->r_ops[0].outdata_len; Apparently the value of osd_req->r_result is the same as osd_req->r_ops[0].outdata_len (if it's non-negative)? 
> - rbd_assert(obj_request->xferred < (u64)UINT_MAX); > - > - opcode = osd_req->r_ops[0].op; Since we're only looking at the first op in the array here, your use of the image request type in __rbd_obj_handle_request() makes more sense. > - switch (opcode) { > - case CEPH_OSD_OP_READ: > - rbd_osd_read_callback(obj_request); > - break; > - case CEPH_OSD_OP_SETALLOCHINT: > - rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE || > - osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL); > - /* fall through */ > - case CEPH_OSD_OP_WRITE: > - case CEPH_OSD_OP_WRITEFULL: > - rbd_osd_write_callback(obj_request); > - break; > - case CEPH_OSD_OP_STAT: > - rbd_osd_stat_callback(obj_request); > - break; > - case CEPH_OSD_OP_DELETE: > - case CEPH_OSD_OP_TRUNCATE: > - case CEPH_OSD_OP_ZERO: > - rbd_osd_discard_callback(obj_request); > - break; > - case CEPH_OSD_OP_CALL: > - rbd_osd_call_callback(obj_request); > - break; > - default: > - rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", > - obj_request->object_no, opcode); > - break; > - } > - > - if (obj_request_done_test(obj_request)) > - rbd_obj_request_complete(obj_request); > + rbd_obj_handle_request(obj_req); > } > > static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) > @@ -1806,12 +1831,6 @@ __rbd_osd_req_create(struct rbd_device *rbd_dev, > return NULL; > } > > -/* > - * Create an osd request. A read request has one osd op (read). > - * A write request has either one (watch) or two (hint+write) osd ops. > - * (All rbd data writes are prefixed with an allocation hint op, but > - * technically osd watch is a write request, hence this distinction.) 
> - */ > static struct ceph_osd_request *rbd_osd_req_create( > struct rbd_device *rbd_dev, > enum obj_operation_type op_type, > @@ -1831,8 +1850,6 @@ static struct ceph_osd_request *rbd_osd_req_create( > snapc = img_request->snapc; > } > > - rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); > - > return __rbd_osd_req_create(rbd_dev, snapc, num_ops, > (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? > CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); > @@ -2251,6 +2268,211 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request, > rbd_osd_req_format_read(obj_request); > } > > +static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which) > +{ > + switch (obj_req->type) { > + case OBJ_REQUEST_BIO: > + osd_req_op_extent_osd_data_bio(obj_req->osd_req, which, > + &obj_req->bio_pos, > + obj_req->length); > + break; > + case OBJ_REQUEST_BVECS: > + rbd_assert(obj_req->bvec_pos.iter.bi_size == > + obj_req->length); > + osd_req_op_extent_osd_data_bvecs(obj_req->osd_req, which, > + &obj_req->bvec_pos); > + break; > + default: > + rbd_assert(0); > + } > +} > + > +static int rbd_obj_setup_read(struct rbd_obj_request *obj_req) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + > + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, obj_req); > + if (!obj_req->osd_req) > + return -ENOMEM; > + > + osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ, > + obj_req->offset, obj_req->length, 0, 0); > + rbd_osd_req_setup_data(obj_req, 0); > + > + rbd_osd_req_format_read(obj_req); > + return 0; > +} > + > +static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req, > + unsigned int which) > +{ > + struct page **pages; > + > + /* > + * The response data for a STAT call consists of: > + * le64 length; > + * struct { > + * le32 tv_sec; > + * le32 tv_nsec; > + * } mtime; > + */ > + pages = ceph_alloc_page_vector(1, GFP_NOIO); > + if (IS_ERR(pages)) > + return 
PTR_ERR(pages); > + > + osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0); > + osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages, > + 8 + sizeof(struct ceph_timespec), > + 0, false, true); > + return 0; > +} > + > +static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req, > + unsigned int which) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + u16 opcode; > + > + osd_req_op_alloc_hint_init(obj_req->osd_req, which++, > + rbd_dev->layout.object_size, > + rbd_dev->layout.object_size); > + > + if (rbd_obj_is_entire(obj_req)) > + opcode = CEPH_OSD_OP_WRITEFULL; > + else > + opcode = CEPH_OSD_OP_WRITE; > + > + osd_req_op_extent_init(obj_req->osd_req, which, opcode, > + obj_req->offset, obj_req->length, 0, 0); > + rbd_osd_req_setup_data(obj_req, which++); > + > + rbd_assert(which == obj_req->osd_req->r_num_ops); > + rbd_osd_req_format_write(obj_req); > +} > + > +static int rbd_obj_setup_write(struct rbd_obj_request *obj_req) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + unsigned int num_osd_ops, which = 0; > + int ret; > + > + if (obj_request_overlaps_parent(obj_req)) { > + obj_req->write_state = RBD_OBJ_WRITE_GUARD; > + num_osd_ops = 3; /* stat + setallochint + write/writefull */ > + } else { > + obj_req->write_state = RBD_OBJ_WRITE_FLAT; > + num_osd_ops = 2; /* setallochint + write/writefull */ > + } > + > + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, > + num_osd_ops, obj_req); > + if (!obj_req->osd_req) > + return -ENOMEM; > + > + if (obj_request_overlaps_parent(obj_req)) { > + ret = __rbd_obj_setup_stat(obj_req, which++); > + if (ret) > + return ret; > + } > + > + __rbd_obj_setup_write(obj_req, which); > + return 0; > +} > + > +static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req, > + unsigned int which) > +{ > + u16 opcode; > + > + if (rbd_obj_is_entire(obj_req)) { > + if (obj_request_overlaps_parent(obj_req)) { > + opcode = CEPH_OSD_OP_TRUNCATE; > + } 
else { > + osd_req_op_init(obj_req->osd_req, which++, > + CEPH_OSD_OP_DELETE, 0); > + opcode = 0; > + } > + } else if (rbd_obj_is_tail(obj_req)) { > + opcode = CEPH_OSD_OP_TRUNCATE; > + } else { > + opcode = CEPH_OSD_OP_ZERO; > + } > + > + if (opcode) > + osd_req_op_extent_init(obj_req->osd_req, which++, opcode, > + obj_req->offset, obj_req->length, > + 0, 0); > + > + rbd_assert(which == obj_req->osd_req->r_num_ops); > + rbd_osd_req_format_write(obj_req); > +} > + > +static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + unsigned int num_osd_ops, which = 0; > + int ret; > + > + if (rbd_obj_is_entire(obj_req)) { > + obj_req->write_state = RBD_OBJ_WRITE_FLAT; > + num_osd_ops = 1; /* truncate/delete */ > + } else { > + if (obj_request_overlaps_parent(obj_req)) { > + obj_req->write_state = RBD_OBJ_WRITE_GUARD; > + num_osd_ops = 2; /* stat + truncate/zero */ > + } else { > + obj_req->write_state = RBD_OBJ_WRITE_FLAT; > + num_osd_ops = 1; /* truncate/zero */ > + } > + } > + > + obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_DISCARD, > + num_osd_ops, obj_req); > + if (!obj_req->osd_req) > + return -ENOMEM; > + > + if (!rbd_obj_is_entire(obj_req) && > + obj_request_overlaps_parent(obj_req)) { > + ret = __rbd_obj_setup_stat(obj_req, which++); > + if (ret) > + return ret; > + } > + > + __rbd_obj_setup_discard(obj_req, which); > + return 0; > +} > + > +/* > + * For each object request in @img_req, allocate an OSD request, add > + * individual OSD ops and prepare them for submission. The number of > + * OSD ops depends on op_type and the overlap point (if any). 
> + */ > +static int __rbd_img_fill_request(struct rbd_img_request *img_req) > +{ > + struct rbd_obj_request *obj_req; > + int ret; > + > + for_each_obj_request(img_req, obj_req) { > + switch (rbd_img_request_op_type(img_req)) { > + case OBJ_OP_READ: > + ret = rbd_obj_setup_read(obj_req); > + break; > + case OBJ_OP_WRITE: > + ret = rbd_obj_setup_write(obj_req); > + break; > + case OBJ_OP_DISCARD: > + ret = rbd_obj_setup_discard(obj_req); > + break; > + default: > + rbd_assert(0); > + } > + if (ret) > + return ret; > + } > + > + return 0; > +} > + > /* > * Split up an image request into one or more object requests, each > * to a different object. The "type" parameter indicates whether > @@ -2268,7 +2490,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, > struct rbd_obj_request *next_obj_request; > struct ceph_bio_iter bio_it; > struct ceph_bvec_iter bvec_it; > - enum obj_operation_type op_type; > u64 img_offset; > u64 resid; > > @@ -2278,7 +2499,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, > img_offset = img_request->offset; > resid = img_request->length; > rbd_assert(resid > 0); > - op_type = rbd_img_request_op_type(img_request); > > if (type == OBJ_REQUEST_BIO) { > bio_it = *(struct ceph_bio_iter *)data_desc; > @@ -2289,7 +2509,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, > } > > while (resid) { > - struct ceph_osd_request *osd_req; > u64 object_no = img_offset >> rbd_dev->header.obj_order; > u64 offset = rbd_segment_offset(rbd_dev, img_offset); > u64 length = rbd_segment_length(rbd_dev, img_offset, resid); > @@ -2317,23 +2536,14 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, > ceph_bvec_iter_advance(&bvec_it, length); > } > > - osd_req = rbd_osd_req_create(rbd_dev, op_type, > - (op_type == OBJ_OP_WRITE) ? 
2 : 1, > - obj_request); > - if (!osd_req) > - goto out_unwind; > - > - obj_request->osd_req = osd_req; > obj_request->callback = rbd_img_obj_callback; > obj_request->img_offset = img_offset; > > - rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); > - > img_offset += length; > resid -= length; > } > > - return 0; > + return __rbd_img_fill_request(img_request); > > out_unwind: > for_each_obj_request_safe(img_request, obj_request, next_obj_request) > @@ -2712,16 +2922,171 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request) > > rbd_img_request_get(img_request); > for_each_obj_request_safe(img_request, obj_request, next_obj_request) { > - ret = rbd_img_obj_request_submit(obj_request); > - if (ret) > - goto out_put_ireq; > + rbd_obj_request_submit(obj_request); > } > > -out_put_ireq: > rbd_img_request_put(img_request); > return ret; > } > > +static void rbd_img_end_child_request(struct rbd_img_request *img_req); > + > +static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req, > + u64 img_offset, u32 bytes) > +{ > + struct rbd_img_request *img_req = obj_req->img_request; > + struct rbd_img_request *child_img_req; > + int ret; > + > + child_img_req = rbd_parent_request_create(obj_req, img_offset, bytes); > + if (!child_img_req) > + return -ENOMEM; > + > + child_img_req->callback = rbd_img_end_child_request; > + > + if (!rbd_img_is_write(img_req)) { > + switch (obj_req->type) { > + case OBJ_REQUEST_BIO: > + ret = rbd_img_request_fill(child_img_req, > + OBJ_REQUEST_BIO, > + &obj_req->bio_pos); > + break; > + case OBJ_REQUEST_BVECS: > + ret = rbd_img_request_fill(child_img_req, > + OBJ_REQUEST_BVECS, > + &obj_req->bvec_pos); > + break; > + default: > + rbd_assert(0); > + } > + } else { > + struct ceph_bvec_iter it = { > + .bvecs = obj_req->copyup_bvecs, > + .iter = { .bi_size = bytes }, > + }; > + > + ret = rbd_img_request_fill(child_img_req, OBJ_REQUEST_BVECS, > + &it); > + } > + if (ret) { > + rbd_img_request_put(child_img_req); > 
+ return ret; > + } > + > + rbd_img_request_submit(child_img_req); > + return 0; > +} > + > +static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + int ret; > + > + if (obj_req->result == -ENOENT && Previously there was a test for "image data" here; is it no longer checked because we know all object requests have image data now? Some more cleanup is probably in order to convert "img_obj" in names to just be "obj", since there is no longer the distinction. What takes the place of indicating that a request is layered (i.e., one with a parent image associated with it)? Is a zero parent_overlap sufficient to mean there is no parent? > + obj_req->img_offset < rbd_dev->parent_overlap && > + !obj_req->tried_parent) { > + u64 obj_overlap = min(obj_req->length, > + rbd_dev->parent_overlap - obj_req->img_offset); > + > + obj_req->tried_parent = true; Is there any chance a read could come in at this point (after tried_parent is set), but before the read from parent has completed? If it could, that path would erroneously treat it as a hole. (Maybe reads need a state machine too.) Maybe this isn't possible; I'm pretty rusty on a lot of the more subtle things... > + ret = rbd_obj_read_from_parent(obj_req, obj_req->img_offset, > + obj_overlap); > + if (ret) { > + obj_req->result = ret; > + return true; > + } > + return false; > + } > + > + /* > + * -ENOENT means a hole in the image -- zero-fill the entire > + * length of the request. A short read also implies zero-fill > + * to the end of the request. In both cases we update xferred > + * count to indicate the whole request was satisfied. 
> + */ > + if (obj_req->result == -ENOENT || > + (!obj_req->result && obj_req->xferred < obj_req->length)) { > + rbd_assert(!obj_req->xferred || !obj_req->result); > + rbd_obj_zero_range(obj_req, obj_req->xferred, > + obj_req->length - obj_req->xferred); > + obj_req->result = 0; > + obj_req->xferred = obj_req->length; > + } > + > + return true; > +} > + > +/* > + * copyup_bvecs pages are never highmem pages > + */ > +static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) > +{ > + struct ceph_bvec_iter it = { > + .bvecs = bvecs, > + .iter = { .bi_size = bytes }, > + }; > + > + ceph_bvec_iter_advance_step(&it, bytes, ({ > + if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0, > + bv.bv_len)) > + return false; > + })); > + return true; > +} > + > +static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; > + > + dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); > + rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); > + rbd_osd_req_destroy(obj_req->osd_req); > + > + /* > + * Create a copyup request with the same number of OSD ops as > + * the original request. The original request was stat + op(s), > + * the new copyup request will be copyup + the same op(s). > + */ > + obj_req->osd_req = rbd_osd_req_create(rbd_dev, > + rbd_img_request_op_type(obj_req->img_request), > + num_osd_ops, obj_req); > + if (!obj_req->osd_req) > + return -ENOMEM; > + > + /* > + * Only send non-zero copyup data to save some I/O and network > + * bandwidth -- zero copyup data is equivalent to the object not > + * existing. 
> + */ > + if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) { > + dout("%s obj_req %p detected zeroes\n", __func__, obj_req); > + bytes = 0; > + } > + > + osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", > + "copyup"); > + osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, > + obj_req->copyup_bvecs, bytes); > + > + switch (rbd_img_request_op_type(obj_req->img_request)) { > + case OBJ_OP_WRITE: > + __rbd_obj_setup_write(obj_req, 1); > + break; > + case OBJ_OP_DISCARD: > + rbd_assert(!rbd_obj_is_entire(obj_req)); > + __rbd_obj_setup_discard(obj_req, 1); > + break; > + default: > + rbd_assert(0); > + } > + > + rbd_obj_request_submit(obj_req); > + /* FIXME: in lieu of rbd_img_obj_callback() */ > + rbd_img_request_put(obj_req->img_request); > + return 0; > +} > + > static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap) > { > u32 i; > @@ -2850,6 +3215,146 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request) > obj_request_done_set(obj_request); > } > > +static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req) > +{ > + struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev; > + u64 img_offset; > + u64 obj_overlap; > + int ret; > + > + if (!obj_request_overlaps_parent(obj_req)) { > + /* > + * The overlap has become 0 (most likely because the > + * image has been flattened). Use rbd_obj_issue_copyup() > + * to re-submit the original write request -- the copyup > + * operation itself will be a no-op, since someone must > + * have populated the child object while we weren't > + * looking. Move to WRITE_FLAT state as we'll be done > + * with the operation once the null copyup completes. > + */ > + obj_req->write_state = RBD_OBJ_WRITE_FLAT; > + return rbd_obj_issue_copyup(obj_req, 0); > + } > + > + /* > + * Determine the byte range covered by the object in the > + * child image to which the original request was to be sent. 
> + */ > + img_offset = obj_req->img_offset - obj_req->offset; > + obj_overlap = rbd_dev->layout.object_size; > + > + /* > + * There is no defined parent data beyond the parent > + * overlap, so limit what we read at that boundary if > + * necessary. > + */ > + if (img_offset + obj_overlap > rbd_dev->parent_overlap) { > + rbd_assert(img_offset < rbd_dev->parent_overlap); > + obj_overlap = rbd_dev->parent_overlap - img_offset; > + } > + > + ret = setup_copyup_bvecs(obj_req, obj_overlap); > + if (ret) > + return ret; > + > + obj_req->write_state = RBD_OBJ_WRITE_COPYUP; > + return rbd_obj_read_from_parent(obj_req, img_offset, obj_overlap); > +} > + > +static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req) > +{ > + int ret; > + > +again: > + switch (obj_req->write_state) { > + case RBD_OBJ_WRITE_GUARD: > + rbd_assert(!obj_req->xferred); > + if (obj_req->result == -ENOENT) { > + /* > + * The target object doesn't exist. Read the data for > + * the entire target object up to the overlap point (if > + * any) from the parent, so we can use it for a copyup. > + */ > + ret = rbd_obj_handle_write_guard(obj_req); > + if (ret) { > + obj_req->result = ret; > + return true; > + } > + return false; > + } > + /* fall through */ > + case RBD_OBJ_WRITE_FLAT: > + if (!obj_req->result) > + /* > + * There is no such thing as a successful short > + * write -- indicate the whole request was satisfied. 
> + */ > + obj_req->xferred = obj_req->length; > + return true; > + case RBD_OBJ_WRITE_COPYUP: > + obj_req->write_state = RBD_OBJ_WRITE_GUARD; > + if (obj_req->result) > + goto again; > + > + rbd_assert(obj_req->xferred); > + ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred); > + if (ret) { > + obj_req->result = ret; > + return true; > + } > + return false; > + default: > + rbd_assert(0); > + } > +} > + /* Returns true if the request is complete, false otherwise */ > +static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req) > +{ > + switch (rbd_img_request_op_type(obj_req->img_request)) { > + case OBJ_OP_READ: > + return rbd_obj_handle_read(obj_req); > + case OBJ_OP_WRITE: > + return rbd_obj_handle_write(obj_req); > + case OBJ_OP_DISCARD: > + if (rbd_obj_handle_write(obj_req)) { > + /* > + * Hide -ENOENT from delete/truncate/zero -- discarding > + * a non-existent object is not a problem. > + */ > + if (obj_req->result == -ENOENT) { > + obj_req->result = 0; > + obj_req->xferred = obj_req->length; > + } > + return true; > + } > + return false; > + default: > + rbd_assert(0); > + } > +} > + > +static void rbd_img_end_child_request(struct rbd_img_request *img_req) > +{ > + struct rbd_obj_request *obj_req = img_req->obj_request; > + > + rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags)); > + > + obj_req->result = img_req->result; > + obj_req->xferred = img_req->xferred; > + rbd_img_request_put(img_req); > + > + rbd_obj_handle_request(obj_req); > +} > + > +static void rbd_obj_handle_request(struct rbd_obj_request *obj_req) > +{ > + if (!__rbd_obj_handle_request(obj_req)) > + return; > + > + obj_request_done_set(obj_req); > + rbd_obj_request_complete(obj_req); > +} > + > static const struct rbd_client_id rbd_empty_cid; > > static bool rbd_cid_equal(const struct rbd_client_id *lhs, > -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at 
http://vger.kernel.org/majordomo-info.html