On Fri, Oct 8, 2010 at 7:06 AM, Anthony Liguori <anthony@xxxxxxxxxxxxx> wrote:
> On 10/07/2010 05:45 PM, Sage Weil wrote:
>>
>> I'm sorry, I'm having a hard time understanding what it is you're
>> objecting to, or what you would prefer, as there are two different things
>> we're talking about here (callbacks and fd glue/pipes). (Please bear with
>> me as I am not a qemu expert!)
>>
>> The first is the aio completion. You said a few messages back:
>>
>>> It looks like you just use the eventfd to signal aio completion
>>> callbacks. A better way to do this would be to schedule a bottom half.
>>
>> This is what we're doing. The librados makes a callback to rbd.c's
>> rbd_finish_aiocb(), which updates some internal rbd accounting and then
>> calls qemu_bh_schedule(). Is that part right?
>>
>
> No. You're calling qemu_bh_schedule() in a separate thread in parallel to
> other operations.

Oh, that makes it clearer. Considering that we did it for kvm, and
looking at the kvm qemu_bh_schedule() implementation, it does look
thread safe (there might be an issue with canceling the bh, though; I
haven't looked at it, and it's not really relevant here). In any case,
we already have the completion code with the pipes (currently eventfd)
that runs in the qemu context, so just moving the bh scheduling there
would work.

Something like this (it still needs a mutex around qemu_aio_count,
which was missing before):

diff --git a/block/rbd.c b/block/rbd.c
index 13db079..164e547 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -315,13 +315,16 @@ done:
 static void rbd_aio_completion_cb(void *opaque)
 {
     BDRVRBDState *s = opaque;
+    RBDAIOCB *acb;
 
-    uint64_t val;
     ssize_t ret;
 
     do {
-        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
-            s->qemu_aio_count -= val;
+        if ((ret = read(s->efd, &acb, sizeof(acb))) > 0) {
+            s->qemu_aio_count --;
+            if (!acb->aiocnt && acb->bh) {
+                qemu_bh_schedule(acb->bh);
+            }
         }
     } while (ret < 0 && errno == EINTR);
 
@@ -539,7 +542,6 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
 {
     RBDAIOCB *acb = rcb->acb;
     int64_t r;
-    uint64_t buf = 1;
     int i;
 
     acb->aiocnt--;
@@ -570,13 +572,10 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
             acb->ret += r;
         }
     }
-    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
+    if (write(acb->s->efd, (void *)&acb, sizeof(&acb)) < 0)
         error_report("failed writing to acb->s->efd\n");
     qemu_free(rcb);
     i = 0;
-    if (!acb->aiocnt && acb->bh) {
-        qemu_bh_schedule(acb->bh);
-    }
 }
 
 /* Callback when all queued rados_aio requests are complete */
@@ -584,7 +583,6 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
 static void rbd_aio_bh_cb(void *opaque)
 {
     RBDAIOCB *acb = opaque;
-    uint64_t buf = 1;
 
     if (!acb->write) {
         qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
@@ -594,8 +592,6 @@ static void rbd_aio_bh_cb(void *opaque)
     qemu_bh_delete(acb->bh);
     acb->bh = NULL;
 
-    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
-        error_report("failed writing to acb->s->efd\n");
     qemu_aio_release(acb);
 }
 
@@ -644,7 +640,7 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
     last_segnr = ((off + size - 1) / s->objsize);
     acb->aiocnt = (last_segnr - segnr) + 1;
 
-    s->qemu_aio_count+=acb->aiocnt + 1; /* All the RADOSCB and the related RBDAIOCB */
+    s->qemu_aio_count+=acb->aiocnt; /* All the RADOSCB and the related RBDAIOCB */
 
     if (write && s->read_only) {
         acb->ret = -EROFS;
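
The hand-off the patch relies on can be tried in isolation. The following is a minimal standalone sketch, not QEMU or librados code: a worker thread only writes a request pointer to a file descriptor, and the thread that owns the event loop reads the pointer back and finishes the request there. It assumes a plain pipe rather than an eventfd, because an eventfd adds each written value to a counter and would not hand back individual pointers. DemoRequest, notify_fds, worker_complete, main_loop_handle_one and run_demo are all hypothetical names made up for this example.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_REQS 8

/* Hypothetical stand-in for RBDAIOCB; only the fields this sketch needs. */
typedef struct DemoRequest {
    int id;
    int completed;          /* written by the main-loop thread only */
} DemoRequest;

/* [0] is read by the main loop, [1] is written by worker threads. */
static int notify_fds[2];

/* Worker side: runs in a foreign thread, so it only sends the pointer. */
static void *worker_complete(void *opaque)
{
    DemoRequest *req = opaque;
    ssize_t ret;

    do {
        /* A pointer is far smaller than PIPE_BUF, so the write is atomic. */
        ret = write(notify_fds[1], &req, sizeof(req));
    } while (ret < 0 && errno == EINTR);
    return NULL;
}

/* Main-loop side: read one pointer and finish that request here. */
static int main_loop_handle_one(void)
{
    DemoRequest *req = NULL;
    ssize_t ret;

    do {
        ret = read(notify_fds[0], &req, sizeof(req));
    } while (ret < 0 && errno == EINTR);
    if (ret != (ssize_t)sizeof(req)) {
        return -1;
    }
    assert(req != NULL);
    /* In the patch above, this is where qemu_bh_schedule() is called. */
    req->completed = 1;
    return req->id;
}

/* Start n workers and return how many requests the main thread completed. */
int run_demo(int n)
{
    DemoRequest reqs[MAX_REQS];
    pthread_t threads[MAX_REQS];
    int i, started = 0, done = 0;

    if (n < 0 || n > MAX_REQS || pipe(notify_fds) < 0) {
        return -1;
    }
    for (i = 0; i < n; i++) {
        reqs[i].id = i;
        reqs[i].completed = 0;
        if (pthread_create(&threads[i], NULL, worker_complete, &reqs[i]) != 0) {
            break;
        }
        started++;
    }
    for (i = 0; i < started; i++) {
        main_loop_handle_one();
    }
    for (i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
        done += reqs[i].completed;
    }
    close(notify_fds[0]);
    close(notify_fds[1]);
    return done;
}
```

Calling run_demo(3) from a main() should return 3, since every request is marked complete on the reading thread. On older glibc versions this needs to be built with -pthread.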