On Fri, Jul 12, 2019 at 3:17 AM Jeff Layton <jlayton@xxxxxxxxxx> wrote: > > The current code will do the uninlining but it relies on the callers to > set the i_inline_version appropriately afterward. Multiple tasks can end > up attempting to uninline the data, and overwrite changes that follow > the first uninlining. > > Protect against competing uninlining attempts by having the callers take > the i_truncate_mutex and then have ceph_uninline_data update the version > itself before dropping it. This means that we also need to have > ceph_uninline_data mark the caps dirty and pass back I_DIRTY_* flags if > any of them are newly dirty. > > We can't mark the caps dirty though unless we actually have them, so > move the uninlining after the point where Fw caps are acquired in all of > the callers. > > Finally, since we are doing a lockless check first in all cases, just > move that into ceph_uninline_data as well, and have the callers call it > unconditionally. > > Signed-off-by: Jeff Layton <jlayton@xxxxxxxxxx> > --- > fs/ceph/addr.c | 119 +++++++++++++++++++++++++++++++++---------------- > fs/ceph/file.c | 39 +++++++--------- > 2 files changed, 97 insertions(+), 61 deletions(-) > > diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c > index 038678963cf9..4606da82da6f 100644 > --- a/fs/ceph/addr.c > +++ b/fs/ceph/addr.c > @@ -1531,7 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) > loff_t off = page_offset(page); > loff_t size = i_size_read(inode); > size_t len; > - int want, got, err; > + int want, got, err, dirty; > sigset_t oldset; > vm_fault_t ret = VM_FAULT_SIGBUS; > > @@ -1541,12 +1541,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) > > ceph_block_sigs(&oldset); > > - if (ci->i_inline_version != CEPH_INLINE_NONE) { > - err = ceph_uninline_data(inode, off == 0 ? 
page : NULL); > - if (err < 0) > - goto out_free; > - } > - > if (off + PAGE_SIZE <= size) > len = PAGE_SIZE; > else > @@ -1565,6 +1559,11 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) > if (err < 0) > goto out_free; > > + err = ceph_uninline_data(inode, off == 0 ? page : NULL); > + if (err < 0) > + goto out_put_caps; > + dirty = err; > + > dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", > inode, off, len, ceph_cap_string(got)); > > @@ -1591,11 +1590,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) > > if (ret == VM_FAULT_LOCKED || > ci->i_inline_version != CEPH_INLINE_NONE) { > - int dirty; > spin_lock(&ci->i_ceph_lock); > - ci->i_inline_version = CEPH_INLINE_NONE; > - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, > - &prealloc_cf); > + dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, > + &prealloc_cf); > spin_unlock(&ci->i_ceph_lock); > if (dirty) > __mark_inode_dirty(inode, dirty); > @@ -1603,6 +1600,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) > > dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n", > inode, off, len, ceph_cap_string(got), ret); > +out_put_caps: > ceph_put_cap_refs(ci, got); > out_free: > ceph_restore_sigs(&oldset); > @@ -1656,27 +1654,60 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, > } > } > > +/** > + * ceph_uninline_data - convert an inlined file to uninlined > + * @inode: inode to be uninlined > + * @page: optional pointer to first page in file > + * > + * Convert a file from inlined to non-inlined. We borrow the i_truncate_mutex > + * to serialize callers and prevent races. Returns either a negative error code > + * or a positive set of I_DIRTY_* flags that the caller should apply when > + * dirtying the inode. 
> + */ > int ceph_uninline_data(struct inode *inode, struct page *provided_page) > { > struct ceph_inode_info *ci = ceph_inode(inode); > struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > struct ceph_osd_request *req; > + struct ceph_cap_flush *prealloc_cf = NULL; > struct page *page = NULL; > u64 len, inline_version; > - int err = 0; > + int ret = 0; > bool from_pagecache = false; > bool allocated_page = false; > + > + /* Do a lockless check first -- paired with i_ceph_lock for changes */ > + inline_version = READ_ONCE(ci->i_inline_version); > + if (likely(inline_version == CEPH_INLINE_NONE)) > + return 0; > + > + dout("uninline_data %p %llx.%llx inline_version %llu\n", > + inode, ceph_vinop(inode), inline_version); > + > + mutex_lock(&ci->i_truncate_mutex); > + > + /* Double check the version after taking mutex */ > spin_lock(&ci->i_ceph_lock); > inline_version = ci->i_inline_version; > spin_unlock(&ci->i_ceph_lock); > > - dout("uninline_data %p %llx.%llx inline_version %llu\n", > - inode, ceph_vinop(inode), inline_version); > + /* If someone beat us to the uninlining then just return. */ > + if (inline_version == CEPH_INLINE_NONE) > + goto out; > > - if (inline_version == 1 || /* initial version, no data */ I'd like to avoid this optimization. check i_size instead. > - inline_version == CEPH_INLINE_NONE) > + prealloc_cf = ceph_alloc_cap_flush(); > + if (!prealloc_cf) { > + ret = -ENOMEM; > goto out; > + } > + > + /* > + * Handle the initial version (1) as a special case: switch the > + * version to CEPH_INLINE_NONE, but we don't need to do any uninlining > + * in that case since there is no data yet. 
> + */ > + if (inline_version == 1) > + goto out_set_vers; ditto > > if (provided_page) { > page = provided_page; > @@ -1703,20 +1734,20 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page) > } else { > page = __page_cache_alloc(GFP_NOFS); > if (!page) { > - err = -ENOMEM; > + ret = -ENOMEM; > goto out; > } > allocated_page = true; > lock_page(page); > - err = __ceph_do_getattr(inode, page, > + ret = __ceph_do_getattr(inode, page, > CEPH_STAT_CAP_INLINE_DATA, true); > - if (err < 0) { > + if (ret < 0) { > /* no inline data */ > - if (err == -ENODATA) > - err = 0; > + if (ret == -ENODATA) > + ret = 0; > goto out; > } > - len = err; > + len = ret; > } > > req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, > @@ -1724,16 +1755,16 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page) > CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE, > NULL, 0, 0, false); > if (IS_ERR(req)) { > - err = PTR_ERR(req); > + ret = PTR_ERR(req); > goto out; > } > > req->r_mtime = inode->i_mtime; > - err = ceph_osdc_start_request(&fsc->client->osdc, req, false); > - if (!err) > - err = ceph_osdc_wait_request(&fsc->client->osdc, req); > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); > + if (!ret) > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); > ceph_osdc_put_request(req); > - if (err < 0) > + if (ret < 0) > goto out; > > req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, > @@ -1742,7 +1773,7 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page) > NULL, ci->i_truncate_seq, > ci->i_truncate_size, false); > if (IS_ERR(req)) { > - err = PTR_ERR(req); > + ret = PTR_ERR(req); > goto out; > } > > @@ -1750,12 +1781,12 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page) > > { > __le64 xattr_buf = cpu_to_le64(inline_version); > - err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR, > + ret = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR, > "inline_version", 
&xattr_buf, > sizeof(xattr_buf), > CEPH_OSD_CMPXATTR_OP_GT, > CEPH_OSD_CMPXATTR_MODE_U64); > - if (err) > + if (ret) > goto out_put; > } > > @@ -1763,22 +1794,31 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page) > char xattr_buf[32]; > int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf), > "%llu", inline_version); > - err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR, > + ret = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR, > "inline_version", > xattr_buf, xattr_len, 0, 0); > - if (err) > + if (ret) > goto out_put; > } > > req->r_mtime = inode->i_mtime; > - err = ceph_osdc_start_request(&fsc->client->osdc, req, false); > - if (!err) > - err = ceph_osdc_wait_request(&fsc->client->osdc, req); > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); > + if (!ret) > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); > out_put: > ceph_osdc_put_request(req); > - if (err == -ECANCELED) > - err = 0; > + if (ret == -ECANCELED) > + ret = 0; > +out_set_vers: > + if (!ret) { > + spin_lock(&ci->i_ceph_lock); > + ci->i_inline_version = CEPH_INLINE_NONE; > + ret = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, > + &prealloc_cf); > + spin_unlock(&ci->i_ceph_lock); > + } > out: > + mutex_unlock(&ci->i_truncate_mutex); > if (page) { > unlock_page(page); > if (from_pagecache) > @@ -1786,10 +1826,11 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page) > else if (allocated_page) > __free_pages(page, 0); > } > + ceph_free_cap_flush(prealloc_cf); > > dout("uninline_data %p %llx.%llx inline_version %llu = %d\n", > - inode, ceph_vinop(inode), inline_version, err); > - return err; > + inode, ceph_vinop(inode), inline_version, ret); > + return ret; > } > > static const struct vm_operations_struct ceph_vmops = { > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 7bb090fa99d3..252aac44b8ce 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -1387,7 +1387,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, 
struct iov_iter *from) > struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > struct ceph_cap_flush *prealloc_cf; > ssize_t count, written = 0; > - int err, want, got; > + int err, want, got, dirty; > loff_t pos; > loff_t limit = max(i_size_read(inode), fsc->max_file_size); > > @@ -1438,12 +1438,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) > > inode_inc_iversion_raw(inode); > > - if (ci->i_inline_version != CEPH_INLINE_NONE) { > - err = ceph_uninline_data(inode, NULL); > - if (err < 0) > - goto out; > - } > - > /* FIXME: not complete since it doesn't account for being at quota */ > if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) { > err = -ENOSPC; > @@ -1462,6 +1456,11 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) > if (err < 0) > goto out; > > + err = ceph_uninline_data(inode, NULL); > + if (err < 0) > + goto out_put_caps; > + dirty = err; > + > dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n", > inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); > > @@ -1510,12 +1509,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) > } > > if (written >= 0) { > - int dirty; > - > spin_lock(&ci->i_ceph_lock); > - ci->i_inline_version = CEPH_INLINE_NONE; > - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, > - &prealloc_cf); > + dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, > + &prealloc_cf); > spin_unlock(&ci->i_ceph_lock); > if (dirty) > __mark_inode_dirty(inode, dirty); > @@ -1526,6 +1522,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) > dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", > inode, ceph_vinop(inode), pos, (unsigned)count, > ceph_cap_string(got)); > +out_put_caps: > ceph_put_cap_refs(ci, got); > > if (written == -EOLDSNAPC) { > @@ -1762,12 +1759,6 @@ static long ceph_fallocate(struct file *file, int mode, > goto unlock; > } > > - if (ci->i_inline_version != CEPH_INLINE_NONE) { 
> - ret = ceph_uninline_data(inode, NULL); > - if (ret < 0) > - goto unlock; > - } > - > size = i_size_read(inode); > > /* Are we punching a hole beyond EOF? */ > @@ -1785,19 +1776,23 @@ static long ceph_fallocate(struct file *file, int mode, > if (ret < 0) > goto unlock; > > + ret = ceph_uninline_data(inode, NULL); > + if (ret < 0) > + goto out_put_caps; > + dirty = ret; > + > ceph_zero_pagecache_range(inode, offset, length); > ret = ceph_zero_objects(inode, offset, length); > > if (!ret) { > spin_lock(&ci->i_ceph_lock); > - ci->i_inline_version = CEPH_INLINE_NONE; > - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, > - &prealloc_cf); > + dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, > + &prealloc_cf); > spin_unlock(&ci->i_ceph_lock); > if (dirty) > __mark_inode_dirty(inode, dirty); > } > - > +out_put_caps: > ceph_put_cap_refs(ci, got); > unlock: > inode_unlock(inode); > -- > 2.21.0 >