Hi Li, There is a version of fsx.c floating around that tests hole punching... have you tried running that on top of this patch? Ideally, we should build a test (ceph.git/qa/workunits/rbd/hole_punch.sh or similar) that tests the hole punch both with a default file layout and with a more complicated striping pattern (e.g. object_size=1048576 stripe_unit=65536 stripe_count=7). sage On Thu, 20 Jun 2013, Li Wang wrote: > This patch implements punch hole (fallocate) support for Ceph. > > Signed-off-by: Li Wang <liwang@xxxxxxxxxxxxxxx> > Signed-off-by: Yunchuan Wen <wenyunchuan@xxxxxxxxxxxxxxx> > --- > fs/ceph/file.c | 313 > +++++++++++++++++++++++++++++++++++++++++++++++++ > net/ceph/osd_client.c | 8 +- > 2 files changed, 319 insertions(+), 2 deletions(-) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 656e169..578e5fd 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -8,6 +8,7 @@ > #include <linux/namei.h> > #include <linux/writeback.h> > #include <linux/aio.h> > +#include <linux/falloc.h> > > #include "super.h" > #include "mds_client.h" > @@ -882,6 +883,317 @@ out: > return offset; > } > > +static inline void ceph_zero_partial_page(struct inode *inode, pgoff_t index, > unsigned start, unsigned size) > +{ > + struct page *page; > + > + page = find_lock_page(inode->i_mapping, index); > + if (page) { > + zero_user(page, start, size); > + unlock_page(page); > + page_cache_release(page); > + } > +} > + > +static void ceph_truncate_and_zero_page_cache(struct inode *inode, loff_t > offset, loff_t length) > +{ > + loff_t first_page; > + loff_t last_page; > + loff_t zero_len; > + > + first_page =((offset + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) << > PAGE_CACHE_SHIFT; > + last_page = ((offset + length) >> PAGE_CACHE_SHIFT) << > PAGE_CACHE_SHIFT; > + if (last_page > first_page) { > + truncate_pagecache_range(inode, first_page, last_page - 1); > + } > + if (first_page > last_page) { > + ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, > offset & 
(PAGE_CACHE_SIZE - 1), length); > + return; > + } > + /* > + * zero out the partial page that contains > + * the start of the hole > + */ > + zero_len = first_page - offset; > + if (zero_len > 0) { > + ceph_zero_partial_page(inode, offset >> PAGE_CACHE_SHIFT, > offset & (PAGE_CACHE_SIZE -1), zero_len); > + } > + /* > + * zero out the partial page that contains > + * the end of the hole > + */ > + zero_len = offset + length - last_page; > + if (zero_len > 0) { > + ceph_zero_partial_page(inode, (offset + length) >> > PAGE_CACHE_SHIFT, 0, zero_len); > + } > + /* > + * If i_size is contained in the last page, we need to > + * zero the partial page after i_size > + */ > + if (inode->i_size >> PAGE_CACHE_SHIFT == (offset + length) >> > PAGE_CACHE_SHIFT && inode->i_size % PAGE_CACHE_SIZE != 0) { > + zero_len = PAGE_CACHE_SIZE - > + (inode->i_size & (PAGE_CACHE_SIZE - 1)); > + if (zero_len > 0) { > + ceph_zero_partial_page(inode, inode->i_size >> > PAGE_CACHE_SHIFT, inode->i_size & (PAGE_CACHE_SIZE -1), zero_len); > + } > + } > +} > + > +static inline __u32 ceph_calculate_shift(__s64 size) > +{ > + int shift; > + > + if (size <= 0) > + return -1; > + if (size == 1) > + return 0; > + for (shift = 0; ;shift++) { > + if (2 << shift == size) > + break; > + } > + shift++; > + > + return shift; > +} > + > +static int ceph_delete_object(struct inode *inode, u64 offset, u64 *length) > +{ > + struct ceph_inode_info *ci = ceph_inode(inode); > + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > + struct ceph_osd_request *req; > + int ret = 0; > + > + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, > + ceph_vino(inode), offset, length, 1, > + CEPH_OSD_OP_DELETE, CEPH_OSD_FLAG_ONDISK, > + NULL, > + ci->i_truncate_seq, ci->i_truncate_size, > + false); > + if (IS_ERR(req)) { > + ret = PTR_ERR(req); > + goto out; > + } > + > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); > + if (!ret) { > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); > 
+ } > + ceph_osdc_put_request(req); > + > + out: > + return ret; > +} > + > +static int ceph_zero_partial_object(struct inode *inode, loff_t offset, > loff_t *length) > +{ > + struct ceph_inode_info *ci = ceph_inode(inode); > + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > + struct ceph_osd_request *req; > + int ret = 0; > + > + if (length <= 0) > + goto out; > + > + > + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, > + ceph_vino(inode), offset, length, 1, > + CEPH_OSD_OP_ZERO, CEPH_OSD_FLAG_WRITE | > CEPH_OSD_FLAG_ONDISK, > + NULL, > + ci->i_truncate_seq, ci->i_truncate_size, > + false); > + if (IS_ERR(req)) { > + ret = PTR_ERR(req); > + goto out; > + } > + > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); > + if (!ret) { > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); > + } > + ceph_osdc_put_request(req); > + > + out: > + return ret; > +} > + > +static int ceph_zero_partial_object_set(struct inode *inode, loff_t start, > loff_t end) > +{ > + struct ceph_inode_info *ci = ceph_inode(inode); > + __s32 stripe_unit_size = ceph_file_layout_su(ci->i_layout); > + __u32 stripe_unit_shift = ceph_calculate_shift(stripe_unit_size); > + loff_t first_stripe_unit = ((start + stripe_unit_size -1 ) >> > stripe_unit_shift) << stripe_unit_shift; > + loff_t last_stripe_unit = ((end + 1) >> stripe_unit_shift) << > stripe_unit_shift; > + u64 i; > + loff_t length; > + int ret = 0; > + > + if (last_stripe_unit > first_stripe_unit) { > + for (i = first_stripe_unit; i < last_stripe_unit; i += > stripe_unit_size) { > + length = (u64) stripe_unit_size; > + ret = ceph_zero_partial_object(inode, i, &length); > + if (ret) > + goto out; > + } > + } > + if (first_stripe_unit > last_stripe_unit) { > + length = end - start + 1; > + ret = ceph_zero_partial_object(inode, start, &length); > + goto out; > + } > + length = first_stripe_unit - start; > + if (length > 0) { > + ret = ceph_zero_partial_object(inode, start, &length); > + if (ret) > 
+ goto out; > + } > + length = end - last_stripe_unit + 1; > + if (length > 0) { > + ret = ceph_zero_partial_object(inode, last_stripe_unit, > &length); > + } > + > + out: > + return ret; > +} > + > +static int ceph_delete_and_zero_objects(struct file *file, loff_t offset, > loff_t length) > +{ > + struct ceph_file_info *fi = file->private_data; > + struct inode *inode = file->f_dentry->d_inode; > + struct ceph_inode_info *ci = ceph_inode(inode); > + __s32 stripe_unit_size = ceph_file_layout_su(ci->i_layout); > + __s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout); > + unsigned stripe_width = ceph_file_layout_stripe_width(&ci->i_layout); > + __s32 object_size = ceph_file_layout_object_size(ci->i_layout); > + __s32 object_set_size = object_size * stripe_count; > + __u32 object_set_shift = ceph_calculate_shift(object_set_size); > + __u32 stripe_unit_count_per_object = object_size / stripe_unit_size; > + loff_t first_object_set = ((offset + object_set_size - 1) >> > object_set_shift) << object_set_shift; > + loff_t last_object_set = ((offset + length) >> object_set_shift) << > object_set_shift; > + loff_t i, j; > + int want, got = 0; > + int dirty; > + u64 len; > + int ret = 0; > + > + if (fi->fmode & CEPH_FILE_MODE_LAZY) > + want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; > + else > + want = CEPH_CAP_FILE_BUFFER; > + > + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, offset + > length); > + if (ret < 0) > + return ret; > + if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) { > + ret = -EAGAIN; > + goto out; > + } > + > + /* [offset, offset+length] does not cross an object set boundary. 
> + * Yes, there are possibilities to delete some objects within > + * an object set, however, we want to keep it simple, not to incur > + * comprehensive calculation, so for a partial hole within an object > + * set, we zero only > + */ > + if (first_object_set > last_object_set) { > + ret = ceph_zero_partial_object_set(inode, offset, offset + > length - 1); > + goto out; > + } > + /* [offset, offset+length] contains at least one complete object set > */ > + if (last_object_set > first_object_set) { > + len = (u64)stripe_unit_size; > + /* > + * For the very first object, zero it instead of deleting it, > + * since there is metadata attached to it > + */ > + if (first_object_set == 0) { > + for (i = 0; i < stripe_unit_count_per_object; i++) { > + ret = ceph_zero_partial_object(inode, > first_object_set + i*stripe_width, &len); > + if (ret) > + goto out; > + } > + } > + for (i = first_object_set; i < last_object_set; i += > object_set_size) { > + for (j = i; j < i + stripe_width; j += > stripe_unit_size) { > + /* skip the very first object */ > + if (j == 0) > + continue; > + ret = ceph_delete_object(inode, j, &len); > + /* object already deleted */ > + if (ret == -ENOENT) > + ret = 0; > + if (ret) > + goto out; > + } > + } > + } > + > + /* deal with the object sets that contain the start or the end of the hole > */ > + if (first_object_set - offset > 0) { > + ret = ceph_zero_partial_object_set(inode, offset, > first_object_set - 1); > + if (ret) > + goto out; > + } > + if (offset + length - last_object_set > 0) { > + ret = ceph_zero_partial_object_set(inode, last_object_set, > offset + length - 1); > + } > + > + out: > + if (ret == 0) { > + spin_lock(&ci->i_ceph_lock); > + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); > + spin_unlock(&ci->i_ceph_lock); > + if (dirty) > + __mark_inode_dirty(inode, dirty); > + } > + ceph_put_cap_refs(ci, got); > + return ret; > +} > + > +static int ceph_punch_hole(struct file *file, loff_t offset, loff_t length) > +{ > + struct inode 
*inode = file->f_dentry->d_inode; > + int ret = 0; > + > + if (!S_ISREG(inode->i_mode)) { > + return -EOPNOTSUPP; > + } > + if (IS_SWAPFILE(inode)) { > + return -ETXTBSY; > + } > + mutex_lock(&inode->i_mutex); > + > + /* No need to punch hole beyond i_size */ > + if (offset >= inode->i_size) > + goto out_unlock; > + > + /* > + * If the hole extends beyond i_size, set the hole > + * to end after the page that contains i_size > + */ > + if (offset + length > inode->i_size) { > + length = inode->i_size + > + PAGE_CACHE_SIZE - (inode->i_size & (PAGE_CACHE_SIZE - 1)) - > + offset; > + } > + > + ceph_truncate_and_zero_page_cache(inode, offset, length); > + ret = ceph_delete_and_zero_objects(file, offset, length); > + > + out_unlock: > + mutex_unlock(&inode->i_mutex); > + return ret; > +} > + > +static long ceph_fallocate(struct file *file, int mode, loff_t offset, loff_t > length) > +{ > + /* FALLOC_FL_PUNCH_HOLE must be used with FALLOC_FL_KEEP_SIZE */ > + if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE)) > + return -EOPNOTSUPP; > + if (mode & FALLOC_FL_PUNCH_HOLE) > + return ceph_punch_hole(file, offset, length); > + return -EOPNOTSUPP; > +} > + > const struct file_operations ceph_file_fops = { > .open = ceph_open, > .release = ceph_release, > @@ -898,5 +1210,6 @@ const struct file_operations ceph_file_fops = { > .splice_write = generic_file_splice_write, > .unlocked_ioctl = ceph_ioctl, > .compat_ioctl = ceph_ioctl, > + .fallocate = ceph_fallocate, > }; > > diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c > index 3a246a6..a6d9671 100644 > --- a/net/ceph/osd_client.c > +++ b/net/ceph/osd_client.c > @@ -503,7 +503,8 @@ void osd_req_op_extent_init(struct ceph_osd_request > *osd_req, > struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); > size_t payload_len = 0; > > - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); > + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && > + opcode != CEPH_OSD_OP_DELETE 
&& opcode != > CEPH_OSD_OP_ZERO); > > op->extent.offset = offset; > op->extent.length = length; > @@ -631,6 +632,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, > break; > case CEPH_OSD_OP_READ: > case CEPH_OSD_OP_WRITE: > + case CEPH_OSD_OP_DELETE: > + case CEPH_OSD_OP_ZERO: > if (src->op == CEPH_OSD_OP_WRITE) > request_data_len = src->extent.length; > dst->extent.offset = cpu_to_le64(src->extent.offset); > @@ -715,7 +718,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct > ceph_osd_client *osdc, > u64 object_base; > int r; > > - BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); > + BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && > + opcode != CEPH_OSD_OP_DELETE && opcode != > CEPH_OSD_OP_ZERO); > > req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, > GFP_NOFS); > -- > 1.7.9.5 > > > -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html