On Sat, Oct 9, 2010 at 2:21 AM, Stefan Hajnoczi <stefanha@xxxxxxxxx> wrote: > On Fri, Oct 8, 2010 at 8:00 PM, Yehuda Sadeh <yehuda@xxxxxxxxxxxxxxx> wrote: > No flush operation is supported. Can the guest be sure written data > is on stable storage when it receives completion? > That's part of the consistency that rados provides. >> +/* >> + * This aio completion is being called from rbd_aio_event_reader() and >> + * runs in qemu context. It schedules a bh, but just in case the aio >> + * was not cancelled before. > > Cancellation looks unsafe to me because acb is freed for cancel but > then accessed here! Also see my comment on aio_cancel() below. Right. Added a ->cancelled field instead of releasing the acb. > >> +/* >> + * Cancel aio. Since we don't reference acb in a non qemu threads, >> + * it is safe to access it here. >> + */ >> +static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) >> +{ >> + RBDAIOCB *acb = (RBDAIOCB *) blockacb; >> + qemu_bh_delete(acb->bh); >> + acb->bh = NULL; >> + qemu_aio_release(acb); > > Any pending librados completions are still running here and will then > cause acb to be accessed after they complete. If there is no safe way > to cancel then wait for the request to complete. Yeah, we'll keep the acb alive now and just set ->cancelled. When the last rados cb arrives for this acb, we'll clean it up. > >> +} >> + >> +static AIOPool rbd_aio_pool = { >> + .aiocb_size = sizeof(RBDAIOCB), >> + .cancel = rbd_aio_cancel, >> +}; >> + >> +/* >> + * This is the callback function for rados_aio_read and _write >> + * >> + * Note: this function is being called from a non qemu thread so >> + * we need to be careful about what we do here. Generally we only >> + * write to the block notification pipe, and do the rest of the >> + * io completion handling from rbd_aio_event_reader() which >> + * runs in a qemu context. > > Do librados threads have all signals blocked? 
QEMU uses signals so it > is important that this signal not get sent to a librados thread and > discarded. I have seen this issue in the past when using threaded > libraries in QEMU. We're no longer blocking threads. We've hit that before too. > >> + */ >> +static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) >> +{ >> + rcb->ret = rados_aio_get_return_value(c); >> + rados_aio_release(c); >> + if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) { > > You are writing RADOSCB* so sizeof(rcb) should be used. Oops. > >> + error_report("failed writing to acb->s->fds\n"); >> + qemu_free(rcb); >> + } >> +} >> + >> +/* Callback when all queued rados_aio requests are complete */ >> + >> +static void rbd_aio_bh_cb(void *opaque) >> +{ >> + RBDAIOCB *acb = opaque; >> + >> + if (!acb->write) { >> + qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size); >> + } >> + qemu_vfree(acb->bounce); >> + acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); >> + qemu_bh_delete(acb->bh); >> + acb->bh = NULL; >> + >> + qemu_aio_release(acb); >> +} >> + >> +static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, >> + int64_t sector_num, >> + QEMUIOVector *qiov, >> + int nb_sectors, >> + BlockDriverCompletionFunc *cb, >> + void *opaque, int write) >> +{ >> + RBDAIOCB *acb; >> + RADOSCB *rcb; >> + rados_completion_t c; >> + char n[RBD_MAX_SEG_NAME_SIZE]; >> + int64_t segnr, segoffs, segsize, last_segnr; >> + int64_t off, size; >> + char *buf; >> + >> + BDRVRBDState *s = bs->opaque; >> + >> + acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque); >> + acb->write = write; >> + acb->qiov = qiov; >> + acb->bounce = qemu_blockalign(bs, qiov->size); >> + acb->aiocnt = 0; >> + acb->ret = 0; >> + acb->error = 0; >> + acb->s = s; >> + >> + if (!acb->bh) { >> + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); >> + } > > When do you expect acb->bh to be non-NULL? I guess this part was copied from somewhere else. 
We don't expect it to be set, since we always clean it up, so the check is removed. > >> + >> + if (write) { >> + qemu_iovec_to_buffer(acb->qiov, acb->bounce); >> + } >> + >> + buf = acb->bounce; >> + >> + off = sector_num * BDRV_SECTOR_SIZE; >> + size = nb_sectors * BDRV_SECTOR_SIZE; >> + segnr = off / s->objsize; >> + segoffs = off % s->objsize; >> + segsize = s->objsize - segoffs; >> + >> + last_segnr = ((off + size - 1) / s->objsize); >> + acb->aiocnt = (last_segnr - segnr) + 1; >> + >> + s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */ >> + >> + if (write && s->read_only) { >> + acb->ret = -EROFS; >> + return NULL; >> + } > > block.c:bdrv_aio_writev() will reject writes to read-only block > devices. This check can be eliminated and it also prevents leaking > acb here. Removed. Thanks, Yehuda diff --git a/block/rbd.c b/block/rbd.c index a51fc36..575e481 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -51,6 +51,7 @@ typedef struct RBDAIOCB { int aiocnt; int error; struct BDRVRBDState *s; + int cancelled; } RBDAIOCB; typedef struct RADOSCB { @@ -325,8 +326,18 @@ static void rbd_complete_aio(RADOSCB *rcb) int64_t r; int i; - r = rcb->ret; acb->aiocnt--; + + if (acb->cancelled) { + if (!acb->aiocnt) { + qemu_vfree(acb->bounce); + qemu_aio_release(acb); + } + goto done; + } + + r = rcb->ret; + if (acb->write) { if (r < 0) { acb->ret = r; @@ -356,6 +367,7 @@ static void rbd_complete_aio(RADOSCB *rcb) if (!acb->aiocnt && acb->bh) { qemu_bh_schedule(acb->bh); } +done: qemu_free(rcb); i = 0; } @@ -585,7 +597,7 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) RBDAIOCB *acb = (RBDAIOCB *) blockacb; qemu_bh_delete(acb->bh); acb->bh = NULL; - qemu_aio_release(acb); + acb->cancelled = 1; } static AIOPool rbd_aio_pool = { @@ -606,7 +618,7 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) { rcb->ret = rados_aio_get_return_value(c); rados_aio_release(c); - if (write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(&rcb)) < 0) { + if 
(write(rcb->s->fds[RBD_FD_WRITE], (void *)&rcb, sizeof(rcb)) < 0) { error_report("failed writing to acb->s->fds\n"); qemu_free(rcb); } @@ -654,10 +666,8 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, acb->ret = 0; acb->error = 0; acb->s = s; - - if (!acb->bh) { - acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); - } + acb->cancelled = 0; + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); if (write) { qemu_iovec_to_buffer(acb->qiov, acb->bounce); @@ -676,11 +686,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */ - if (write && s->read_only) { - acb->ret = -EROFS; - return NULL; - } - while (size > 0) { if (size < segsize) { segsize = size; -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html