[PATCH 1/2] qemu-kvm/rbd: Implement aio

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Finally I've managed to get a working implementation of async-I/O for qemu.
(At least in my test setup things are looking good now.)

The attached patch requires a small modification to librados; I will
send a separate mail about that in a few minutes.

Christian 
---
 block/rbd.c |  179 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 178 insertions(+), 1 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index 2177fcd..a3bfb4d 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -47,6 +47,25 @@
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
+typedef struct RBDAIOCB { /* per-request state for one asynchronous RBD read or write */
+        BlockDriverAIOCB common; /* must stay first: rbd_aio_cancel casts a BlockDriverAIOCB* to RBDAIOCB* */
+        QEMUBH *bh; /* bottom half that runs rbd_aio_bh_cb; NULL when none is allocated */
+        int ret; /* result accumulated by rbd_finish_aiocb and handed to common.cb */
+        QEMUIOVector *qiov; /* the caller's I/O vector */
+        char *bounce; /* contiguous buffer of qiov->size bytes used for the RADOS transfers */
+        int write; /* non-zero for a write request, 0 for a read */
+        int64_t sector_num; /* NOTE(review): neither assigned nor read in the visible code */
+        int aiocnt; /* RADOS requests submitted but not yet completed */
+	int rccomplete; /* set to 1 once rbd_aio_rw_vector has submitted every segment */
+} RBDAIOCB;
+
+typedef struct RADOSCB { /* per-segment context passed to rbd_finish_aiocb */
+        int rcbid; /* NOTE(review): not used in the visible code */
+        RBDAIOCB *acb; /* the request this segment belongs to */
+        int done; /* set to 0 at submission; not read in the visible code */
+        int64_t segsize; /* number of bytes requested for this segment */
+        char *buf; /* this segment's position inside acb->bounce */
+} RADOSCB;
 
 typedef struct RBDRVRBDState {
 	rados_pool_t pool;
@@ -341,6 +360,162 @@ static int rbd_read(BlockDriverState *bs, int64_t sector_num,
 	return(0);
 }
 
+static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) /* installed as the .cancel hook of rbd_aio_pool */
+{
+    RBDAIOCB *acb = (RBDAIOCB *)blockacb; /* relies on 'common' being the first member of RBDAIOCB */
+    qemu_bh_delete(acb->bh); /* delete the bottom half created in rbd_aio_rw_vector */
+    acb->bh = NULL; /* rbd_finish_aiocb does not schedule anything while bh is NULL */
+    qemu_aio_release(acb); /* NOTE(review): acb->bounce is not freed here, and RADOS requests still in flight keep pointers to acb and bounce -- confirm lifetime */
+}
+
+static AIOPool rbd_aio_pool = /* pool passed to qemu_aio_get() in rbd_aio_rw_vector */
+{
+    .aiocb_size         = sizeof(RBDAIOCB), /* presumably the size of each request object the pool hands out -- confirm */
+    .cancel             = rbd_aio_cancel, /* cancellation hook for requests taken from this pool */
+};
+
+static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
+{
+    RBDAIOCB *acb = rcb->acb;
+    int64_t r;
+    int i;
+    /* Completion callback for one segment: folds its result into acb->ret, frees rcb. */
+    if (rados_aio_is_complete(c)) {
+        acb->aiocnt--;
+        r = rados_aio_get_return_value(c);
+        rados_aio_set_callback(c, NULL, NULL);
+        rados_aio_release(c);
+        if (acb->write) {
+            if (acb->ret >= 0) { /* first error is sticky; later byte counts must not mask it */
+                acb->ret = (r < 0) ? r : acb->ret + r;
+            }
+        } else {
+            if (r < 0) { /* reads: a failed segment is returned as zeroes */
+                r = 0;
+            }
+            if (r < rcb->segsize) { /* zero-fill whatever the read did not deliver */
+                memset(rcb->buf + r, 0, rcb->segsize - r);
+                r = rcb->segsize;
+            }
+            acb->ret += r;
+        }
+        qemu_free(rcb);
+        /* NOTE(review): bounded poll (5 x 100us) for the submitter; it may give up too early. */
+        for (i = 0; (acb->aiocnt == 0) && !acb->rccomplete && i < 5; i++) {
+            usleep(100);
+        }
+        if ((acb->aiocnt == 0) && acb->rccomplete && acb->bh) {
+            qemu_bh_schedule(acb->bh);
+        }
+    }
+}
+            
+static void rbd_aio_bh_cb(void *opaque)
+{
+    RBDAIOCB *acb = opaque;
+    /* Bottom half that finishes a request; opaque is the RBDAIOCB given to qemu_bh_new(). */
+    if (!acb->write) {
+        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size); /* reads: copy the data out to the caller */
+    }
+    qemu_vfree(acb->bounce);
+    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); /* report 0 on success, never a byte count */
+    qemu_bh_delete(acb->bh);
+    acb->bh = NULL; /* rbd_aio_rw_vector allocates a fresh bottom half when it sees NULL */
+    qemu_aio_release(acb);
+}
+
+/* Split a guest request into per-object RADOS aio operations and submit them,
+ * moving the data through a bounce buffer.  rbd_finish_aiocb() runs once per
+ * segment and schedules rbd_aio_bh_cb(), which invokes the completion callback.
+ * write != 0 selects a write, 0 a read.  Returns the request's common AIOCB. */
+static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
+        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+        BlockDriverCompletionFunc *cb, void *opaque, int write)
+{
+    RBDAIOCB *acb;
+    RADOSCB *rcb;
+    rados_completion_t c;
+    char n[RBD_MAX_SEG_NAME_SIZE];
+    int64_t segnr, segoffs, segsize;
+    int64_t off, size;
+    char *buf;
+
+    RBDRVRBDState *s = bs->opaque;
+
+    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
+    acb->write = write;
+    acb->qiov = qiov;
+    acb->bounce = qemu_blockalign(bs, qiov->size);
+    acb->aiocnt = 0;
+    acb->ret = 0;
+    acb->rccomplete = 0;
+
+    if (!acb->bh) {
+        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
+    }
+    /* Writes: flatten the I/O vector into the bounce buffer before submitting. */
+    if (write) {
+        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
+    }
+
+    buf = acb->bounce;
+    /* Widen before multiplying: nb_sectors * 512 in plain int overflows from 2 GiB on. */
+    off = sector_num * 512;
+    size = (int64_t) nb_sectors * 512;
+    segnr = (int64_t) (off / s->objsize);
+    segoffs = (int64_t) (off % s->objsize);
+    segsize  = (int64_t) (s->objsize - segoffs);
+
+    while (size > 0) {
+        if (size < segsize) {
+            segsize = size;
+        }
+
+        snprintf(n, sizeof(n), "%s.%012llx", s->name, (long long unsigned int) segnr);
+
+        rcb = qemu_malloc(sizeof(RADOSCB));
+        rcb->done = 0;
+        rcb->acb = acb;
+        rcb->segsize = segsize;
+        rcb->buf = buf;
+
+        acb->aiocnt++;
+        /* NOTE(review): the results of the rados_aio_* calls below are not checked. */
+        rados_aio_create_completion((rados_callback_t) rbd_finish_aiocb, rcb, &c);
+        if (write) {
+            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
+        } else {
+            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
+        }
+
+        buf += segsize;
+        size -= segsize;
+        segoffs = 0;
+        segsize = s->objsize;
+        segnr++;
+    }
+    /* Tell rbd_finish_aiocb() that every segment has been submitted. */
+    acb->rccomplete = 1;
+
+    return &acb->common;
+}
+
+
+static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState *bs, int64_t sector_num,
+        QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque)
+{
+    const int is_write = 0; /* asynchronous read entry point: take the shared path as a read */
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, is_write);
+}
+
+static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState *bs, int64_t sector_num,
+        QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque)
+{
+    const int is_write = 1; /* asynchronous write entry point: take the shared path as a write */
+    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, is_write);
+}
+
+
 static int rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
 {
     RBDRVRBDState *s = bs->opaque;
@@ -381,6 +556,9 @@ static BlockDriver bdrv_rbd = {
 	.create_options = rbd_create_options,
 	.bdrv_getlength	= rbd_getlength,
 	.protocol_name	= "rbd",
+
+	.bdrv_aio_readv = rbd_aio_readv,
+	.bdrv_aio_writev= rbd_aio_writev,
 };
 
 static void bdrv_rbd_init(void) {
@@ -389,4 +567,3 @@ static void bdrv_rbd_init(void) {
 
 block_init(bdrv_rbd_init);
 
-
-- 
1.6.6.1


-- 
Christian Brunner                              MUC.DE e.V.
                                               Joseph-Dollinger-Bogen 14
                                               D-80807 Muenchen
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux