Hi Yehuda, this is an updated version of the patch I sent yesterday. It is now based on the current rbd branch and uses qemu_cond_* and qemu_mutex_*. Regards, Christian --- Makefile.objs | 1 + block/rbd.c | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletions(-) diff --git a/Makefile.objs b/Makefile.objs index 56a13c1..e1b8513 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -12,6 +12,7 @@ block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o block-obj-$(CONFIG_POSIX) += compatfd.o +block-obj-$(CONFIG_RBD) += qemu-thread.o block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o diff --git a/block/rbd.c b/block/rbd.c index e7d4083..01786da 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -24,7 +24,7 @@ #include <rados/librados.h> #include <signal.h> - +#include <qemu-thread.h> int eventfd(unsigned int initval, int flags); @@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags); */ #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) +#define MAX_QUEUE_SIZE 33554432 // 32MB typedef struct RBDAIOCB { BlockDriverAIOCB common; @@ -82,6 +83,9 @@ typedef struct BDRVRBDState { uint64_t objsize; int qemu_aio_count; int read_only; + uint64_t queuesize; + QemuMutex *queue_mutex; + QemuCond *queue_threshold; } BDRVRBDState; typedef struct rbd_obj_header_ondisk RbdHeader1; @@ -487,6 +491,13 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags) s->read_only = (snap != NULL); + s->queuesize = 0; + + s->queue_mutex = qemu_malloc(sizeof(QemuMutex)); + qemu_mutex_init(s->queue_mutex); + s->queue_threshold = qemu_malloc(sizeof(QemuCond)); + qemu_cond_init(s->queue_threshold); + s->efd = eventfd(0, 0); if (s->efd < 0) { error_report("error opening eventfd"); @@ -523,6 +534,12 @@ static void 
rbd_close(BlockDriverState *bs) { BDRVRBDState *s = bs->opaque; + // The following do not exist in qemu: + // qemu_cond_destroy(s->queue_threshold); + // qemu_mutex_destroy(s->queue_mutex); + qemu_free(s->queue_threshold); + qemu_free(s->queue_mutex); + rados_close_pool(s->header_pool); rados_close_pool(s->pool); rados_deinitialize(); @@ -613,6 +630,12 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) int i; acb->aiocnt--; + acb->s->queuesize -= rcb->segsize; + if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) { + qemu_mutex_lock(acb->s->queue_mutex); + qemu_cond_signal(acb->s->queue_threshold); + qemu_mutex_unlock(acb->s->queue_mutex); + } r = rados_aio_get_return_value(c); rados_aio_release(c); if (acb->write) { @@ -735,6 +758,14 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, rcb->segsize = segsize; rcb->buf = buf; + while (s->queuesize > MAX_QUEUE_SIZE) { + qemu_mutex_lock(s->queue_mutex); + qemu_cond_wait(s->queue_threshold, s->queue_mutex); + qemu_mutex_unlock(s->queue_mutex); + } + + s->queuesize += segsize; + if (write) { rados_aio_create_completion(rcb, NULL, (rados_callback_t) rbd_finish_aiocb, -- 1.6.5.2 -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html