Re: [PATCH 19/20] rbd: support for object-map and fast-diff

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Jun 25, 2019 at 10:42 AM Ilya Dryomov <idryomov@xxxxxxxxx> wrote:
>
> Speed up reads, discards and zeroouts through RBD_OBJ_FLAG_MAY_EXIST
> and RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT based on object map.
>
> Invalid object maps are not trusted, but still updated.  Note that we
> never iterate, resize or invalidate object maps.  If object-map feature
> is enabled but object map fails to load, we just fail the requester
> (either "rbd map" or I/O, by way of post-acquire action).
>
> Signed-off-by: Ilya Dryomov <idryomov@xxxxxxxxx>
> ---
>  drivers/block/rbd.c                  | 721 ++++++++++++++++++++++++++-
>  drivers/block/rbd_types.h            |  10 +
>  include/linux/ceph/cls_lock_client.h |   3 +
>  include/linux/ceph/striper.h         |   2 +
>  net/ceph/cls_lock_client.c           |  45 ++
>  net/ceph/striper.c                   |  17 +
>  6 files changed, 795 insertions(+), 3 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 671041b67957..756595f5fbc9 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -115,6 +115,8 @@ static int atomic_dec_return_safe(atomic_t *v)
>  #define RBD_FEATURE_LAYERING           (1ULL<<0)
>  #define RBD_FEATURE_STRIPINGV2         (1ULL<<1)
>  #define RBD_FEATURE_EXCLUSIVE_LOCK     (1ULL<<2)
> +#define RBD_FEATURE_OBJECT_MAP         (1ULL<<3)
> +#define RBD_FEATURE_FAST_DIFF          (1ULL<<4)
>  #define RBD_FEATURE_DEEP_FLATTEN       (1ULL<<5)
>  #define RBD_FEATURE_DATA_POOL          (1ULL<<7)
>  #define RBD_FEATURE_OPERATIONS         (1ULL<<8)
> @@ -122,6 +124,8 @@ static int atomic_dec_return_safe(atomic_t *v)
>  #define RBD_FEATURES_ALL       (RBD_FEATURE_LAYERING |         \
>                                  RBD_FEATURE_STRIPINGV2 |       \
>                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
> +                                RBD_FEATURE_OBJECT_MAP |       \
> +                                RBD_FEATURE_FAST_DIFF |        \
>                                  RBD_FEATURE_DEEP_FLATTEN |     \
>                                  RBD_FEATURE_DATA_POOL |        \
>                                  RBD_FEATURE_OPERATIONS)
> @@ -227,6 +231,8 @@ enum obj_operation_type {
>  #define RBD_OBJ_FLAG_DELETION                  (1U << 0)
>  #define RBD_OBJ_FLAG_COPYUP_ENABLED            (1U << 1)
>  #define RBD_OBJ_FLAG_COPYUP_ZEROS              (1U << 2)
> +#define RBD_OBJ_FLAG_MAY_EXIST                 (1U << 3)
> +#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT      (1U << 4)
>
>  enum rbd_obj_read_state {
>         RBD_OBJ_READ_START = 1,
> @@ -261,14 +267,18 @@ enum rbd_obj_read_state {
>   */
>  enum rbd_obj_write_state {
>         RBD_OBJ_WRITE_START = 1,
> +       RBD_OBJ_WRITE_PRE_OBJECT_MAP,
>         RBD_OBJ_WRITE_OBJECT,
>         __RBD_OBJ_WRITE_COPYUP,
>         RBD_OBJ_WRITE_COPYUP,
> +       RBD_OBJ_WRITE_POST_OBJECT_MAP,
>  };
>
>  enum rbd_obj_copyup_state {
>         RBD_OBJ_COPYUP_START = 1,
>         RBD_OBJ_COPYUP_READ_PARENT,
> +       __RBD_OBJ_COPYUP_OBJECT_MAPS,
> +       RBD_OBJ_COPYUP_OBJECT_MAPS,
>         __RBD_OBJ_COPYUP_WRITE_OBJECT,
>         RBD_OBJ_COPYUP_WRITE_OBJECT,
>  };
> @@ -419,6 +429,11 @@ struct rbd_device {
>         int                     acquire_err;
>         struct completion       releasing_wait;
>
> +       spinlock_t              object_map_lock;
> +       u8                      *object_map;
> +       u64                     object_map_size;        /* in objects */
> +       u64                     object_map_flags;
> +
>         struct workqueue_struct *task_wq;
>
>         struct rbd_spec         *parent_spec;
> @@ -620,6 +635,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
>                                 u8 *order, u64 *snap_size);
>  static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
>                 u64 *snap_features);
> +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
>
>  static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
>  static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
> @@ -1768,6 +1784,467 @@ static void rbd_img_request_destroy(struct kref *kref)
>         kmem_cache_free(rbd_img_request_cache, img_request);
>  }
>
> +#define BITS_PER_OBJ   2
> +#define OBJS_PER_BYTE  (BITS_PER_BYTE / BITS_PER_OBJ)
> +#define OBJ_MASK       ((1 << BITS_PER_OBJ) - 1)
> +
> +static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
> +                                  u64 *index, u8 *shift)
> +{
> +       u32 off;
> +
> +       rbd_assert(objno < rbd_dev->object_map_size);
> +       *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
> +       *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
> +}
> +
> +static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
> +{
> +       u64 index;
> +       u8 shift;
> +
> +       lockdep_assert_held(&rbd_dev->object_map_lock);
> +       __rbd_object_map_index(rbd_dev, objno, &index, &shift);
> +       return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
> +}
> +
> +static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
> +{
> +       u64 index;
> +       u8 shift;
> +       u8 *p;
> +
> +       lockdep_assert_held(&rbd_dev->object_map_lock);
> +       rbd_assert(!(val & ~OBJ_MASK));
> +
> +       __rbd_object_map_index(rbd_dev, objno, &index, &shift);
> +       p = &rbd_dev->object_map[index];
> +       *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
> +}
> +
> +static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
> +{
> +       u8 state;
> +
> +       spin_lock(&rbd_dev->object_map_lock);
> +       state = __rbd_object_map_get(rbd_dev, objno);
> +       spin_unlock(&rbd_dev->object_map_lock);
> +       return state;
> +}
> +
> +static bool use_object_map(struct rbd_device *rbd_dev)
> +{
> +       return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
> +               !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
> +}
> +
> +static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
> +{
> +       u8 state;
> +
> +       /* fall back to default logic if object map is disabled or invalid */
> +       if (!use_object_map(rbd_dev))
> +               return true;
> +
> +       state = rbd_object_map_get(rbd_dev, objno);
> +       return state != OBJECT_NONEXISTENT;
> +}
> +
> +static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
> +                               struct ceph_object_id *oid)
> +{
> +       if (snap_id == CEPH_NOSNAP)
> +               ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
> +                               rbd_dev->spec->image_id);
> +       else
> +               ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
> +                               rbd_dev->spec->image_id, snap_id);
> +}
> +
> +static int rbd_object_map_lock(struct rbd_device *rbd_dev)
> +{
> +       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> +       CEPH_DEFINE_OID_ONSTACK(oid);
> +       u8 lock_type;
> +       char *lock_tag;
> +       struct ceph_locker *lockers;
> +       u32 num_lockers;
> +       bool broke_lock = false;
> +       int ret;
> +
> +       rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
> +
> +again:
> +       ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
> +                           CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
> +       if (ret != -EBUSY || broke_lock) {
> +               if (ret == -EEXIST)
> +                       ret = 0; /* already locked by myself */
> +               if (ret)
> +                       rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
> +               return ret;
> +
> +       }
> +
> +       ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
> +                                RBD_LOCK_NAME, &lock_type, &lock_tag,
> +                                &lockers, &num_lockers);
> +       if (ret) {
> +               if (ret == -ENOENT)
> +                       goto again;
> +
> +               rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
> +               return ret;
> +       }
> +
> +       kfree(lock_tag);
> +       if (num_lockers == 0)
> +               goto again;
> +
> +       rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
> +                ENTITY_NAME(lockers[0].id.name));
> +
> +       ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
> +                                 RBD_LOCK_NAME, lockers[0].id.cookie,
> +                                 &lockers[0].id.name);
> +       ceph_free_lockers(lockers, num_lockers);
> +       if (ret) {
> +               if (ret == -ENOENT)
> +                       goto again;
> +
> +               rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
> +               return ret;
> +       }
> +
> +       broke_lock = true;
> +       goto again;
> +}
> +
> +static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
> +{
> +       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> +       CEPH_DEFINE_OID_ONSTACK(oid);
> +       int ret;
> +
> +       rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
> +
> +       ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
> +                             "");
> +       if (ret && ret != -ENOENT)
> +               rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
> +}
> +
> +static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
> +{
> +       u8 struct_v;
> +       u32 struct_len;
> +       u32 header_len;
> +       void *header_end;
> +       int ret;
> +
> +       ceph_decode_32_safe(p, end, header_len, e_inval);
> +       header_end = *p + header_len;
> +
> +       ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
> +                                 &struct_len);
> +       if (ret)
> +               return ret;
> +
> +       ceph_decode_64_safe(p, end, *object_map_size, e_inval);
> +
> +       *p = header_end;
> +       return 0;
> +
> +e_inval:
> +       return -EINVAL;
> +}
> +
> +static int __rbd_object_map_load(struct rbd_device *rbd_dev)
> +{
> +       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> +       CEPH_DEFINE_OID_ONSTACK(oid);
> +       struct page **pages;
> +       void *p, *end;
> +       size_t reply_len;
> +       u64 num_objects;
> +       u64 object_map_bytes;
> +       u64 object_map_size;
> +       int num_pages;
> +       int ret;
> +
> +       rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
> +
> +       num_objects = ceph_get_num_objects(&rbd_dev->layout,
> +                                          rbd_dev->mapping.size);
> +       object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
> +                                           BITS_PER_BYTE);
> +       num_pages = calc_pages_for(0, object_map_bytes) + 1;
> +       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
> +       if (IS_ERR(pages))
> +               return PTR_ERR(pages);
> +
> +       reply_len = num_pages * PAGE_SIZE;
> +       rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
> +       ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
> +                            "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
> +                            NULL, 0, pages, &reply_len);
> +       if (ret)
> +               goto out;
> +
> +       p = page_address(pages[0]);
> +       end = p + min(reply_len, (size_t)PAGE_SIZE);
> +       ret = decode_object_map_header(&p, end, &object_map_size);
> +       if (ret)
> +               goto out;
> +
> +       if (object_map_size != num_objects) {
> +               rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
> +                        object_map_size, num_objects);
> +               ret = -EINVAL;
> +               goto out;
> +       }
> +
> +       if (offset_in_page(p) + object_map_bytes > reply_len) {
> +               ret = -EINVAL;
> +               goto out;
> +       }
> +
> +       rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
> +       if (!rbd_dev->object_map) {
> +               ret = -ENOMEM;
> +               goto out;
> +       }
> +
> +       rbd_dev->object_map_size = object_map_size;
> +       ceph_copy_from_page_vector(pages, rbd_dev->object_map,
> +                                  offset_in_page(p), object_map_bytes);
> +
> +out:
> +       ceph_release_page_vector(pages, num_pages);
> +       return ret;
> +}
> +
> +static void rbd_object_map_free(struct rbd_device *rbd_dev)
> +{
> +       kvfree(rbd_dev->object_map);
> +       rbd_dev->object_map = NULL;
> +       rbd_dev->object_map_size = 0;
> +}
> +
> +static int rbd_object_map_load(struct rbd_device *rbd_dev)
> +{
> +       int ret;
> +
> +       ret = __rbd_object_map_load(rbd_dev);
> +       if (ret)
> +               return ret;
> +
> +       ret = rbd_dev_v2_get_flags(rbd_dev);
> +       if (ret) {
> +               rbd_object_map_free(rbd_dev);
> +               return ret;
> +       }
> +
> +       if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
> +               rbd_warn(rbd_dev, "object map is invalid");
> +
> +       return 0;
> +}
> +
> +static int rbd_object_map_open(struct rbd_device *rbd_dev)
> +{
> +       int ret;
> +
> +       ret = rbd_object_map_lock(rbd_dev);

Only lock/unlock if rbd_dev->spec.snap_id == CEPH_NOSNAP?

> +       if (ret)
> +               return ret;
> +
> +       ret = rbd_object_map_load(rbd_dev);
> +       if (ret) {
> +               rbd_object_map_unlock(rbd_dev);
> +               return ret;
> +       }
> +
> +       return 0;
> +}
> +
> +static void rbd_object_map_close(struct rbd_device *rbd_dev)
> +{
> +       rbd_object_map_free(rbd_dev);
> +       rbd_object_map_unlock(rbd_dev);
> +}
> +
> +/*
> + * This function needs snap_id (or more precisely just something to
> + * distinguish between HEAD and snapshot object maps), new_state and
> + * current_state that were passed to rbd_object_map_update().
> + *
> + * To avoid allocating and stashing a context we piggyback on the OSD
> + * request.  A HEAD update has two ops (assert_locked).  For new_state
> + * and current_state we decode our own object_map_update op, encoded in
> + * rbd_cls_object_map_update().

Decoding the OSD request seems a little awkward. Since you would only
update the in-memory state for the HEAD revision, could you just stash
these fields in the "rbd_object_request" struct? Then in
"rbd_object_map_update", set the callback to either a
"rbd_object_map_snapshot_callback" callback or
"rbd_object_map_head_callback".

> + */
> +static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
> +                                       struct ceph_osd_request *osd_req)
> +{
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +       struct ceph_osd_data *osd_data;
> +       u64 objno;
> +       u8 state, new_state, current_state;
> +       bool has_current_state;
> +       void *p;
> +
> +       if (osd_req->r_result)
> +               return osd_req->r_result;
> +
> +       /*
> +        * Nothing to do for a snapshot object map.
> +        */
> +       if (osd_req->r_num_ops == 1)
> +               return 0;
> +
> +       /*
> +        * Update in-memory HEAD object map.
> +        */
> +       rbd_assert(osd_req->r_num_ops == 2);
> +       osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
> +       rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
> +
> +       p = page_address(osd_data->pages[0]);
> +       objno = ceph_decode_64(&p);
> +       rbd_assert(objno == obj_req->ex.oe_objno);
> +       rbd_assert(ceph_decode_64(&p) == objno + 1);
> +       new_state = ceph_decode_8(&p);
> +       has_current_state = ceph_decode_8(&p);
> +       if (has_current_state)
> +               current_state = ceph_decode_8(&p);
> +
> +       spin_lock(&rbd_dev->object_map_lock);
> +       state = __rbd_object_map_get(rbd_dev, objno);
> +       if (!has_current_state || current_state == state ||
> +           (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
> +               __rbd_object_map_set(rbd_dev, objno, new_state);
> +       spin_unlock(&rbd_dev->object_map_lock);
> +
> +       return 0;
> +}
> +
> +static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
> +{
> +       struct rbd_obj_request *obj_req = osd_req->r_priv;
> +       int result;
> +
> +       dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
> +            osd_req->r_result, obj_req);
> +
> +       result = rbd_object_map_update_finish(obj_req, osd_req);
> +       rbd_obj_handle_request(obj_req, result);
> +}
> +
> +static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
> +{
> +       u8 state = rbd_object_map_get(rbd_dev, objno);
> +
> +       if (state == new_state ||
> +           (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
> +           (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
> +               return false;
> +
> +       return true;
> +}
> +
> +static int rbd_cls_object_map_update(struct ceph_osd_request *req,
> +                                    int which, u64 objno, u8 new_state,
> +                                    const u8 *current_state)
> +{
> +       struct page **pages;
> +       void *p, *start;
> +       int ret;
> +
> +       ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
> +       if (ret)
> +               return ret;
> +
> +       pages = ceph_alloc_page_vector(1, GFP_NOIO);
> +       if (IS_ERR(pages))
> +               return PTR_ERR(pages);
> +
> +       p = start = page_address(pages[0]);
> +       ceph_encode_64(&p, objno);
> +       ceph_encode_64(&p, objno + 1);
> +       ceph_encode_8(&p, new_state);
> +       if (current_state) {
> +               ceph_encode_8(&p, 1);
> +               ceph_encode_8(&p, *current_state);
> +       } else {
> +               ceph_encode_8(&p, 0);
> +       }
> +
> +       osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
> +                                         false, true);
> +       return 0;
> +}
> +
> +/*
> + * Return:
> + *   0 - object map update sent
> + *   1 - object map update isn't needed
> + *  <0 - error
> + */
> +static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
> +                                u8 new_state, const u8 *current_state)
> +{
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> +       struct ceph_osd_request *req;
> +       int num_ops = 1;
> +       int which = 0;
> +       int ret;
> +
> +       if (snap_id == CEPH_NOSNAP) {
> +               if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
> +                       return 1;
> +
> +               num_ops++; /* assert_locked */
> +       }
> +
> +       req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
> +       if (!req)
> +               return -ENOMEM;
> +
> +       list_add_tail(&req->r_unsafe_item, &obj_req->osd_reqs);
> +       req->r_callback = rbd_object_map_callback;
> +       req->r_priv = obj_req;
> +
> +       rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
> +       ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
> +       req->r_flags = CEPH_OSD_FLAG_WRITE;
> +       ktime_get_real_ts64(&req->r_mtime);
> +
> +       if (snap_id == CEPH_NOSNAP) {
> +               /*
> +                * Protect against possible race conditions during lock
> +                * ownership transitions.
> +                */
> +               ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
> +                                            CEPH_CLS_LOCK_EXCLUSIVE, "", "");
> +               if (ret)
> +                       return ret;
> +       }
> +
> +       ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
> +                                       new_state, current_state);
> +       if (ret)
> +               return ret;
> +
> +       ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
> +       if (ret)
> +               return ret;
> +
> +       ceph_osdc_start_request(osdc, req, false);
> +       return 0;
> +}
> +
>  static void prune_extents(struct ceph_file_extent *img_extents,
>                           u32 *num_img_extents, u64 overlap)
>  {
> @@ -1975,6 +2452,7 @@ static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
>         if (ret)
>                 return ret;
>
> +       obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
>         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
>                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
>
> @@ -2022,6 +2500,7 @@ static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
>         if (rbd_obj_copyup_enabled(obj_req))
>                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
>         if (!obj_req->num_img_extents) {
> +               obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
>                 if (rbd_obj_is_entire(obj_req))
>                         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
>         }
> @@ -2407,6 +2886,20 @@ static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
>         queue_work(rbd_wq, &img_req->work);
>  }
>
> +static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
> +{
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +
> +       if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
> +               obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
> +               return true;
> +       }
> +
> +       dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
> +            obj_req->ex.oe_objno);
> +       return false;
> +}
> +
>  static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
>  {
>         struct ceph_osd_request *osd_req;
> @@ -2482,10 +2975,17 @@ static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
>         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>         int ret;
>
> +again:
>         switch (obj_req->read_state) {
>         case RBD_OBJ_READ_START:
>                 rbd_assert(!*result);
>
> +               if (!rbd_obj_may_exist(obj_req)) {
> +                       *result = -ENOENT;
> +                       obj_req->read_state = RBD_OBJ_READ_OBJECT;
> +                       goto again;
> +               }
> +
>                 ret = rbd_obj_read_object(obj_req);
>                 if (ret) {
>                         *result = ret;
> @@ -2536,6 +3036,44 @@ static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
>         }
>  }
>
> +static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
> +{
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +
> +       if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
> +               obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
> +
> +       if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
> +           (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
> +               dout("%s %p noop for nonexistent\n", __func__, obj_req);
> +               return true;
> +       }
> +
> +       return false;
> +}
> +
> +/*
> + * Return:
> + *   0 - object map update sent
> + *   1 - object map update isn't needed
> + *  <0 - error
> + */
> +static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
> +{
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +       u8 new_state;
> +
> +       if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
> +               return 1;
> +
> +       if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
> +               new_state = OBJECT_PENDING;
> +       else
> +               new_state = OBJECT_EXISTS;
> +
> +       return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
> +}
> +
>  static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
>  {
>         struct ceph_osd_request *osd_req;
> @@ -2706,6 +3244,41 @@ static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
>         return rbd_obj_read_from_parent(obj_req);
>  }
>
> +static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
> +{
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +       struct ceph_snap_context *snapc = obj_req->img_request->snapc;
> +       u8 new_state;
> +       u32 i;
> +       int ret;
> +
> +       rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
> +
> +       if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
> +               return;
> +
> +       if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
> +               return;
> +
> +       for (i = 0; i < snapc->num_snaps; i++) {
> +               if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
> +                   i + 1 < snapc->num_snaps)
> +                       new_state = OBJECT_EXISTS_CLEAN;
> +               else
> +                       new_state = OBJECT_EXISTS;
> +
> +               ret = rbd_object_map_update(obj_req, snapc->snaps[i],
> +                                           new_state, NULL);
> +               if (ret < 0) {
> +                       obj_req->pending.result = ret;
> +                       return;
> +               }
> +
> +               rbd_assert(!ret);
> +               obj_req->pending.num_pending++;
> +       }
> +}
> +
>  static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
>  {
>         u32 bytes = rbd_obj_img_extents_bytes(obj_req);
> @@ -2749,6 +3322,7 @@ static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
>
>  static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
>  {
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>         int ret;
>
>  again:
> @@ -2776,6 +3350,25 @@ static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
>                         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
>                 }
>
> +               rbd_obj_copyup_object_maps(obj_req);
> +               if (!obj_req->pending.num_pending) {
> +                       *result = obj_req->pending.result;
> +                       obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
> +                       goto again;
> +               }
> +               obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
> +               return false;
> +       case __RBD_OBJ_COPYUP_OBJECT_MAPS:
> +               if (!pending_result_dec(&obj_req->pending, result))
> +                       return false;
> +               /* fall through */
> +       case RBD_OBJ_COPYUP_OBJECT_MAPS:
> +               if (*result) {
> +                       rbd_warn(rbd_dev, "snap object map update failed: %d",
> +                                *result);
> +                       return true;
> +               }
> +
>                 rbd_obj_copyup_write_object(obj_req);
>                 if (!obj_req->pending.num_pending) {
>                         *result = obj_req->pending.result;
> @@ -2795,6 +3388,27 @@ static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
>         }
>  }
>
> +/*
> + * Return:
> + *   0 - object map update sent
> + *   1 - object map update isn't needed
> + *  <0 - error
> + */
> +static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
> +{
> +       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +       u8 current_state = OBJECT_PENDING;
> +
> +       if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
> +               return 1;
> +
> +       if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
> +               return 1;
> +
> +       return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
> +                                    &current_state);
> +}
> +
>  static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
>  {
>         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> @@ -2805,6 +3419,24 @@ static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
>         case RBD_OBJ_WRITE_START:
>                 rbd_assert(!*result);
>
> +               if (rbd_obj_write_is_noop(obj_req))
> +                       return true;

Does this properly handle the case where it has a parent overlap? If
the child object doesn't exist, we would still want to perform the
copyup (if required), correct?

> +               ret = rbd_obj_write_pre_object_map(obj_req);
> +               if (ret < 0) {
> +                       *result = ret;
> +                       return true;
> +               }
> +               obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
> +               if (ret > 0)
> +                       goto again;
> +               return false;
> +       case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
> +               if (*result) {
> +                       rbd_warn(rbd_dev, "pre object map update failed: %d",
> +                                *result);
> +                       return true;
> +               }
>                 ret = rbd_obj_write_object(obj_req);
>                 if (ret) {
>                         *result = ret;
> @@ -2837,8 +3469,23 @@ static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
>                         return false;
>                 /* fall through */
>         case RBD_OBJ_WRITE_COPYUP:
> -               if (*result)
> +               if (*result) {
>                         rbd_warn(rbd_dev, "copyup failed: %d", *result);
> +                       return true;
> +               }
> +               ret = rbd_obj_write_post_object_map(obj_req);
> +               if (ret < 0) {
> +                       *result = ret;
> +                       return true;
> +               }
> +               obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
> +               if (ret > 0)
> +                       goto again;
> +               return false;
> +       case RBD_OBJ_WRITE_POST_OBJECT_MAP:
> +               if (*result)
> +                       rbd_warn(rbd_dev, "post object map update failed: %d",
> +                                *result);
>                 return true;
>         default:
>                 BUG();
> @@ -2892,7 +3539,8 @@ static bool need_exclusive_lock(struct rbd_img_request *img_req)
>                 return false;
>
>         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
> -       if (rbd_dev->opts->lock_on_read)
> +       if (rbd_dev->opts->lock_on_read ||
> +           (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
>                 return true;
>
>         return rbd_img_is_write(img_req);
> @@ -3427,7 +4075,7 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
>                 if (ret)
>                         goto out; /* request lock or error */
>
> -               rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
> +               rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
>                          ENTITY_NAME(lockers[0].id.name));
>
>                 ret = ceph_monc_blacklist_add(&client->monc,
> @@ -3454,6 +4102,19 @@ static int rbd_try_lock(struct rbd_device *rbd_dev)
>         return ret;
>  }
>
> +static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
> +{
> +       int ret;
> +
> +       if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
> +               ret = rbd_object_map_open(rbd_dev);
> +               if (ret)
> +                       return ret;
> +       }
> +
> +       return 0;
> +}
> +
>  /*
>   * Return:
>   *   0 - lock acquired
> @@ -3497,6 +4158,17 @@ static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
>         rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
>         rbd_assert(list_empty(&rbd_dev->running_list));
>
> +       ret = rbd_post_acquire_action(rbd_dev);
> +       if (ret) {
> +               rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
> +               /*
> +                * Can't stay in RBD_LOCK_STATE_LOCKED because
> +                * rbd_lock_add_request() would let the request through,
> +                * assuming that e.g. object map is locked and loaded.
> +                */
> +               rbd_unlock(rbd_dev);
> +       }
> +
>  out:
>         wake_requests(rbd_dev, ret);
>         up_write(&rbd_dev->lock_rwsem);
> @@ -3570,10 +4242,17 @@ static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
>         return true;
>  }
>
> +static void rbd_pre_release_action(struct rbd_device *rbd_dev)
> +{
> +       if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
> +               rbd_object_map_close(rbd_dev);
> +}
> +
>  static void __rbd_release_lock(struct rbd_device *rbd_dev)
>  {
>         rbd_assert(list_empty(&rbd_dev->running_list));
>
> +       rbd_pre_release_action(rbd_dev);
>         rbd_unlock(rbd_dev);
>  }
>
> @@ -4860,6 +5539,8 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
>         init_completion(&rbd_dev->acquire_wait);
>         init_completion(&rbd_dev->releasing_wait);
>
> +       spin_lock_init(&rbd_dev->object_map_lock);
> +
>         rbd_dev->dev.bus = &rbd_bus_type;
>         rbd_dev->dev.type = &rbd_device_type;
>         rbd_dev->dev.parent = &rbd_root_dev;
> @@ -5041,6 +5722,32 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
>                                                 &rbd_dev->header.features);
>  }
>
> +/*
> + * These are generic image flags, but since they are used only for
> + * object map, store them in rbd_dev->object_map_flags.
> + *
> + * For the same reason, this function is called only on object map
> + * (re)load and not on header refresh.
> + */
> +static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
> +{
> +       __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
> +       __le64 flags;
> +       int ret;
> +
> +       ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
> +                                 &rbd_dev->header_oloc, "get_flags",
> +                                 &snapid, sizeof(snapid),
> +                                 &flags, sizeof(flags));
> +       if (ret < 0)
> +               return ret;
> +       if (ret < sizeof(flags))
> +               return -EBADMSG;
> +
> +       rbd_dev->object_map_flags = le64_to_cpu(flags);
> +       return 0;
> +}
> +
>  struct parent_image_info {
>         u64             pool_id;
>         const char      *pool_ns;
> @@ -6014,6 +6721,7 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
>         struct rbd_image_header *header;
>
>         rbd_dev_parent_put(rbd_dev);
> +       rbd_object_map_free(rbd_dev);
>         rbd_dev_mapping_clear(rbd_dev);
>
>         /* Free dynamic fields from the header, then zero it out */
> @@ -6263,6 +6971,13 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
>         if (ret)
>                 goto err_out_probe;
>
> +       if (rbd_dev->spec->snap_id != CEPH_NOSNAP &&
> +           (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
> +               ret = rbd_object_map_load(rbd_dev);
> +               if (ret)
> +                       goto err_out_probe;
> +       }
> +
>         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
>                 ret = rbd_dev_v2_parent_info(rbd_dev);
>                 if (ret)
> diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
> index 62ff50d3e7a6..ac98ab6ccd3b 100644
> --- a/drivers/block/rbd_types.h
> +++ b/drivers/block/rbd_types.h
> @@ -18,6 +18,7 @@
>  /* For format version 2, rbd image 'foo' consists of objects
>   *   rbd_id.foo                - id of image
>   *   rbd_header.<id>   - image metadata
> + *   rbd_object_map.<id> - optional image object map
>   *   rbd_data.<id>.0000000000000000
>   *   rbd_data.<id>.0000000000000001
>   *   ...               - data
> @@ -25,6 +26,7 @@
>   */
>
>  #define RBD_HEADER_PREFIX      "rbd_header."
> +#define RBD_OBJECT_MAP_PREFIX  "rbd_object_map."
>  #define RBD_ID_PREFIX          "rbd_id."
>  #define RBD_V2_DATA_FORMAT     "%s.%016llx"
>
> @@ -39,6 +41,14 @@ enum rbd_notify_op {
>         RBD_NOTIFY_OP_HEADER_UPDATE      = 3,
>  };
>
> +#define OBJECT_NONEXISTENT     0
> +#define OBJECT_EXISTS          1
> +#define OBJECT_PENDING         2
> +#define OBJECT_EXISTS_CLEAN    3
> +
> +#define RBD_FLAG_OBJECT_MAP_INVALID    (1ULL << 0)
> +#define RBD_FLAG_FAST_DIFF_INVALID     (1ULL << 1)
> +
>  /*
>   * For format version 1, rbd image 'foo' consists of objects
>   *   foo.rbd           - image metadata
> diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
> index bea6c77d2093..17bc7584d1fe 100644
> --- a/include/linux/ceph/cls_lock_client.h
> +++ b/include/linux/ceph/cls_lock_client.h
> @@ -52,4 +52,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
>                        char *lock_name, u8 *type, char **tag,
>                        struct ceph_locker **lockers, u32 *num_lockers);
>
> +int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
> +                          char *lock_name, u8 type, char *cookie, char *tag);
> +
>  #endif
> diff --git a/include/linux/ceph/striper.h b/include/linux/ceph/striper.h
> index cbd0d24b7148..3486636c0e6e 100644
> --- a/include/linux/ceph/striper.h
> +++ b/include/linux/ceph/striper.h
> @@ -66,4 +66,6 @@ int ceph_extent_to_file(struct ceph_file_layout *l,
>                         struct ceph_file_extent **file_extents,
>                         u32 *num_file_extents);
>
> +u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size);
> +
>  #endif
> diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
> index 56bbfe01e3ac..99cce6f3ec48 100644
> --- a/net/ceph/cls_lock_client.c
> +++ b/net/ceph/cls_lock_client.c
> @@ -6,6 +6,7 @@
>
>  #include <linux/ceph/cls_lock_client.h>
>  #include <linux/ceph/decode.h>
> +#include <linux/ceph/libceph.h>
>
>  /**
>   * ceph_cls_lock - grab rados lock for object
> @@ -375,3 +376,47 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
>         return ret;
>  }
>  EXPORT_SYMBOL(ceph_cls_lock_info);
> +
> +int ceph_cls_assert_locked(struct ceph_osd_request *req, int which,
> +                          char *lock_name, u8 type, char *cookie, char *tag)
> +{
> +       int assert_op_buf_size;
> +       int name_len = strlen(lock_name);
> +       int cookie_len = strlen(cookie);
> +       int tag_len = strlen(tag);
> +       struct page **pages;
> +       void *p, *end;
> +       int ret;
> +
> +       assert_op_buf_size = name_len + sizeof(__le32) +
> +                            cookie_len + sizeof(__le32) +
> +                            tag_len + sizeof(__le32) +
> +                            sizeof(u8) + CEPH_ENCODING_START_BLK_LEN;
> +       if (assert_op_buf_size > PAGE_SIZE)
> +               return -E2BIG;
> +
> +       ret = osd_req_op_cls_init(req, which, "lock", "assert_locked");
> +       if (ret)
> +               return ret;
> +
> +       pages = ceph_alloc_page_vector(1, GFP_NOIO);
> +       if (IS_ERR(pages))
> +               return PTR_ERR(pages);
> +
> +       p = page_address(pages[0]);
> +       end = p + assert_op_buf_size;
> +
> +       /* encode cls_lock_assert_op struct */
> +       ceph_start_encoding(&p, 1, 1,
> +                           assert_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
> +       ceph_encode_string(&p, end, lock_name, name_len);
> +       ceph_encode_8(&p, type);
> +       ceph_encode_string(&p, end, cookie, cookie_len);
> +       ceph_encode_string(&p, end, tag, tag_len);
> +       WARN_ON(p != end);
> +
> +       osd_req_op_cls_request_data_pages(req, which, pages, assert_op_buf_size,
> +                                         0, false, true);
> +       return 0;
> +}
> +EXPORT_SYMBOL(ceph_cls_assert_locked);
> diff --git a/net/ceph/striper.c b/net/ceph/striper.c
> index c36462dc86b7..3b3fa75d1189 100644
> --- a/net/ceph/striper.c
> +++ b/net/ceph/striper.c
> @@ -259,3 +259,20 @@ int ceph_extent_to_file(struct ceph_file_layout *l,
>         return 0;
>  }
>  EXPORT_SYMBOL(ceph_extent_to_file);
> +
> +u64 ceph_get_num_objects(struct ceph_file_layout *l, u64 size)
> +{
> +       u64 period = (u64)l->stripe_count * l->object_size;
> +       u64 num_periods = DIV64_U64_ROUND_UP(size, period);
> +       u64 remainder_bytes;
> +       u64 remainder_objs = 0;
> +
> +       div64_u64_rem(size, period, &remainder_bytes);
> +       if (remainder_bytes > 0 &&
> +           remainder_bytes < (u64)l->stripe_count * l->stripe_unit)
> +               remainder_objs = l->stripe_count -
> +                           DIV_ROUND_UP_ULL(remainder_bytes, l->stripe_unit);
> +
> +       return num_periods * l->stripe_count - remainder_objs;
> +}
> +EXPORT_SYMBOL(ceph_get_num_objects);
> --
> 2.19.2
>

Nit: might have been nice to break this one commit into several smaller commits.

-- 
Jason



[Index of Archives]     [CEPH Users]     [Ceph Large]     [Ceph Dev]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux