On Thu, Oct 7, 2010 at 2:04 PM, Anthony Liguori <anthony@xxxxxxxxxxxxx> wrote:
> On 10/07/2010 03:47 PM, Yehuda Sadeh Weinraub wrote:
>>>
>>> How is that possible?  Are the callbacks delivered in the context of a
>>> different thread?  If so, don't you need locking?
>>>
>>
>> Not sure I'm completely following you. The callbacks are delivered in
>> the context of a different thread, but won't run concurrently.
>
> Concurrently to what?  How do you prevent them from running concurrently
> with qemu?

There are two types of callbacks. The first is for rados aio
completions, and the second is the one added later for the fd glue
layer.

The first callback, called by librados whenever an aio completes, runs
in the context of a single librados thread:

+static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
+{
+    RBDAIOCB *acb = rcb->acb;

rcb is per single aio: it was created earlier and is destroyed here.
acb is shared between several aios, but it was allocated before the
first aio was created.

+    int64_t r;
+    uint64_t buf = 1;
+    int i;
+
+    acb->aiocnt--;

acb->aiocnt has been set before initiating all the aios, so it's ok to
touch it now. The same goes for all acb fields.

+    r = rados_aio_get_return_value(c);
+    rados_aio_release(c);
+    if (acb->write) {
+        if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (!acb->error) {
+            acb->ret += rcb->segsize;
+        }
+    } else {
+        if (r == -ENOENT) {
+            memset(rcb->buf, 0, rcb->segsize);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (r < 0) {
+            acb->ret = r;
+            acb->error = 1;
+        } else if (r < rcb->segsize) {
+            memset(rcb->buf + r, 0, rcb->segsize - r);
+            if (!acb->error) {
+                acb->ret += rcb->segsize;
+            }
+        } else if (!acb->error) {
+            acb->ret += r;
+        }
+    }
+    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)

This will wake up the io_read().

+        error_report("failed writing to acb->s->efd\n");
+    qemu_free(rcb);
+    i = 0;
+    if (!acb->aiocnt && acb->bh) {
+        qemu_bh_schedule(acb->bh);

This is the only qemu-related call in here, and it seems safe to call.

+    }
+}

The scheduled bh function will be called only after all aios that
relate to this specific aio set are done, so the following seems ok,
as there are no more acb references.

+static void rbd_aio_bh_cb(void *opaque)
+{
+    RBDAIOCB *acb = opaque;
+    uint64_t buf = 1;
+
+    if (!acb->write) {
+        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL;
+
+    if (write(acb->s->efd, &buf, sizeof(buf)) < 0)
+        error_report("failed writing to acb->s->efd\n");
+    qemu_aio_release(acb);
+}

Now, the second type is the io_read() callback, where we have our glue
fd. We send one uint64 for each completed io:

+static void rbd_aio_completion_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    uint64_t val;
+    ssize_t ret;
+
+    do {
+        if ((ret = read(s->efd, &val, sizeof(val))) > 0) {
+            s->qemu_aio_count -= val;

There is an issue here with s->qemu_aio_count, which needs to be
protected by a mutex. Other than that, it just reads from s->efd.

+        }
+    } while (ret < 0 && errno == EINTR);
+
+    return;
+}
+
+static int rbd_aio_flush_cb(void *opaque)
+{
+    BDRVRBDState *s = opaque;
+
+    return (s->qemu_aio_count > 0);

Same here as with the previous one: this needs a mutex around
s->qemu_aio_count.

+}

>
> If you saw lock ups, I bet that's what it was from.
>

As I explained before, prior to introducing the fd glue layer there was
no fd associated with our block device, so qemu had no way to check
whether all aios were flushed, which didn't work well when doing
migration/savevm.

Thanks,
Yehuda
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html