Apart from two cosmetic issues (see below), I think this patch is ready to replace the old rbd driver. You can add: Reviewed-by: Christian Brunner <chb@xxxxxx> Regards Christian 2011/5/24 Josh Durgin <josh.durgin@xxxxxxxxxxxxx>: > librbd stacks on top of librados to provide access > to rbd images. > > Using librbd simplifies the qemu code, and allows > qemu to use new versions of the rbd format > with few (if any) changes. > > Signed-off-by: Josh Durgin <josh.durgin@xxxxxxxxxxxxx> > Signed-off-by: Yehuda Sadeh <yehuda@xxxxxxxxxxxxxxx> > --- > block/rbd.c | 790 +++++++++++++++-------------------------------------- > block/rbd_types.h | 71 ----- > configure | 33 +-- > 3 files changed, 224 insertions(+), 670 deletions(-) > delete mode 100644 block/rbd_types.h > > diff --git a/block/rbd.c b/block/rbd.c > index 249a590..1c8e7c7 100644 > --- a/block/rbd.c > +++ b/block/rbd.c > @@ -1,20 +1,22 @@ > /* > * QEMU Block driver for RADOS (Ceph) > * > - * Copyright (C) 2010 Christian Brunner <chb@xxxxxx> > + * Copyright (C) 2010-2011 Christian Brunner <chb@xxxxxx>, > + * Josh Durgin <josh.durgin@xxxxxxxxxxxxx> > * > * This work is licensed under the terms of the GNU GPL, version 2. See > * the COPYING file in the top-level directory. 
> * > */ > > +#include <inttypes.h> > + > #include "qemu-common.h" > #include "qemu-error.h" > > -#include "rbd_types.h" > #include "block_int.h" > > -#include <rados/librados.h> > +#include <rbd/librbd.h> > > > > @@ -40,6 +42,12 @@ > > #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) > > +#define RBD_MAX_CONF_NAME_SIZE 128 > +#define RBD_MAX_CONF_VAL_SIZE 512 > +#define RBD_MAX_CONF_SIZE 1024 > +#define RBD_MAX_POOL_NAME_SIZE 128 > +#define RBD_MAX_SNAP_NAME_SIZE 128 > + > typedef struct RBDAIOCB { > BlockDriverAIOCB common; > QEMUBH *bh; > @@ -48,7 +56,6 @@ typedef struct RBDAIOCB { > char *bounce; > int write; > int64_t sector_num; > - int aiocnt; > int error; > struct BDRVRBDState *s; > int cancelled; > @@ -59,7 +66,7 @@ typedef struct RADOSCB { > RBDAIOCB *acb; > struct BDRVRBDState *s; > int done; > - int64_t segsize; > + int64_t size; > char *buf; > int ret; > } RADOSCB; > @@ -69,25 +76,22 @@ typedef struct RADOSCB { > > typedef struct BDRVRBDState { > int fds[2]; > - rados_pool_t pool; > - rados_pool_t header_pool; > - char name[RBD_MAX_OBJ_NAME_SIZE]; > - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; > - uint64_t size; > - uint64_t objsize; > + rados_t cluster; > + rados_ioctx_t io_ctx; > + rbd_image_t image; > + char name[RBD_MAX_IMAGE_NAME_SIZE]; > int qemu_aio_count; > + char *snap; > int event_reader_pos; > RADOSCB *event_rcb; > } BDRVRBDState; > > -typedef struct rbd_obj_header_ondisk RbdHeader1; > - > static void rbd_aio_bh_cb(void *opaque); > > -static int rbd_next_tok(char *dst, int dst_len, > - char *src, char delim, > - const char *name, > - char **p) > +static int qemu_rbd_next_tok(char *dst, int dst_len, > + char *src, char delim, > + const char *name, > + char **p) > { > int l; > char *end; > @@ -115,10 +119,10 @@ static int rbd_next_tok(char *dst, int dst_len, > return 0; > } > > -static int rbd_parsename(const char *filename, > - char *pool, int pool_len, > - char *snap, int snap_len, > - char *name, int name_len) > +static int 
qemu_rbd_parsename(const char *filename, > + char *pool, int pool_len, > + char *snap, int snap_len, > + char *name, int name_len) > { > const char *start; > char *p, *buf; > @@ -131,12 +135,12 @@ static int rbd_parsename(const char *filename, > buf = qemu_strdup(start); > p = buf; > > - ret = rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); > + ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p); > if (ret < 0 || !p) { > ret = -EINVAL; > goto done; > } > - ret = rbd_next_tok(name, name_len, p, '@', "object name", &p); > + ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p); > if (ret < 0) { > goto done; > } > @@ -145,123 +149,35 @@ static int rbd_parsename(const char *filename, > goto done; > } > > - ret = rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p); > + ret = qemu_rbd_next_tok(snap, snap_len, p, '\0', "snap name", &p); > > done: > qemu_free(buf); > return ret; > } > > -static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc) > -{ > - uint32_t len = strlen(name); > - uint32_t len_le = cpu_to_le32(len); > - /* total_len = encoding op + name + empty buffer */ > - uint32_t total_len = 1 + (sizeof(uint32_t) + len) + sizeof(uint32_t); > - uint8_t *desc = NULL; > - > - desc = qemu_malloc(total_len); > - > - *tmap_desc = (char *)desc; > - > - *desc = op; > - desc++; > - memcpy(desc, &len_le, sizeof(len_le)); > - desc += sizeof(len_le); > - memcpy(desc, name, len); > - desc += len; > - len = 0; /* no need for endian conversion for 0 */ > - memcpy(desc, &len, sizeof(len)); > - desc += sizeof(len); > - > - return (char *)desc - *tmap_desc; > -} > - > -static void free_tmap_op(char *tmap_desc) > -{ > - qemu_free(tmap_desc); > -} > - > -static int rbd_register_image(rados_pool_t pool, const char *name) > -{ > - char *tmap_desc; > - const char *dir = RBD_DIRECTORY; > - int ret; > - > - ret = create_tmap_op(CEPH_OSD_TMAP_SET, name, &tmap_desc); > - if (ret < 0) { > - return ret; > - } > - > - ret = 
rados_tmap_update(pool, dir, tmap_desc, ret); > - free_tmap_op(tmap_desc); > - > - return ret; > -} > - > -static int touch_rbd_info(rados_pool_t pool, const char *info_oid) > -{ > - int r = rados_write(pool, info_oid, 0, NULL, 0); > - if (r < 0) { > - return r; > - } > - return 0; > -} > - > -static int rbd_assign_bid(rados_pool_t pool, uint64_t *id) > -{ > - uint64_t out[1]; > - const char *info_oid = RBD_INFO; > - > - *id = 0; > - > - int r = touch_rbd_info(pool, info_oid); > - if (r < 0) { > - return r; > - } > - > - r = rados_exec(pool, info_oid, "rbd", "assign_bid", NULL, > - 0, (char *)out, sizeof(out)); > - if (r < 0) { > - return r; > - } > - > - le64_to_cpus(out); > - *id = out[0]; > - > - return 0; > -} > - > -static int rbd_create(const char *filename, QEMUOptionParameter *options) > +static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options) > { > int64_t bytes = 0; > int64_t objsize; > - uint64_t size; > - time_t mtime; > - uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER; > - char pool[RBD_MAX_SEG_NAME_SIZE]; > - char n[RBD_MAX_SEG_NAME_SIZE]; > - char name[RBD_MAX_OBJ_NAME_SIZE]; > - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; > + int obj_order = 0; > + char pool[RBD_MAX_POOL_NAME_SIZE]; > + char name[RBD_MAX_IMAGE_NAME_SIZE]; > + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; > char *snap = NULL; > - RbdHeader1 header; > - rados_pool_t p; > - uint64_t bid; > - uint32_t hi, lo; > + rados_t cluster; > + rados_ioctx_t io_ctx; > int ret; > > - if (rbd_parsename(filename, > - pool, sizeof(pool), > - snap_buf, sizeof(snap_buf), > - name, sizeof(name)) < 0) { > + if (qemu_rbd_parsename(filename, pool, sizeof(pool), > + snap_buf, sizeof(snap_buf), > + name, sizeof(name)) < 0) { > return -EINVAL; > } > if (snap_buf[0] != '\0') { > snap = snap_buf; > } > > - snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX); > - > /* Read out options */ > while (options && options->name) { > if (!strcmp(options->name, BLOCK_OPT_SIZE)) { > @@ -277,82 +193,55 @@ static int 
rbd_create(const char *filename, QEMUOptionParameter *options) > error_report("obj size too small"); > return -EINVAL; > } > - obj_order = ffs(objsize) - 1; > + obj_order = ffs(objsize) - 1; > } > } > options++; > } > > - memset(&header, 0, sizeof(header)); > - pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT); > - pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE); > - pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION); > - header.image_size = cpu_to_le64(bytes); > - header.options.order = obj_order; > - header.options.crypt_type = RBD_CRYPT_NONE; > - header.options.comp_type = RBD_COMP_NONE; > - header.snap_seq = 0; > - header.snap_count = 0; > - > - if (rados_initialize(0, NULL) < 0) { > + if (rados_create(&cluster, NULL) < 0) { > error_report("error initializing"); > return -EIO; > } > > - if (rados_open_pool(pool, &p)) { > - error_report("error opening pool %s", pool); > - rados_deinitialize(); > + if (rados_conf_read_file(cluster, NULL) < 0) { > + error_report("error reading config file"); > + rados_shutdown(cluster); > return -EIO; > } > > - /* check for existing rbd header file */ > - ret = rados_stat(p, n, &size, &mtime); > - if (ret == 0) { > - ret=-EEXIST; > - goto done; > - } > - > - ret = rbd_assign_bid(p, &bid); > - if (ret < 0) { > - error_report("failed assigning block id"); > - rados_deinitialize(); > + if (rados_connect(cluster) < 0) { > + error_report("error connecting"); > + rados_shutdown(cluster); > return -EIO; > } > - hi = bid >> 32; > - lo = bid & 0xFFFFFFFF; > - snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo); > > - /* create header file */ > - ret = rados_write(p, n, 0, (const char *)&header, sizeof(header)); > - if (ret < 0) { > - goto done; > + if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) { > + error_report("error opening pool %s", pool); > + rados_shutdown(cluster); > + return -EIO; > } > > - ret = rbd_register_image(p, name); > -done: > - 
rados_close_pool(p); > - rados_deinitialize(); > + ret = rbd_create(io_ctx, name, bytes, &obj_order); > + rados_ioctx_destroy(io_ctx); > + rados_shutdown(cluster); > > return ret; > } > > /* > - * This aio completion is being called from rbd_aio_event_reader() and > - * runs in qemu context. It schedules a bh, but just in case the aio > + * This aio completion is being called from qemu_rbd_aio_event_reader() > + * and runs in qemu context. It schedules a bh, but just in case the aio > * was not cancelled before. > */ > -static void rbd_complete_aio(RADOSCB *rcb) > +static void qemu_rbd_complete_aio(RADOSCB *rcb) > { > RBDAIOCB *acb = rcb->acb; > int64_t r; > > - acb->aiocnt--; > - > if (acb->cancelled) { > - if (!acb->aiocnt) { > - qemu_vfree(acb->bounce); > - qemu_aio_release(acb); > - } > + qemu_vfree(acb->bounce); > + qemu_aio_release(acb); > goto done; > } > > @@ -363,32 +252,25 @@ static void rbd_complete_aio(RADOSCB *rcb) > acb->ret = r; > acb->error = 1; > } else if (!acb->error) { > - acb->ret += rcb->segsize; > + acb->ret = rcb->size; > } > } else { > - if (r == -ENOENT) { > - memset(rcb->buf, 0, rcb->segsize); > - if (!acb->error) { > - acb->ret += rcb->segsize; > - } > - } else if (r < 0) { > - memset(rcb->buf, 0, rcb->segsize); > + if (r < 0) { > + memset(rcb->buf, 0, rcb->size); > acb->ret = r; > acb->error = 1; > - } else if (r < rcb->segsize) { > - memset(rcb->buf + r, 0, rcb->segsize - r); > + } else if (r < rcb->size) { > + memset(rcb->buf + r, 0, rcb->size - r); > if (!acb->error) { > - acb->ret += rcb->segsize; > + acb->ret = rcb->size; > } > } else if (!acb->error) { > - acb->ret += r; > + acb->ret = r; > } > } > /* Note that acb->bh can be NULL in case where the aio was cancelled */ > - if (!acb->aiocnt) { > - acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); > - qemu_bh_schedule(acb->bh); > - } > + acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb); > + qemu_bh_schedule(acb->bh); > done: > qemu_free(rcb); > } > @@ -397,7 +279,7 @@ done: > * aio fd read 
handler. It runs in the qemu context and calls the > * completion handling of completed rados aio operations. > */ > -static void rbd_aio_event_reader(void *opaque) > +static void qemu_rbd_aio_event_reader(void *opaque) > { > BDRVRBDState *s = opaque; > > @@ -413,176 +295,74 @@ static void rbd_aio_event_reader(void *opaque) > s->event_reader_pos += ret; > if (s->event_reader_pos == sizeof(s->event_rcb)) { > s->event_reader_pos = 0; > - rbd_complete_aio(s->event_rcb); > - s->qemu_aio_count --; > + qemu_rbd_complete_aio(s->event_rcb); > + s->qemu_aio_count--; > } > } > } > } while (ret < 0 && errno == EINTR); > } > > -static int rbd_aio_flush_cb(void *opaque) > +static int qemu_rbd_aio_flush_cb(void *opaque) > { > BDRVRBDState *s = opaque; > > return (s->qemu_aio_count > 0); > } > > - > -static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header) > -{ > - uint32_t snap_count = le32_to_cpu(header->snap_count); > - rados_snap_t *snaps = NULL; > - rados_snap_t seq; > - uint32_t i; > - uint64_t snap_names_len = le64_to_cpu(header->snap_names_len); > - int r; > - rados_snap_t snapid = 0; > - > - if (snap_count) { > - const char *header_snap = (const char *)&header->snaps[snap_count]; > - const char *end = header_snap + snap_names_len; > - snaps = qemu_malloc(sizeof(rados_snap_t) * header->snap_count); > - > - for (i=0; i < snap_count; i++) { > - snaps[i] = le64_to_cpu(header->snaps[i].id); > - > - if (snap && strcmp(snap, header_snap) == 0) { > - snapid = snaps[i]; > - } > - > - header_snap += strlen(header_snap) + 1; > - if (header_snap > end) { > - error_report("bad header, snapshot list broken"); > - } > - } > - } > - > - if (snap && !snapid) { > - error_report("snapshot not found"); > - qemu_free(snaps); > - return -ENOENT; > - } > - seq = le32_to_cpu(header->snap_seq); > - > - r = rados_set_snap_context(pool, seq, snaps, snap_count); > - > - rados_set_snap(pool, snapid); > - > - qemu_free(snaps); > - > - return r; > -} > - > -#define 
BUF_READ_START_LEN 4096 > - > -static int rbd_read_header(BDRVRBDState *s, char **hbuf) > -{ > - char *buf = NULL; > - char n[RBD_MAX_SEG_NAME_SIZE]; > - uint64_t len = BUF_READ_START_LEN; > - int r; > - > - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); > - > - buf = qemu_malloc(len); > - > - r = rados_read(s->header_pool, n, 0, buf, len); > - if (r < 0) { > - goto failed; > - } > - > - if (r < len) { > - goto done; > - } > - > - qemu_free(buf); > - buf = qemu_malloc(len); > - > - r = rados_stat(s->header_pool, n, &len, NULL); > - if (r < 0) { > - goto failed; > - } > - > - r = rados_read(s->header_pool, n, 0, buf, len); > - if (r < 0) { > - goto failed; > - } > - > -done: > - *hbuf = buf; > - return 0; > - > -failed: > - qemu_free(buf); > - return r; > -} > - > -static int rbd_open(BlockDriverState *bs, const char *filename, int flags) > +static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags) > { > BDRVRBDState *s = bs->opaque; > - RbdHeader1 *header; > - char pool[RBD_MAX_SEG_NAME_SIZE]; > - char snap_buf[RBD_MAX_SEG_NAME_SIZE]; > - char *snap = NULL; > - char *hbuf = NULL; > + char pool[RBD_MAX_POOL_NAME_SIZE]; > + char snap_buf[RBD_MAX_SNAP_NAME_SIZE]; > int r; > > - if (rbd_parsename(filename, pool, sizeof(pool), > - snap_buf, sizeof(snap_buf), > - s->name, sizeof(s->name)) < 0) { > + if (qemu_rbd_parsename(filename, pool, sizeof(pool), > + snap_buf, sizeof(snap_buf), > + s->name, sizeof(s->name)) < 0) { > return -EINVAL; > } > + s->snap = NULL; > if (snap_buf[0] != '\0') { > - snap = snap_buf; > + s->snap = qemu_strdup(snap_buf); > } > > - if ((r = rados_initialize(0, NULL)) < 0) { > + r = rados_create(&s->cluster, NULL); > + if (r < 0) { > error_report("error initializing"); > return r; > } > > - if ((r = rados_open_pool(pool, &s->pool))) { > - error_report("error opening pool %s", pool); > - rados_deinitialize(); > + r = rados_conf_read_file(s->cluster, NULL); > + if (r < 0) { > + error_report("error reading config file"); > 
+ rados_shutdown(s->cluster); > return r; > } > > - if ((r = rados_open_pool(pool, &s->header_pool))) { > - error_report("error opening pool %s", pool); > - rados_deinitialize(); > + r = rados_connect(s->cluster); > + if (r < 0) { > + error_report("error connecting"); > + rados_shutdown(s->cluster); > return r; > } > > - if ((r = rbd_read_header(s, &hbuf)) < 0) { > - error_report("error reading header from %s", s->name); > - goto failed; > - } > - > - if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) { > - error_report("Invalid header signature"); > - r = -EMEDIUMTYPE; > - goto failed; > - } > - > - if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) { > - error_report("Unknown image version"); > - r = -EMEDIUMTYPE; > - goto failed; > + r = rados_ioctx_create(s->cluster, pool, &s->io_ctx); > + if (r < 0) { > + error_report("error opening pool %s", pool); > + rados_shutdown(s->cluster); > + return r; > } > > - header = (RbdHeader1 *) hbuf; > - s->size = le64_to_cpu(header->image_size); > - s->objsize = 1ULL << header->options.order; > - memcpy(s->block_name, header->block_name, sizeof(header->block_name)); > - > - r = rbd_set_snapc(s->pool, snap, header); > + r = rbd_open(s->io_ctx, s->name, &s->image, s->snap); > if (r < 0) { > - error_report("failed setting snap context: %s", strerror(-r)); > - goto failed; > + error_report("error reading header from %s", s->name); > + rados_ioctx_destroy(s->io_ctx); > + rados_shutdown(s->cluster); > + return r; > } > > - bs->read_only = (snap != NULL); > + bs->read_only = (s->snap != NULL); > > s->event_reader_pos = 0; > r = qemu_pipe(s->fds); > @@ -592,23 +372,20 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags) > } > fcntl(s->fds[0], F_SETFL, O_NONBLOCK); > fcntl(s->fds[1], F_SETFL, O_NONBLOCK); > - qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL, > - rbd_aio_flush_cb, NULL, s); > + qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader, > + NULL, 
qemu_rbd_aio_flush_cb, NULL, s); > > - qemu_free(hbuf); > > return 0; > > failed: > - qemu_free(hbuf); > - > - rados_close_pool(s->header_pool); > - rados_close_pool(s->pool); > - rados_deinitialize(); > + rbd_close(s->image); > + rados_ioctx_destroy(s->io_ctx); > + rados_shutdown(s->cluster); > return r; > } > > -static void rbd_close(BlockDriverState *bs) > +static void qemu_rbd_close(BlockDriverState *bs) > { > BDRVRBDState *s = bs->opaque; > > @@ -617,16 +394,17 @@ static void rbd_close(BlockDriverState *bs) > qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL, > NULL); > > - rados_close_pool(s->header_pool); > - rados_close_pool(s->pool); > - rados_deinitialize(); > + rbd_close(s->image); > + rados_ioctx_destroy(s->io_ctx); > + qemu_free(s->snap); > + rados_shutdown(s->cluster); > } > > /* > * Cancel aio. Since we don't reference acb in a non qemu threads, > * it is safe to access it here. > */ > -static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) > +static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb) > { > RBDAIOCB *acb = (RBDAIOCB *) blockacb; > acb->cancelled = 1; > @@ -634,39 +412,28 @@ static void rbd_aio_cancel(BlockDriverAIOCB *blockacb) > > static AIOPool rbd_aio_pool = { > .aiocb_size = sizeof(RBDAIOCB), > - .cancel = rbd_aio_cancel, > + .cancel = qemu_rbd_aio_cancel, > }; > > -/* > - * This is the callback function for rados_aio_read and _write > - * > - * Note: this function is being called from a non qemu thread so > - * we need to be careful about what we do here. Generally we only > - * write to the block notification pipe, and do the rest of the > - * io completion handling from rbd_aio_event_reader() which > - * runs in a qemu context. 
> - */ > -static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) > +static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb) > { > - int ret; > - rcb->ret = rados_aio_get_return_value(c); > - rados_aio_release(c); > + int ret = 0; > while (1) { > fd_set wfd; > - int fd = rcb->s->fds[RBD_FD_WRITE]; > + int fd = s->fds[RBD_FD_WRITE]; > > - /* send the rcb pointer to the qemu thread that is responsible > - for the aio completion. Must do it in a qemu thread context */ > + /* send the op pointer to the qemu thread that is responsible > + for the aio/op completion. Must do it in a qemu thread context */ > ret = write(fd, (void *)&rcb, sizeof(rcb)); > if (ret >= 0) { > break; > } > if (errno == EINTR) { > continue; > - } > + } > if (errno != EAGAIN) { > break; > - } > + } > > FD_ZERO(&wfd); > FD_SET(fd, &wfd); > @@ -675,13 +442,31 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) > } while (ret < 0 && errno == EINTR); > } > > + return ret; > +} > + > +/* > + * This is the callback function for rbd_aio_read and _write > + * > + * Note: this function is being called from a non qemu thread so > + * we need to be careful about what we do here. Generally we only > + * write to the block notification pipe, and do the rest of the > + * io completion handling from qemu_rbd_aio_event_reader() which > + * runs in a qemu context. 
> + */ > +static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) > +{ > + int ret; > + rcb->ret = rbd_aio_get_return_value(c); > + rbd_aio_release(c); > + ret = qemu_rbd_send_pipe(rcb->s, rcb); > if (ret < 0) { > - error_report("failed writing to acb->s->fds\n"); > + error_report("failed writing to acb->s->fds"); > qemu_free(rcb); > } > } > > -/* Callback when all queued rados_aio requests are complete */ > +/* Callback when all queued rbd_aio requests are complete */ > > static void rbd_aio_bh_cb(void *opaque) > { > @@ -707,9 +492,7 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, > { > RBDAIOCB *acb; > RADOSCB *rcb; > - rados_completion_t c; > - char n[RBD_MAX_SEG_NAME_SIZE]; > - int64_t segnr, segoffs, segsize, last_segnr; > + rbd_completion_t c; > int64_t off, size; > char *buf; > > @@ -719,7 +502,6 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, > acb->write = write; > acb->qiov = qiov; > acb->bounce = qemu_blockalign(bs, qiov->size); > - acb->aiocnt = 0; > acb->ret = 0; > acb->error = 0; > acb->s = s; > @@ -734,95 +516,81 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, > > off = sector_num * BDRV_SECTOR_SIZE; > size = nb_sectors * BDRV_SECTOR_SIZE; > - segnr = off / s->objsize; > - segoffs = off % s->objsize; > - segsize = s->objsize - segoffs; > - > - last_segnr = ((off + size - 1) / s->objsize); > - acb->aiocnt = (last_segnr - segnr) + 1; > > - s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */ > + s->qemu_aio_count++; /* All the RADOSCB */ > > - while (size > 0) { > - if (size < segsize) { > - segsize = size; > - } > - > - snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name, > - segnr); > - > - rcb = qemu_malloc(sizeof(RADOSCB)); > - rcb->done = 0; > - rcb->acb = acb; > - rcb->segsize = segsize; > - rcb->buf = buf; > - rcb->s = acb->s; > - > - if (write) { > - rados_aio_create_completion(rcb, NULL, > - (rados_callback_t) rbd_finish_aiocb, > - &c); > - rados_aio_write(s->pool, n, 
segoffs, buf, segsize, c); > - } else { > - rados_aio_create_completion(rcb, > - (rados_callback_t) rbd_finish_aiocb, > - NULL, &c); > - rados_aio_read(s->pool, n, segoffs, buf, segsize, c); > - } > + rcb = qemu_malloc(sizeof(RADOSCB)); > + rcb->done = 0; > + rcb->acb = acb; > + rcb->buf = buf; > + rcb->s = acb->s; > + rcb->size = size; > > - buf += segsize; > - size -= segsize; > - segoffs = 0; > - segsize = s->objsize; > - segnr++; > + if (write) { > + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); > + rbd_aio_write(s->image, off, size, buf, c); > + } else { > + rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c); > + rbd_aio_read(s->image, off, size, buf, c); > } > > return &acb->common; > } > > -static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs, > - int64_t sector_num, QEMUIOVector * qiov, > - int nb_sectors, > - BlockDriverCompletionFunc * cb, > - void *opaque) > +static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs, > + int64_t sector_num, > + QEMUIOVector *qiov, > + int nb_sectors, > + BlockDriverCompletionFunc *cb, > + void *opaque) > { > return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0); > } > > -static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs, > - int64_t sector_num, QEMUIOVector * qiov, > - int nb_sectors, > - BlockDriverCompletionFunc * cb, > - void *opaque) > +static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs, > + int64_t sector_num, > + QEMUIOVector *qiov, > + int nb_sectors, > + BlockDriverCompletionFunc *cb, > + void *opaque) > { > return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); > } > > -static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi) > +static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi) > { > BDRVRBDState *s = bs->opaque; > - bdi->cluster_size = s->objsize; > + rbd_image_info_t info; > + int r; > + > + r = rbd_stat(s->image, &info, sizeof(info)); > + if (r < 0) { 
> + return r; > + } > + > + bdi->cluster_size = info.obj_size; > return 0; > } > > -static int64_t rbd_getlength(BlockDriverState * bs) > +static int64_t qemu_rbd_getlength(BlockDriverState *bs) > { > BDRVRBDState *s = bs->opaque; > + rbd_image_info_t info; > + int r; > > - return s->size; > + r = rbd_stat(s->image, &info, sizeof(info)); > + if (r < 0) { > + return r; > + } > + > + return info.size; > } > > -static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) > +static int qemu_rbd_snap_create(BlockDriverState *bs, > + QEMUSnapshotInfo *sn_info) > { > BDRVRBDState *s = bs->opaque; > - char inbuf[512], outbuf[128]; > - uint64_t snap_id; > int r; > - char *p = inbuf; > - char *end = inbuf + sizeof(inbuf); > - char n[RBD_MAX_SEG_NAME_SIZE]; > - char *hbuf = NULL; > - RbdHeader1 *header; > > if (sn_info->name[0] == '\0') { > return -EINVAL; /* we need a name for rbd snapshots */ > @@ -841,185 +609,59 @@ static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info) > return -ERANGE; > } > > - r = rados_selfmanaged_snap_create(s->header_pool, &snap_id); > + r = rbd_snap_create(s->image, sn_info->name); > if (r < 0) { > - error_report("failed to create snap id: %s", strerror(-r)); > + error_report("failed to create snap: %s", strerror(-r)); > return r; > } > > - *(uint32_t *)p = strlen(sn_info->name); > - cpu_to_le32s((uint32_t *)p); > - p += sizeof(uint32_t); > - strncpy(p, sn_info->name, end - p); > - p += strlen(p); > - if (p + sizeof(snap_id) > end) { > - error_report("invalid input parameter"); > - return -EINVAL; > - } > - > - *(uint64_t *)p = snap_id; > - cpu_to_le64s((uint64_t *)p); > - > - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); > - > - r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf, > - sizeof(inbuf), outbuf, sizeof(outbuf)); > - if (r < 0) { > - error_report("rbd.snap_add execution failed failed: %s", strerror(-r)); > - return r; > - } > - > - sprintf(sn_info->id_str, "%s", sn_info->name); > - > - r 
= rbd_read_header(s, &hbuf); > - if (r < 0) { > - error_report("failed reading header: %s", strerror(-r)); > - return r; > - } > - > - header = (RbdHeader1 *) hbuf; > - r = rbd_set_snapc(s->pool, sn_info->name, header); > - if (r < 0) { > - error_report("failed setting snap context: %s", strerror(-r)); > - goto failed; > - } > - > - return 0; > - > -failed: > - qemu_free(header); > - return r; > -} > - > -static int decode32(char **p, const char *end, uint32_t *v) > -{ > - if (*p + 4 > end) { > - return -ERANGE; > - } > - > - *v = *(uint32_t *)(*p); > - le32_to_cpus(v); > - *p += 4; > return 0; > } > > -static int decode64(char **p, const char *end, uint64_t *v) > -{ > - if (*p + 8 > end) { > - return -ERANGE; > - } > - > - *v = *(uint64_t *)(*p); > - le64_to_cpus(v); > - *p += 8; > - return 0; > -} > - > -static int decode_str(char **p, const char *end, char **s) > -{ > - uint32_t len; > - int r; > - > - if ((r = decode32(p, end, &len)) < 0) { > - return r; > - } > - > - *s = qemu_malloc(len + 1); > - memcpy(*s, *p, len); > - *p += len; > - (*s)[len] = '\0'; > - > - return len; > -} > - > -static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab) > +static int qemu_rbd_snap_list(BlockDriverState *bs, > + QEMUSnapshotInfo **psn_tab) > { > BDRVRBDState *s = bs->opaque; > - char n[RBD_MAX_SEG_NAME_SIZE]; > QEMUSnapshotInfo *sn_info, *sn_tab = NULL; > - RbdHeader1 *header; > - char *hbuf = NULL; > - char *outbuf = NULL, *end, *buf; > - uint64_t len; > - uint64_t snap_seq; > - uint32_t snap_count; > int r, i; > + rbd_snap_info_t *snaps; > + int max_snaps = 100, snap_count; I think it would be nicer to have this initial limit of 100 defined as a named constant next to the other RBD_MAX_* defines at the beginning of the file (e.g. RBD_MAX_SNAPS).
> > - /* read header to estimate how much space we need to read the snap > - * list */ > - if ((r = rbd_read_header(s, &hbuf)) < 0) { > - goto done_err; > - } > - header = (RbdHeader1 *)hbuf; > - len = le64_to_cpu(header->snap_names_len); > - len += 1024; /* should have already been enough, but new snapshots might > - already been created since we read the header. just allocate > - a bit more, so that in most cases it'll suffice anyway */ > - qemu_free(hbuf); > - > - snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX); > - while (1) { > - qemu_free(outbuf); > - outbuf = qemu_malloc(len); > - > - r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0, > - outbuf, len); > + do { > + snaps = qemu_malloc(sizeof(*snaps) * max_snaps); > + r = rbd_snap_list(s->image, snaps, &max_snaps); > if (r < 0) { > - error_report("rbd.snap_list execution failed failed: %s", strerror(-r)); > - goto done_err; > + qemu_free(snaps); > } > - if (r != len) { > - break; > - } > + } while (r == -ERANGE); > > - /* if we're here, we probably raced with some snaps creation */ > - len *= 2; > + if (r <= 0) { > + return r; > } > - buf = outbuf; > - end = buf + len; > > - if ((r = decode64(&buf, end, &snap_seq)) < 0) { > - goto done_err; > - } > - if ((r = decode32(&buf, end, &snap_count)) < 0) { > - goto done_err; > - } > + snap_count = r; This separate assignment isn't really necessary. We could just use snap_count in place of r in the loop above (or keep using r in place of snap_count below).
> > sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo)); > - for (i = 0; i < snap_count; i++) { > - uint64_t id, image_size; > - char *snap_name; > > - if ((r = decode64(&buf, end, &id)) < 0) { > - goto done_err; > - } > - if ((r = decode64(&buf, end, &image_size)) < 0) { > - goto done_err; > - } > - if ((r = decode_str(&buf, end, &snap_name)) < 0) { > - goto done_err; > - } > + for (i = 0; i < snap_count; i++) { > + const char *snap_name = snaps[i].name; > > sn_info = sn_tab + i; > pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name); > pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name); > - qemu_free(snap_name); > > - sn_info->vm_state_size = image_size; > + sn_info->vm_state_size = snaps[i].size; > sn_info->date_sec = 0; > sn_info->date_nsec = 0; > sn_info->vm_clock_nsec = 0; > } > + rbd_snap_list_end(snaps); > + > *psn_tab = sn_tab; > - qemu_free(outbuf); > return snap_count; > -done_err: > - qemu_free(sn_tab); > - qemu_free(outbuf); > - return r; > } > > -static QEMUOptionParameter rbd_create_options[] = { > +static QEMUOptionParameter qemu_rbd_create_options[] = { > { > .name = BLOCK_OPT_SIZE, > .type = OPT_SIZE, > @@ -1036,19 +678,19 @@ static QEMUOptionParameter rbd_create_options[] = { > static BlockDriver bdrv_rbd = { > .format_name = "rbd", > .instance_size = sizeof(BDRVRBDState), > - .bdrv_file_open = rbd_open, > - .bdrv_close = rbd_close, > - .bdrv_create = rbd_create, > - .bdrv_get_info = rbd_getinfo, > - .create_options = rbd_create_options, > - .bdrv_getlength = rbd_getlength, > + .bdrv_file_open = qemu_rbd_open, > + .bdrv_close = qemu_rbd_close, > + .bdrv_create = qemu_rbd_create, > + .bdrv_get_info = qemu_rbd_getinfo, > + .create_options = qemu_rbd_create_options, > + .bdrv_getlength = qemu_rbd_getlength, > .protocol_name = "rbd", > > - .bdrv_aio_readv = rbd_aio_readv, > - .bdrv_aio_writev = rbd_aio_writev, > + .bdrv_aio_readv = qemu_rbd_aio_readv, > + .bdrv_aio_writev = qemu_rbd_aio_writev, > > - .bdrv_snapshot_create = 
rbd_snap_create, > - .bdrv_snapshot_list = rbd_snap_list, > + .bdrv_snapshot_create = qemu_rbd_snap_create, > + .bdrv_snapshot_list = qemu_rbd_snap_list, > }; > > static void bdrv_rbd_init(void) > diff --git a/block/rbd_types.h b/block/rbd_types.h > deleted file mode 100644 > index f4cca99..0000000 > --- a/block/rbd_types.h > +++ /dev/null > @@ -1,71 +0,0 @@ > -/* > - * Ceph - scalable distributed file system > - * > - * Copyright (C) 2004-2010 Sage Weil <sage@xxxxxxxxxxxx> > - * > - * This is free software; you can redistribute it and/or > - * modify it under the terms of the GNU Lesser General Public > - * License version 2.1, as published by the Free Software > - * Foundation. See file COPYING.LIB. > - * > - */ > - > -#ifndef CEPH_RBD_TYPES_H > -#define CEPH_RBD_TYPES_H > - > - > -/* > - * rbd image 'foo' consists of objects > - * foo.rbd - image metadata > - * foo.00000000 > - * foo.00000001 > - * ... - data > - */ > - > -#define RBD_SUFFIX ".rbd" > -#define RBD_DIRECTORY "rbd_directory" > -#define RBD_INFO "rbd_info" > - > -#define RBD_DEFAULT_OBJ_ORDER 22 /* 4MB */ > - > -#define RBD_MAX_OBJ_NAME_SIZE 96 > -#define RBD_MAX_BLOCK_NAME_SIZE 24 > -#define RBD_MAX_SEG_NAME_SIZE 128 > - > -#define RBD_COMP_NONE 0 > -#define RBD_CRYPT_NONE 0 > - > -#define RBD_HEADER_TEXT "<<< Rados Block Device Image >>>\n" > -#define RBD_HEADER_SIGNATURE "RBD" > -#define RBD_HEADER_VERSION "001.005" > - > -struct rbd_info { > - uint64_t max_id; > -} __attribute__ ((packed)); > - > -struct rbd_obj_snap_ondisk { > - uint64_t id; > - uint64_t image_size; > -} __attribute__((packed)); > - > -struct rbd_obj_header_ondisk { > - char text[40]; > - char block_name[RBD_MAX_BLOCK_NAME_SIZE]; > - char signature[4]; > - char version[8]; > - struct { > - uint8_t order; > - uint8_t crypt_type; > - uint8_t comp_type; > - uint8_t unused; > - } __attribute__((packed)) options; > - uint64_t image_size; > - uint64_t snap_seq; > - uint32_t snap_count; > - uint32_t reserved; > - uint64_t 
snap_names_len; > - struct rbd_obj_snap_ondisk snaps[0]; > -} __attribute__((packed)); > - > - > -#endif > diff --git a/configure b/configure > index a318d37..378c238 100755 > --- a/configure > +++ b/configure > @@ -1917,41 +1917,24 @@ fi > if test "$rbd" != "no" ; then > cat > $TMPC <<EOF > #include <stdio.h> > -#include <rados/librados.h> > -int main(void) { rados_initialize(0, NULL); return 0; } > -EOF > - rbd_libs="-lrados" > - if compile_prog "" "$rbd_libs" ; then > - librados_too_old=no > - cat > $TMPC <<EOF > -#include <stdio.h> > -#include <rados/librados.h> > -#ifndef CEPH_OSD_TMAP_SET > -#error missing CEPH_OSD_TMAP_SET > -#endif > +#include <rbd/librbd.h> > int main(void) { > - int (*func)(const rados_pool_t pool, uint64_t *snapid) = rados_selfmanaged_snap_create; > - rados_initialize(0, NULL); > + rados_t cluster; > + rados_create(&cluster, NULL); > return 0; > } > EOF > - if compile_prog "" "$rbd_libs" ; then > - rbd=yes > - libs_tools="$rbd_libs $libs_tools" > - libs_softmmu="$rbd_libs $libs_softmmu" > - else > - rbd=no > - librados_too_old=yes > - fi > + rbd_libs="-lrbd -lrados" > + if compile_prog "" "$rbd_libs" ; then > + rbd=yes > + libs_tools="$rbd_libs $libs_tools" > + libs_softmmu="$rbd_libs $libs_softmmu" > else > if test "$rbd" = "yes" ; then > feature_not_found "rados block device" > fi > rbd=no > fi > - if test "$librados_too_old" = "yes" ; then > - echo "-> Your librados version is too old - upgrade needed to have rbd support" > - fi > fi > > ########################################## > -- > 1.7.2.3 > > > -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html