Re: [PATCH V4 2/2] ceph: Implement writev/pwritev for sync operation.

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 09/12/2013 01:25 PM, majianpeng wrote:
> For writev/pwritev sync operations, ceph only handles the first iov
> and ignores the remaining iovs. Now implement this.
> I divided the sync write operation into two functions: one for
> direct writes, the other for non-direct sync writes. This is because for
> non-direct sync writes we can merge the iovs into one, but for direct
> writes we can't merge the iovs.
> 
> V4:
> 	reconstruct the code by Yan, Zheng
> 
> V2:
>   - use struct iov_iter instead of cloning the iovs in ceph_sync_write.
> 
> Signed-off-by: Jianpeng Ma <majianpeng@xxxxxxxxx>
> Reviewed-by: Yan, Zheng <zheng.z.yan@xxxxxxxxx>
> ---
>  fs/ceph/file.c | 273 ++++++++++++++++++++++++++++++++++++++++-----------------
>  1 file changed, 193 insertions(+), 80 deletions(-)
> 
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 8f20eb4..38c4419 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -532,83 +532,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
>  	}
>  }
>  
> +
>  /*
> - * Synchronous write, straight from __user pointer or user pages (if
> - * O_DIRECT).
> + * Synchronous write, straight from __user pointer or user pages.
>   *
>   * If write spans object boundary, just do multiple writes.  (For a
>   * correct atomic write, we should e.g. take write locks on all
>   * objects, rollback on failure, etc.)
>   */
> -static ssize_t ceph_sync_write(struct file *file, const char __user *data,
> -			       size_t left, loff_t pos, loff_t *ppos)
> +static ssize_t
> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
> +		       unsigned long nr_segs, size_t count)
>  {
> +	struct file *file = iocb->ki_filp;
>  	struct inode *inode = file_inode(file);
>  	struct ceph_inode_info *ci = ceph_inode(inode);
>  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>  	struct ceph_snap_context *snapc;
>  	struct ceph_vino vino;
>  	struct ceph_osd_request *req;
> -	int num_ops = 1;
>  	struct page **pages;
>  	int num_pages;
> -	u64 len;
>  	int written = 0;
>  	int flags;
>  	int check_caps = 0;
> -	int page_align, io_align;
> -	unsigned long buf_align;
> +	int page_align;
>  	int ret;
>  	struct timespec mtime = CURRENT_TIME;
> -	bool own_pages = false;
> +	loff_t pos = iocb->ki_pos;
> +	struct iov_iter i;
>  
>  	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
>  		return -EROFS;
>  
> -	dout("sync_write on file %p %lld~%u %s\n", file, pos,
> -	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
> +	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
> +	     (unsigned)count);
>  
> -	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
>  	if (ret < 0)
>  		return ret;
>  
>  	ret = invalidate_inode_pages2_range(inode->i_mapping,
>  					    pos >> PAGE_CACHE_SHIFT,
> -					    (pos + left) >> PAGE_CACHE_SHIFT);
> +					    (pos + count) >> PAGE_CACHE_SHIFT);
>  	if (ret < 0)
>  		dout("invalidate_inode_pages2_range returned %d\n", ret);
>  
>  	flags = CEPH_OSD_FLAG_ORDERSNAP |
>  		CEPH_OSD_FLAG_ONDISK |
>  		CEPH_OSD_FLAG_WRITE;
> -	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
> -		flags |= CEPH_OSD_FLAG_ACK;
> -	else
> -		num_ops++;	/* Also include a 'startsync' command. */
> -
> -	/*
> -	 * we may need to do multiple writes here if we span an object
> -	 * boundary.  this isn't atomic, unfortunately.  :(
> -	 */
> -more:
> -	io_align = pos & ~PAGE_MASK;
> -	buf_align = (unsigned long)data & ~PAGE_MASK;
> -	len = left;
>  
> -	snapc = ci->i_snap_realm->cached_context;
> -	vino = ceph_vino(inode);
> -	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> -				    vino, pos, &len, num_ops,
> -				    CEPH_OSD_OP_WRITE, flags, snapc,
> -				    ci->i_truncate_seq, ci->i_truncate_size,
> -				    false);
> -	if (IS_ERR(req))
> -		return PTR_ERR(req);
> +	iov_iter_init(&i, iov, nr_segs, count, 0);
> +
> +	while (iov_iter_count(&i) > 0) {
> +		void __user *data = i.iov->iov_base + i.iov_offset;
> +		u64 len = i.iov->iov_len - i.iov_offset;
> +
> +		page_align = (unsigned long)data & ~PAGE_MASK;
> +
> +		snapc = ci->i_snap_realm->cached_context;
> +		vino = ceph_vino(inode);
> +		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +					    vino, pos, &len,
> +					    2,/*include a 'startsync' command*/
> +					    CEPH_OSD_OP_WRITE, flags, snapc,
> +					    ci->i_truncate_seq,
> +					    ci->i_truncate_size,
> +					    false);
> +		if (IS_ERR(req)) {
> +			ret = PTR_ERR(req);
> +			goto out;
> +		}
>  
> -	/* write from beginning of first page, regardless of io alignment */
> -	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
> -	num_pages = calc_pages_for(page_align, len);
> -	if (file->f_flags & O_DIRECT) {
> +		num_pages = calc_pages_for(page_align, len);
>  		pages = ceph_get_direct_page_vector(data, num_pages, false);
>  		if (IS_ERR(pages)) {
>  			ret = PTR_ERR(pages);
> @@ -620,60 +616,175 @@ more:
>  		 * may block.
>  		 */
>  		truncate_inode_pages_range(inode->i_mapping, pos,
> -					   (pos+len) | (PAGE_CACHE_SIZE-1));
> -	} else {
> +				   (pos+len) | (PAGE_CACHE_SIZE-1));
> +		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> +						false, false);
> +
> +		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +
> +		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +		if (!ret)
> +			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +
> +		ceph_put_page_vector(pages, num_pages, false);
> +
> +out:
> +		ceph_osdc_put_request(req);
> +		if (ret == 0) {
> +			pos += len;
> +			written += len;
> +			iov_iter_advance(&i, (size_t)len);
> +
> +			if (pos > i_size_read(inode)) 
> +				check_caps = ceph_inode_set_size(inode, pos);
> +				if (check_caps)
> +					ceph_check_caps(ceph_inode(inode),
> +							CHECK_CAPS_AUTHONLY,
> +							NULL);


Braces "{ ... }" are needed around the body of the
"if (pos > i_size_read(inode))" statement above. The rest of the change looks good.

Regards
Yan, Zheng

> +		} else
> +			break;
> +	}
> +
> +	if (ret != -EOLDSNAPC && written > 0) {
> +		iocb->ki_pos = pos;
> +		ret = written;
> +	}
> +	return ret;
> +}
> +
> +
> +/*
> + * Synchronous write, straight from __user pointer or user pages.
> + *
> + * If write spans object boundary, just do multiple writes.  (For a
> + * correct atomic write, we should e.g. take write locks on all
> + * objects, rollback on failure, etc.)
> + */
> +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
> +			       unsigned long nr_segs, size_t count)
> +{
> +	struct file *file = iocb->ki_filp;
> +	struct inode *inode = file_inode(file);
> +	struct ceph_inode_info *ci = ceph_inode(inode);
> +	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> +	struct ceph_snap_context *snapc;
> +	struct ceph_vino vino;
> +	struct ceph_osd_request *req;
> +	struct page **pages;
> +	u64 len;
> +	int num_pages;
> +	int written = 0;
> +	int flags;
> +	int check_caps = 0;
> +	int ret;
> +	struct timespec mtime = CURRENT_TIME;
> +	loff_t pos = iocb->ki_pos;
> +	struct iov_iter i;
> +
> +	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
> +		return -EROFS;
> +
> +	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
> +
> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
> +	if (ret < 0)
> +		return ret;
> +
> +	ret = invalidate_inode_pages2_range(inode->i_mapping,
> +					    pos >> PAGE_CACHE_SHIFT,
> +					    (pos + count) >> PAGE_CACHE_SHIFT);
> +	if (ret < 0)
> +		dout("invalidate_inode_pages2_range returned %d\n", ret);
> +
> +	flags = CEPH_OSD_FLAG_ORDERSNAP |
> +		CEPH_OSD_FLAG_ONDISK |
> +		CEPH_OSD_FLAG_WRITE |
> +		CEPH_OSD_FLAG_ACK;
> +
> +	iov_iter_init(&i, iov, nr_segs, count, 0);
> +
> +	while ((len = iov_iter_count(&i)) > 0) {
> +		size_t left;
> +		int n;
> +
> +		snapc = ci->i_snap_realm->cached_context;
> +		vino = ceph_vino(inode);
> +		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +					    vino, pos, &len, 1,
> +					    CEPH_OSD_OP_WRITE, flags, snapc,
> +					    ci->i_truncate_seq,
> +					    ci->i_truncate_size,
> +					    false);
> +		if (IS_ERR(req)) {
> +			ret = PTR_ERR(req);
> +			goto out;
> +		}
> +
> +		/*
> +		 * write from beginning of first page,
> +		 * regardless of io alignment
> +		 */
> +		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
> +
>  		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
>  		if (IS_ERR(pages)) {
>  			ret = PTR_ERR(pages);
>  			goto out;
>  		}
> -		ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
> +
> +		left = len;
> +		for (n = 0; n < num_pages; n++) {
> +			size_t plen = min(left, PAGE_SIZE);
> +			ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
> +			if (ret != plen) {
> +				ret = -EFAULT;
> +				break;
> +			}
> +			left -= ret;
> +			iov_iter_advance(&i, ret);
> +		}
> +
>  		if (ret < 0) {
>  			ceph_release_page_vector(pages, num_pages);
>  			goto out;
>  		}
>  
> -		if ((file->f_flags & O_SYNC) == 0) {
> -			/* get a second commit callback */
> -			req->r_unsafe_callback = ceph_sync_write_unsafe;
> -			req->r_inode = inode;
> -			own_pages = true;
> -		}
> -	}
> -	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> -					false, own_pages);
> +		/* get a second commit callback */
> +		req->r_unsafe_callback = ceph_sync_write_unsafe;
> +		req->r_inode = inode;
>  
> -	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> -	ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
> +						false, true);
>  
> -	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> -	if (!ret)
> -		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
>  
> -	if (file->f_flags & O_DIRECT)
> -		ceph_put_page_vector(pages, num_pages, false);
> -	else if (file->f_flags & O_SYNC)
> -		ceph_release_page_vector(pages, num_pages);
> +		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +		if (!ret)
> +			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>  
>  out:
> -	ceph_osdc_put_request(req);
> -	if (ret == 0) {
> -		pos += len;
> -		written += len;
> -		left -= len;
> -		data += len;
> -		if (left)
> -			goto more;
> +		ceph_osdc_put_request(req);
> +		if (ret == 0) {
> +			pos += len;
> +			written += len;
> +
> +			if (pos > i_size_read(inode)) {
> +				check_caps = ceph_inode_set_size(inode, pos);
> +				if (check_caps)
> +					ceph_check_caps(ceph_inode(inode),
> +							CHECK_CAPS_AUTHONLY,
> +							NULL);
> +			}
> +		} else {
> +			break;
> +		}
> +	}
>  
> +	if (ret != -EOLDSNAPC && written > 0) {
>  		ret = written;
> -		*ppos = pos;
> -		if (pos > i_size_read(inode))
> -			check_caps = ceph_inode_set_size(inode, pos);
> -		if (check_caps)
> -			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
> -					NULL);
> -	} else if (ret != -EOLDSNAPC && written > 0) {
> -		ret = written;
> +		iocb->ki_pos = pos;
>  	}
>  	return ret;
>  }
> @@ -827,11 +938,13 @@ retry_snap:
>  	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
>  
>  	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
> -	    (iocb->ki_filp->f_flags & O_DIRECT) ||
> -	    (fi->flags & CEPH_F_SYNC)) {
> +	    (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
>  		mutex_unlock(&inode->i_mutex);
> -		written = ceph_sync_write(file, iov->iov_base, count,
> -					  pos, &iocb->ki_pos);
> +		if (file->f_flags & O_DIRECT)
> +			written = ceph_sync_direct_write(iocb, iov,
> +							 nr_segs, count);
> +		else
> +			written = ceph_sync_write(iocb, iov, nr_segs, count);
>  		if (written == -EOLDSNAPC) {
>  			dout("aio_write %p %llx.%llx %llu~%u"
>  				"got EOLDSNAPC, retrying\n",
> 

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html




[Index of Archives]     [CEPH Users]     [Ceph Large]     [Information on CEPH]     [Linux BTRFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]
  Powered by Linux