On Fri, 2019-07-12 at 10:44 +0800, Yan, Zheng wrote:
> On Fri, Jul 12, 2019 at 3:17 AM Jeff Layton <jlayton@xxxxxxxxxx> wrote:
> > The current code will do the uninlining but it relies on the callers to
> > set the i_inline_version appropriately afterward. Multiple tasks can end
> > up attempting to uninline the data, and overwrite changes that follow
> > the first uninlining.
> >
> > Protect against competing uninlining attempts by having the callers take
> > the i_truncate_mutex and then have ceph_uninline_data update the version
> > itself before dropping it. This means that we also need to have
> > ceph_uninline_data mark the caps dirty and pass back I_DIRTY_* flags if
> > any of them are newly dirty.
> >
> > We can't mark the caps dirty though unless we actually have them, so
> > move the uninlining after the point where Fw caps are acquired in all of
> > the callers.
> >
> > Finally, since we are doing a lockless check first in all cases, just
> > move that into ceph_uninline_data as well, and have the callers call it
> > unconditionally.
> >
> > Signed-off-by: Jeff Layton <jlayton@xxxxxxxxxx>
> > ---
> >  fs/ceph/addr.c | 119 +++++++++++++++++++++++++++++++++----------------
> >  fs/ceph/file.c |  39 +++++++---------
> >  2 files changed, 97 insertions(+), 61 deletions(-)
> >
> > diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> > index 038678963cf9..4606da82da6f 100644
> > --- a/fs/ceph/addr.c
> > +++ b/fs/ceph/addr.c
> > @@ -1531,7 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > loff_t off = page_offset(page);
> > loff_t size = i_size_read(inode);
> > size_t len;
> > - int want, got, err;
> > + int want, got, err, dirty;
> > sigset_t oldset;
> > vm_fault_t ret = VM_FAULT_SIGBUS;
> >
> > @@ -1541,12 +1541,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> >
> > ceph_block_sigs(&oldset);
> >
> > - if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > - err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > - if (err < 0)
> > - goto out_free;
> > - }
> > -
> > if (off + PAGE_SIZE <= size)
> > len = PAGE_SIZE;
> > else
> > @@ -1565,6 +1559,11 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > if (err < 0)
> > goto out_free;
> >
> > + err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > + if (err < 0)
> > + goto out_put_caps;
> > + dirty = err;
> > +
> > dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
> > inode, off, len, ceph_cap_string(got));
> >
> > @@ -1591,11 +1590,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> >
> > if (ret == VM_FAULT_LOCKED ||
> > ci->i_inline_version != CEPH_INLINE_NONE) {
> > - int dirty;
> > spin_lock(&ci->i_ceph_lock);
> > - ci->i_inline_version = CEPH_INLINE_NONE;
> > - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > - &prealloc_cf);
> > + dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > + &prealloc_cf);
> > spin_unlock(&ci->i_ceph_lock);
> > if (dirty)
> > __mark_inode_dirty(inode, dirty);
> > @@ -1603,6 +1600,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> >
> > dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
> > inode, off, len, ceph_cap_string(got), ret);
> > +out_put_caps:
> > ceph_put_cap_refs(ci, got);
> > out_free:
> > ceph_restore_sigs(&oldset);
> > @@ -1656,27 +1654,60 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
> > }
> > }
> >
> > +/**
> > + * ceph_uninline_data - convert an inlined file to uninlined
> > + * @inode: inode to be uninlined
> > + * @page: optional pointer to first page in file
> > + *
> > + * Convert a file from inlined to non-inlined. We borrow the i_truncate_mutex
> > + * to serialize callers and prevent races. Returns either a negative error code
> > + * or a positive set of I_DIRTY_* flags that the caller should apply when
> > + * dirtying the inode.
> > + */
> > int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > {
> > struct ceph_inode_info *ci = ceph_inode(inode);
> > struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> > struct ceph_osd_request *req;
> > + struct ceph_cap_flush *prealloc_cf = NULL;
> > struct page *page = NULL;
> > u64 len, inline_version;
> > - int err = 0;
> > + int ret = 0;
> > bool from_pagecache = false;
> > bool allocated_page = false;
> >
> > + /* Do a lockless check first -- paired with i_ceph_lock for changes */
> > + inline_version = READ_ONCE(ci->i_inline_version);
> > + if (likely(inline_version == CEPH_INLINE_NONE))
> > + return 0;
> > +
> > + dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > + inode, ceph_vinop(inode), inline_version);
> > +
> > + mutex_lock(&ci->i_truncate_mutex);
> > +
> > + /* Double check the version after taking mutex */
> > spin_lock(&ci->i_ceph_lock);
> > inline_version = ci->i_inline_version;
> > spin_unlock(&ci->i_ceph_lock);
> >
> > - dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > - inode, ceph_vinop(inode), inline_version);
> > + /* If someone beat us to the uninlining then just return. */
> > + if (inline_version == CEPH_INLINE_NONE)
> > + goto out;
> >
> > - if (inline_version == 1 || /* initial version, no data */
> I'd like to avoid this optimization. check i_size instead.
>

This one is being removed already...

> > - inline_version == CEPH_INLINE_NONE)
> > + prealloc_cf = ceph_alloc_cap_flush();
> > + if (!prealloc_cf) {
> > + ret = -ENOMEM;
> > goto out;
> > + }
> > +
> > + /*
> > + * Handle the initial version (1) as a special case: switch the
> > + * version to CEPH_INLINE_NONE, but we don't need to do any uninlining
> > + * in that case since there is no data yet.
> > + */
> > + if (inline_version == 1)
> > + goto out_set_vers;
>
> ditto
>

Fixed this up in my tree, and removed the places that set the
i_inline_version to CEPH_INLINE_NONE in aio completion and cfr codepaths.
I went ahead and pushed the result to the ceph-client/testing branch.

Thanks to you and Luis for the review so far!

> > if (provided_page) {
> > page = provided_page;
> > @@ -1703,20 +1734,20 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > } else {
> > page = __page_cache_alloc(GFP_NOFS);
> > if (!page) {
> > - err = -ENOMEM;
> > + ret = -ENOMEM;
> > goto out;
> > }
> > allocated_page = true;
> > lock_page(page);
> > - err = __ceph_do_getattr(inode, page,
> > + ret = __ceph_do_getattr(inode, page,
> > CEPH_STAT_CAP_INLINE_DATA, true);
> > - if (err < 0) {
> > + if (ret < 0) {
> > /* no inline data */
> > - if (err == -ENODATA)
> > - err = 0;
> > + if (ret == -ENODATA)
> > + ret = 0;
> > goto out;
> > }
> > - len = err;
> > + len = ret;
> > }
> >
> > req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> > @@ -1724,16 +1755,16 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
> > NULL, 0, 0, false);
> > if (IS_ERR(req)) {
> > - err = PTR_ERR(req);
> > + ret = PTR_ERR(req);
> > goto out;
> > }
> >
> > req->r_mtime = inode->i_mtime;
> > - err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > - if (!err)
> > - err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > + if (!ret)
> > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > ceph_osdc_put_request(req);
> > - if (err < 0)
> > + if (ret < 0)
> > goto out;
> >
> > req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> > @@ -1742,7 +1773,7 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > NULL, ci->i_truncate_seq,
> > ci->i_truncate_size, false);
> > if (IS_ERR(req)) {
> > - err = PTR_ERR(req);
> > + ret = PTR_ERR(req);
> > goto out;
> > }
> >
> > @@ -1750,12 +1781,12 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >
> > {
> > __le64 xattr_buf = cpu_to_le64(inline_version);
> > - err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
> > + ret = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
> > "inline_version", &xattr_buf,
> > sizeof(xattr_buf),
> > CEPH_OSD_CMPXATTR_OP_GT,
> > CEPH_OSD_CMPXATTR_MODE_U64);
> > - if (err)
> > + if (ret)
> > goto out_put;
> > }
> >
> > @@ -1763,22 +1794,31 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > char xattr_buf[32];
> > int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
> > "%llu", inline_version);
> > - err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
> > + ret = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
> > "inline_version",
> > xattr_buf, xattr_len, 0, 0);
> > - if (err)
> > + if (ret)
> > goto out_put;
> > }
> >
> > req->r_mtime = inode->i_mtime;
> > - err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > - if (!err)
> > - err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > + if (!ret)
> > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > out_put:
> > ceph_osdc_put_request(req);
> > - if (err == -ECANCELED)
> > - err = 0;
> > + if (ret == -ECANCELED)
> > + ret = 0;
> > +out_set_vers:
> > + if (!ret) {
> > + spin_lock(&ci->i_ceph_lock);
> > + ci->i_inline_version = CEPH_INLINE_NONE;
> > + ret = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > + &prealloc_cf);
> > + spin_unlock(&ci->i_ceph_lock);
> > + }
> > out:
> > + mutex_unlock(&ci->i_truncate_mutex);
> > if (page) {
> > unlock_page(page);
> > if (from_pagecache)
> > @@ -1786,10 +1826,11 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > else if (allocated_page)
> > __free_pages(page, 0);
> > }
> > + ceph_free_cap_flush(prealloc_cf);
> >
> > dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
> > - inode, ceph_vinop(inode), inline_version, err);
> > - return err;
> > + inode, ceph_vinop(inode), inline_version, ret);
> > + return ret;
> > }
> >
> > static const struct vm_operations_struct ceph_vmops = {
> > diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> > index 7bb090fa99d3..252aac44b8ce 100644
> > --- a/fs/ceph/file.c
> > +++ b/fs/ceph/file.c
> > @@ -1387,7 +1387,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> > struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> > struct ceph_cap_flush *prealloc_cf;
> > ssize_t count, written = 0;
> > - int err, want, got;
> > + int err, want, got, dirty;
> > loff_t pos;
> > loff_t limit = max(i_size_read(inode), fsc->max_file_size);
> >
> > @@ -1438,12 +1438,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >
> > inode_inc_iversion_raw(inode);
> >
> > - if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > - err = ceph_uninline_data(inode, NULL);
> > - if (err < 0)
> > - goto out;
> > - }
> > -
> > /* FIXME: not complete since it doesn't account for being at quota */
> > if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) {
> > err = -ENOSPC;
> > @@ -1462,6 +1456,11 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> > if (err < 0)
> > goto out;
> >
> > + err = ceph_uninline_data(inode, NULL);
> > + if (err < 0)
> > + goto out_put_caps;
> > + dirty = err;
> > +
> > dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
> > inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
> >
> > @@ -1510,12 +1509,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> > }
> >
> > if (written >= 0) {
> > - int dirty;
> > -
> > spin_lock(&ci->i_ceph_lock);
> > - ci->i_inline_version = CEPH_INLINE_NONE;
> > - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > - &prealloc_cf);
> > + dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > + &prealloc_cf);
> > spin_unlock(&ci->i_ceph_lock);
> > if (dirty)
> > __mark_inode_dirty(inode, dirty);
> > @@ -1526,6 +1522,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> > dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
> > inode, ceph_vinop(inode), pos, (unsigned)count,
> > ceph_cap_string(got));
> > +out_put_caps:
> > ceph_put_cap_refs(ci, got);
> >
> > if (written == -EOLDSNAPC) {
> > @@ -1762,12 +1759,6 @@ static long ceph_fallocate(struct file *file, int mode,
> > goto unlock;
> > }
> >
> > - if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > - ret = ceph_uninline_data(inode, NULL);
> > - if (ret < 0)
> > - goto unlock;
> > - }
> > -
> > size = i_size_read(inode);
> >
> > /* Are we punching a hole beyond EOF? */
> > @@ -1785,19 +1776,23 @@ static long ceph_fallocate(struct file *file, int mode,
> > if (ret < 0)
> > goto unlock;
> >
> > + ret = ceph_uninline_data(inode, NULL);
> > + if (ret < 0)
> > + goto out_put_caps;
> > + dirty = ret;
> > +
> > ceph_zero_pagecache_range(inode, offset, length);
> > ret = ceph_zero_objects(inode, offset, length);
> >
> > if (!ret) {
> > spin_lock(&ci->i_ceph_lock);
> > - ci->i_inline_version = CEPH_INLINE_NONE;
> > - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > - &prealloc_cf);
> > + dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > + &prealloc_cf);
> > spin_unlock(&ci->i_ceph_lock);
> > if (dirty)
> > __mark_inode_dirty(inode, dirty);
> > }
> > -
> > +out_put_caps:
> > ceph_put_cap_refs(ci, got);
> > unlock:
> > inode_unlock(inode);
> > --
> > 2.21.0
> >

--
Jeff Layton <jlayton@xxxxxxxxxx>
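For anyone skimming the thread, the heart of the change quoted above is a
check/lock/recheck shape: a lockless fast-path test of i_inline_version, a
recheck under i_truncate_mutex, and a return value telling the caller what it
still has to mark dirty. The toy userspace sketch below models only that
pattern (pthreads plus a fake version counter); every name in it is
illustrative and is not a Ceph or kernel API, and it is not code from the
patch itself:

/*
 * Toy model of the serialization pattern: a lockless fast-path check of a
 * "version", then a recheck under a mutex, so the expensive one-time
 * conversion runs at most once while the result is reported to the caller.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define VERSION_NONE 0UL        /* loosely analogous to CEPH_INLINE_NONE */

static atomic_ulong inline_version = 1;
static pthread_mutex_t uninline_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Returns 0 if there was nothing to do, 1 if we performed the conversion. */
static int toy_uninline(void)
{
        int dirty = 0;

        /* Lockless fast path: already converted, nothing to do. */
        if (atomic_load(&inline_version) == VERSION_NONE)
                return 0;

        pthread_mutex_lock(&uninline_mutex);
        /* Recheck under the mutex: another thread may have beaten us here. */
        if (atomic_load(&inline_version) != VERSION_NONE) {
                /* ...the expensive one-time conversion work would go here... */
                atomic_store(&inline_version, VERSION_NONE);
                dirty = 1;      /* tell the caller it must mark state dirty */
        }
        pthread_mutex_unlock(&uninline_mutex);
        return dirty;
}

static void *writer(void *arg)
{
        if (toy_uninline())
                printf("thread %ld did the one-time conversion\n", (long)arg);
        return NULL;
}

int main(void)
{
        pthread_t t[4];

        for (long i = 0; i < 4; i++)
                pthread_create(&t[i], NULL, writer, (void *)i);
        for (int i = 0; i < 4; i++)
                pthread_join(t[i], NULL);
        return 0;
}

Built with "cc -std=c11 -pthread", exactly one of the four threads prints,
which is the at-most-once behavior the patch wants for the uninlining work.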