Re: [PATCH v2 3/5] ceph: fix potential races in ceph_uninline_data

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri, 2019-07-12 at 10:44 +0800, Yan, Zheng wrote:
> On Fri, Jul 12, 2019 at 3:17 AM Jeff Layton <jlayton@xxxxxxxxxx> wrote:
> > The current code will do the uninlining but it relies on the callers to
> > set the i_inline_version appropriately afterward. Multiple tasks can end
> > up attempting to uninline the data, and overwrite changes that follow
> > the first uninlining.
> > 
> > Protect against competing uninlining attempts by having the callers take
> > the i_truncate_mutex and then have ceph_uninline_data update the version
> > itself before dropping it. This means that we also need to have
> > ceph_uninline_data mark the caps dirty and pass back I_DIRTY_* flags if
> > any of them are newly dirty.
> > 
> > We can't mark the caps dirty though unless we actually have them, so
> > move the uninlining after the point where Fw caps are acquired in all of
> > the callers.
> > 
> > Finally, since we are doing a lockless check first in all cases, just
> > move that into ceph_uninline_data as well, and have the callers call it
> > unconditionally.
> > 
> > Signed-off-by: Jeff Layton <jlayton@xxxxxxxxxx>
> > ---
> >  fs/ceph/addr.c | 119 +++++++++++++++++++++++++++++++++----------------
> >  fs/ceph/file.c |  39 +++++++---------
> >  2 files changed, 97 insertions(+), 61 deletions(-)
> > 
> > diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> > index 038678963cf9..4606da82da6f 100644
> > --- a/fs/ceph/addr.c
> > +++ b/fs/ceph/addr.c
> > @@ -1531,7 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> >         loff_t off = page_offset(page);
> >         loff_t size = i_size_read(inode);
> >         size_t len;
> > -       int want, got, err;
> > +       int want, got, err, dirty;
> >         sigset_t oldset;
> >         vm_fault_t ret = VM_FAULT_SIGBUS;
> > 
> > @@ -1541,12 +1541,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > 
> >         ceph_block_sigs(&oldset);
> > 
> > -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > -               if (err < 0)
> > -                       goto out_free;
> > -       }
> > -
> >         if (off + PAGE_SIZE <= size)
> >                 len = PAGE_SIZE;
> >         else
> > @@ -1565,6 +1559,11 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> >         if (err < 0)
> >                 goto out_free;
> > 
> > +       err = ceph_uninline_data(inode, off == 0 ? page : NULL);
> > +       if (err < 0)
> > +               goto out_put_caps;
> > +       dirty = err;
> > +
> >         dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
> >              inode, off, len, ceph_cap_string(got));
> > 
> > @@ -1591,11 +1590,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > 
> >         if (ret == VM_FAULT_LOCKED ||
> >             ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               int dirty;
> >                 spin_lock(&ci->i_ceph_lock);
> > -               ci->i_inline_version = CEPH_INLINE_NONE;
> > -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > -                                              &prealloc_cf);
> > +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                               &prealloc_cf);
> >                 spin_unlock(&ci->i_ceph_lock);
> >                 if (dirty)
> >                         __mark_inode_dirty(inode, dirty);
> > @@ -1603,6 +1600,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
> > 
> >         dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
> >              inode, off, len, ceph_cap_string(got), ret);
> > +out_put_caps:
> >         ceph_put_cap_refs(ci, got);
> >  out_free:
> >         ceph_restore_sigs(&oldset);
> > @@ -1656,27 +1654,60 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
> >         }
> >  }
> > 
> > +/**
> > + * ceph_uninline_data - convert an inlined file to uninlined
> > + * @inode: inode to be uninlined
> > + * @page: optional pointer to first page in file
> > + *
> > + * Convert a file from inlined to non-inlined. We borrow the i_truncate_mutex
> > + * to serialize callers and prevent races. Returns either a negative error code
> > + * or a positive set of I_DIRTY_* flags that the caller should apply when
> > + * dirtying the inode.
> > + */
> >  int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >  {
> >         struct ceph_inode_info *ci = ceph_inode(inode);
> >         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> >         struct ceph_osd_request *req;
> > +       struct ceph_cap_flush *prealloc_cf = NULL;
> >         struct page *page = NULL;
> >         u64 len, inline_version;
> > -       int err = 0;
> > +       int ret = 0;
> >         bool from_pagecache = false;
> >         bool allocated_page = false;
> > 
> > +       /* Do a lockless check first -- paired with i_ceph_lock for changes */
> > +       inline_version = READ_ONCE(ci->i_inline_version);
> > +       if (likely(inline_version == CEPH_INLINE_NONE))
> > +               return 0;
> > +
> > +       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > +            inode, ceph_vinop(inode), inline_version);
> > +
> > +       mutex_lock(&ci->i_truncate_mutex);
> > +
> > +       /* Double check the version after taking mutex */
> >         spin_lock(&ci->i_ceph_lock);
> >         inline_version = ci->i_inline_version;
> >         spin_unlock(&ci->i_ceph_lock);
> > 
> > -       dout("uninline_data %p %llx.%llx inline_version %llu\n",
> > -            inode, ceph_vinop(inode), inline_version);
> > +       /* If someone beat us to the uninlining then just return. */
> > +       if (inline_version == CEPH_INLINE_NONE)
> > +               goto out;
> > 
> > -       if (inline_version == 1 || /* initial version, no data */
> I'd like to avoid this optimization. check i_size instead.
> 

This one is being removed already...

> > -           inline_version == CEPH_INLINE_NONE)
> > +       prealloc_cf = ceph_alloc_cap_flush();
> > +       if (!prealloc_cf) {
> > +               ret = -ENOMEM;
> >                 goto out;
> > +       }
> > +
> > +       /*
> > +        * Handle the initial version (1) as a a special case: switch the
> > +        * version to CEPH_INLINE_NONE, but we don't need to do any uninlining
> > +        * in that case since there is no data yet.
> > +        */
> > +       if (inline_version == 1)
> > +               goto out_set_vers;
> 
> ditto
> 

Fixed this up in my tree, and removed the places that set the
i_inline_version to CEPH_INLINE_NONE in aio completion and cfr
codepaths. I went ahead and pushed the result to the ceph-client/testing 
branch.

Thanks to you and Luis for the review so far!

> >         if (provided_page) {
> >                 page = provided_page;
> > @@ -1703,20 +1734,20 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >         } else {
> >                 page = __page_cache_alloc(GFP_NOFS);
> >                 if (!page) {
> > -                       err = -ENOMEM;
> > +                       ret = -ENOMEM;
> >                         goto out;
> >                 }
> >                 allocated_page = true;
> >                 lock_page(page);
> > -               err = __ceph_do_getattr(inode, page,
> > +               ret = __ceph_do_getattr(inode, page,
> >                                         CEPH_STAT_CAP_INLINE_DATA, true);
> > -               if (err < 0) {
> > +               if (ret < 0) {
> >                         /* no inline data */
> > -                       if (err == -ENODATA)
> > -                               err = 0;
> > +                       if (ret == -ENODATA)
> > +                               ret = 0;
> >                         goto out;
> >                 }
> > -               len = err;
> > +               len = ret;
> >         }
> > 
> >         req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> > @@ -1724,16 +1755,16 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                                     CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
> >                                     NULL, 0, 0, false);
> >         if (IS_ERR(req)) {
> > -               err = PTR_ERR(req);
> > +               ret = PTR_ERR(req);
> >                 goto out;
> >         }
> > 
> >         req->r_mtime = inode->i_mtime;
> > -       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > -       if (!err)
> > -               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > +       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > +       if (!ret)
> > +               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> >         ceph_osdc_put_request(req);
> > -       if (err < 0)
> > +       if (ret < 0)
> >                 goto out;
> > 
> >         req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> > @@ -1742,7 +1773,7 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                                     NULL, ci->i_truncate_seq,
> >                                     ci->i_truncate_size, false);
> >         if (IS_ERR(req)) {
> > -               err = PTR_ERR(req);
> > +               ret = PTR_ERR(req);
> >                 goto out;
> >         }
> > 
> > @@ -1750,12 +1781,12 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> > 
> >         {
> >                 __le64 xattr_buf = cpu_to_le64(inline_version);
> > -               err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
> > +               ret = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
> >                                             "inline_version", &xattr_buf,
> >                                             sizeof(xattr_buf),
> >                                             CEPH_OSD_CMPXATTR_OP_GT,
> >                                             CEPH_OSD_CMPXATTR_MODE_U64);
> > -               if (err)
> > +               if (ret)
> >                         goto out_put;
> >         }
> > 
> > @@ -1763,22 +1794,31 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                 char xattr_buf[32];
> >                 int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
> >                                          "%llu", inline_version);
> > -               err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
> > +               ret = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
> >                                             "inline_version",
> >                                             xattr_buf, xattr_len, 0, 0);
> > -               if (err)
> > +               if (ret)
> >                         goto out_put;
> >         }
> > 
> >         req->r_mtime = inode->i_mtime;
> > -       err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > -       if (!err)
> > -               err = ceph_osdc_wait_request(&fsc->client->osdc, req);
> > +       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> > +       if (!ret)
> > +               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> >  out_put:
> >         ceph_osdc_put_request(req);
> > -       if (err == -ECANCELED)
> > -               err = 0;
> > +       if (ret == -ECANCELED)
> > +               ret = 0;
> > +out_set_vers:
> > +       if (!ret) {
> > +               spin_lock(&ci->i_ceph_lock);
> > +               ci->i_inline_version = CEPH_INLINE_NONE;
> > +               ret = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                            &prealloc_cf);
> > +               spin_unlock(&ci->i_ceph_lock);
> > +       }
> >  out:
> > +       mutex_unlock(&ci->i_truncate_mutex);
> >         if (page) {
> >                 unlock_page(page);
> >                 if (from_pagecache)
> > @@ -1786,10 +1826,11 @@ int ceph_uninline_data(struct inode *inode, struct page *provided_page)
> >                 else if (allocated_page)
> >                         __free_pages(page, 0);
> >         }
> > +       ceph_free_cap_flush(prealloc_cf);
> > 
> >         dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
> > -            inode, ceph_vinop(inode), inline_version, err);
> > -       return err;
> > +            inode, ceph_vinop(inode), inline_version, ret);
> > +       return ret;
> >  }
> > 
> >  static const struct vm_operations_struct ceph_vmops = {
> > diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> > index 7bb090fa99d3..252aac44b8ce 100644
> > --- a/fs/ceph/file.c
> > +++ b/fs/ceph/file.c
> > @@ -1387,7 +1387,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> >         struct ceph_cap_flush *prealloc_cf;
> >         ssize_t count, written = 0;
> > -       int err, want, got;
> > +       int err, want, got, dirty;
> >         loff_t pos;
> >         loff_t limit = max(i_size_read(inode), fsc->max_file_size);
> > 
> > @@ -1438,12 +1438,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> > 
> >         inode_inc_iversion_raw(inode);
> > 
> > -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               err = ceph_uninline_data(inode, NULL);
> > -               if (err < 0)
> > -                       goto out;
> > -       }
> > -
> >         /* FIXME: not complete since it doesn't account for being at quota */
> >         if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) {
> >                 err = -ENOSPC;
> > @@ -1462,6 +1456,11 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         if (err < 0)
> >                 goto out;
> > 
> > +       err = ceph_uninline_data(inode, NULL);
> > +       if (err < 0)
> > +               goto out_put_caps;
> > +       dirty = err;
> > +
> >         dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n",
> >              inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
> > 
> > @@ -1510,12 +1509,9 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         }
> > 
> >         if (written >= 0) {
> > -               int dirty;
> > -
> >                 spin_lock(&ci->i_ceph_lock);
> > -               ci->i_inline_version = CEPH_INLINE_NONE;
> > -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > -                                              &prealloc_cf);
> > +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                               &prealloc_cf);
> >                 spin_unlock(&ci->i_ceph_lock);
> >                 if (dirty)
> >                         __mark_inode_dirty(inode, dirty);
> > @@ -1526,6 +1522,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
> >         dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
> >              inode, ceph_vinop(inode), pos, (unsigned)count,
> >              ceph_cap_string(got));
> > +out_put_caps:
> >         ceph_put_cap_refs(ci, got);
> > 
> >         if (written == -EOLDSNAPC) {
> > @@ -1762,12 +1759,6 @@ static long ceph_fallocate(struct file *file, int mode,
> >                 goto unlock;
> >         }
> > 
> > -       if (ci->i_inline_version != CEPH_INLINE_NONE) {
> > -               ret = ceph_uninline_data(inode, NULL);
> > -               if (ret < 0)
> > -                       goto unlock;
> > -       }
> > -
> >         size = i_size_read(inode);
> > 
> >         /* Are we punching a hole beyond EOF? */
> > @@ -1785,19 +1776,23 @@ static long ceph_fallocate(struct file *file, int mode,
> >         if (ret < 0)
> >                 goto unlock;
> > 
> > +       ret = ceph_uninline_data(inode, NULL);
> > +       if (ret < 0)
> > +               goto out_put_caps;
> > +       dirty = ret;
> > +
> >         ceph_zero_pagecache_range(inode, offset, length);
> >         ret = ceph_zero_objects(inode, offset, length);
> > 
> >         if (!ret) {
> >                 spin_lock(&ci->i_ceph_lock);
> > -               ci->i_inline_version = CEPH_INLINE_NONE;
> > -               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > -                                              &prealloc_cf);
> > +               dirty |= __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
> > +                                               &prealloc_cf);
> >                 spin_unlock(&ci->i_ceph_lock);
> >                 if (dirty)
> >                         __mark_inode_dirty(inode, dirty);
> >         }
> > -
> > +out_put_caps:
> >         ceph_put_cap_refs(ci, got);
> >  unlock:
> >         inode_unlock(inode);
> > --
> > 2.21.0
> > 

-- 
Jeff Layton <jlayton@xxxxxxxxxx>




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Ceph Dev]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]

  Powered by Linux