>Reviewed-by: Yan, Zheng <zheng.z.yan@xxxxxxxxx> Thanks very much! Jianpeng Ma > >On 09/12/2013 01:54 PM, majianpeng wrote: >> For writev/pwritev sync operations, ceph only handles the first iov. >> It ignores the other iovs. Now implement this. >> I divided the write-sync-operation into two functions: one for >> direct writes, the other for non-direct sync writes. This is because for >> non-direct sync writes we can merge the iovs into one, but for direct writes >> we can't merge iovs. >> >> V4: >> restructure the code as suggested by Yan, Zheng >> V2: >> - use struct iov_iter instead of cloning iovs in ceph_sync_write. >> >> Signed-off-by: Jianpeng Ma <majianpeng@xxxxxxxxx> >> Reviewed-by: Yan, Zheng <zheng.z.yan@xxxxxxxxx> >> --- >> fs/ceph/file.c | 273 ++++++++++++++++++++++++++++++++++++++++----------------- >> 1 file changed, 193 insertions(+), 80 deletions(-) >> >> diff --git a/fs/ceph/file.c b/fs/ceph/file.c >> index 3de8982..5cf034e 100644 >> --- a/fs/ceph/file.c >> +++ b/fs/ceph/file.c >> @@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) >> } >> } >> >> + >> /* >> - * Synchronous write, straight from __user pointer or user pages (if >> - * O_DIRECT). >> + * Synchronous write, straight from __user pointer or user pages. >> * >> * If write spans object boundary, just do multiple writes. (For a >> * correct atomic write, we should e.g. take write locks on all >> * objects, rollback on failure, etc.) 
>> */ >> -static ssize_t ceph_sync_write(struct file *file, const char __user *data, >> - size_t left, loff_t pos, loff_t *ppos) >> +static ssize_t >> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, >> + unsigned long nr_segs, size_t count) >> { >> + struct file *file = iocb->ki_filp; >> struct inode *inode = file_inode(file); >> struct ceph_inode_info *ci = ceph_inode(inode); >> struct ceph_fs_client *fsc = ceph_inode_to_client(inode); >> struct ceph_snap_context *snapc; >> struct ceph_vino vino; >> struct ceph_osd_request *req; >> - int num_ops = 1; >> struct page **pages; >> int num_pages; >> - u64 len; >> int written = 0; >> int flags; >> int check_caps = 0; >> - int page_align, io_align; >> - unsigned long buf_align; >> + int page_align; >> int ret; >> struct timespec mtime = CURRENT_TIME; >> - bool own_pages = false; >> + loff_t pos = iocb->ki_pos; >> + struct iov_iter i; >> >> if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) >> return -EROFS; >> >> - dout("sync_write on file %p %lld~%u %s\n", file, pos, >> - (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); >> + dout("sync_direct_write on file %p %lld~%u\n", file, pos, >> + (unsigned)count); >> >> - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); >> + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); >> if (ret < 0) >> return ret; >> >> ret = invalidate_inode_pages2_range(inode->i_mapping, >> pos >> PAGE_CACHE_SHIFT, >> - (pos + left) >> PAGE_CACHE_SHIFT); >> + (pos + count) >> PAGE_CACHE_SHIFT); >> if (ret < 0) >> dout("invalidate_inode_pages2_range returned %d\n", ret); >> >> flags = CEPH_OSD_FLAG_ORDERSNAP | >> CEPH_OSD_FLAG_ONDISK | >> CEPH_OSD_FLAG_WRITE; >> - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) >> - flags |= CEPH_OSD_FLAG_ACK; >> - else >> - num_ops++; /* Also include a 'startsync' command. */ >> >> - /* >> - * we may need to do multiple writes here if we span an object >> - * boundary. 
this isn't atomic, unfortunately. :( >> - */ >> -more: >> - io_align = pos & ~PAGE_MASK; >> - buf_align = (unsigned long)data & ~PAGE_MASK; >> - len = left; >> - >> - snapc = ci->i_snap_realm->cached_context; >> - vino = ceph_vino(inode); >> - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, >> - vino, pos, &len, num_ops, >> - CEPH_OSD_OP_WRITE, flags, snapc, >> - ci->i_truncate_seq, ci->i_truncate_size, >> - false); >> - if (IS_ERR(req)) >> - return PTR_ERR(req); >> + iov_iter_init(&i, iov, nr_segs, count, 0); >> + >> + while (iov_iter_count(&i) > 0) { >> + void __user *data = i.iov->iov_base + i.iov_offset; >> + u64 len = i.iov->iov_len - i.iov_offset; >> + >> + page_align = (unsigned long)data & ~PAGE_MASK; >> + >> + snapc = ci->i_snap_realm->cached_context; >> + vino = ceph_vino(inode); >> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, >> + vino, pos, &len, >> + 2,/*include a 'startsync' command*/ >> + CEPH_OSD_OP_WRITE, flags, snapc, >> + ci->i_truncate_seq, >> + ci->i_truncate_size, >> + false); >> + if (IS_ERR(req)) { >> + ret = PTR_ERR(req); >> + goto out; >> + } >> >> - /* write from beginning of first page, regardless of io alignment */ >> - page_align = file->f_flags & O_DIRECT ? buf_align : io_align; >> - num_pages = calc_pages_for(page_align, len); >> - if (file->f_flags & O_DIRECT) { >> + num_pages = calc_pages_for(page_align, len); >> pages = ceph_get_direct_page_vector(data, num_pages, false); >> if (IS_ERR(pages)) { >> ret = PTR_ERR(pages); >> @@ -577,60 +573,175 @@ more: >> * may block. 
>> */ >> truncate_inode_pages_range(inode->i_mapping, pos, >> - (pos+len) | (PAGE_CACHE_SIZE-1)); >> - } else { >> + (pos+len) | (PAGE_CACHE_SIZE-1)); >> + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, >> + false, false); >> + >> + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ >> + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); >> + >> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); >> + if (!ret) >> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); >> + >> + ceph_put_page_vector(pages, num_pages, false); >> + >> +out: >> + ceph_osdc_put_request(req); >> + if (ret == 0) { >> + pos += len; >> + written += len; >> + iov_iter_advance(&i, (size_t)len); >> + >> + if (pos > i_size_read(inode)) { >> + check_caps = ceph_inode_set_size(inode, pos); >> + if (check_caps) >> + ceph_check_caps(ceph_inode(inode), >> + CHECK_CAPS_AUTHONLY, >> + NULL); >> + } >> + } else >> + break; >> + } >> + >> + if (ret != -EOLDSNAPC && written > 0) { >> + iocb->ki_pos = pos; >> + ret = written; >> + } >> + return ret; >> +} >> + >> + >> +/* >> + * Synchronous write, straight from __user pointer or user pages. >> + * >> + * If write spans object boundary, just do multiple writes. (For a >> + * correct atomic write, we should e.g. take write locks on all >> + * objects, rollback on failure, etc.) 
>> + */ >> +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, >> + unsigned long nr_segs, size_t count) >> +{ >> + struct file *file = iocb->ki_filp; >> + struct inode *inode = file_inode(file); >> + struct ceph_inode_info *ci = ceph_inode(inode); >> + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); >> + struct ceph_snap_context *snapc; >> + struct ceph_vino vino; >> + struct ceph_osd_request *req; >> + struct page **pages; >> + u64 len; >> + int num_pages; >> + int written = 0; >> + int flags; >> + int check_caps = 0; >> + int ret; >> + struct timespec mtime = CURRENT_TIME; >> + loff_t pos = iocb->ki_pos; >> + struct iov_iter i; >> + >> + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) >> + return -EROFS; >> + >> + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); >> + >> + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); >> + if (ret < 0) >> + return ret; >> + >> + ret = invalidate_inode_pages2_range(inode->i_mapping, >> + pos >> PAGE_CACHE_SHIFT, >> + (pos + count) >> PAGE_CACHE_SHIFT); >> + if (ret < 0) >> + dout("invalidate_inode_pages2_range returned %d\n", ret); >> + >> + flags = CEPH_OSD_FLAG_ORDERSNAP | >> + CEPH_OSD_FLAG_ONDISK | >> + CEPH_OSD_FLAG_WRITE | >> + CEPH_OSD_FLAG_ACK; >> + >> + iov_iter_init(&i, iov, nr_segs, count, 0); >> + >> + while ((len = iov_iter_count(&i)) > 0) { >> + size_t left; >> + int n; >> + >> + snapc = ci->i_snap_realm->cached_context; >> + vino = ceph_vino(inode); >> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, >> + vino, pos, &len, 1, >> + CEPH_OSD_OP_WRITE, flags, snapc, >> + ci->i_truncate_seq, >> + ci->i_truncate_size, >> + false); >> + if (IS_ERR(req)) { >> + ret = PTR_ERR(req); >> + goto out; >> + } >> + >> + /* >> + * write from beginning of first page, >> + * regardless of io alignment >> + */ >> + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; >> + >> pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); >> if 
(IS_ERR(pages)) { >> ret = PTR_ERR(pages); >> goto out; >> } >> - ret = ceph_copy_user_to_page_vector(pages, data, pos, len); >> + >> + left = len; >> + for (n = 0; n < num_pages; n++) { >> + size_t plen = min(left, PAGE_SIZE); >> + ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); >> + if (ret != plen) { >> + ret = -EFAULT; >> + break; >> + } >> + left -= ret; >> + iov_iter_advance(&i, ret); >> + } >> + >> if (ret < 0) { >> ceph_release_page_vector(pages, num_pages); >> goto out; >> } >> >> - if ((file->f_flags & O_SYNC) == 0) { >> - /* get a second commit callback */ >> - req->r_unsafe_callback = ceph_sync_write_unsafe; >> - req->r_inode = inode; >> - own_pages = true; >> - } >> - } >> - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, >> - false, own_pages); >> + /* get a second commit callback */ >> + req->r_unsafe_callback = ceph_sync_write_unsafe; >> + req->r_inode = inode; >> >> - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ >> - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); >> + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, >> + false, true); >> >> - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); >> - if (!ret) >> - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); >> + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ >> + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); >> >> - if (file->f_flags & O_DIRECT) >> - ceph_put_page_vector(pages, num_pages, false); >> - else if (file->f_flags & O_SYNC) >> - ceph_release_page_vector(pages, num_pages); >> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); >> + if (!ret) >> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); >> >> out: >> - ceph_osdc_put_request(req); >> - if (ret == 0) { >> - pos += len; >> - written += len; >> - left -= len; >> - data += len; >> - if (left) >> - goto more; >> + ceph_osdc_put_request(req); >> + if (ret == 0) { >> + pos += len; >> + written += len; >> + >> + if (pos > i_size_read(inode)) { >> + 
check_caps = ceph_inode_set_size(inode, pos); >> + if (check_caps) >> + ceph_check_caps(ceph_inode(inode), >> + CHECK_CAPS_AUTHONLY, >> + NULL); >> + } >> + } else >> + break; >> + } >> >> + if (ret != -EOLDSNAPC && written > 0) { >> ret = written; >> - *ppos = pos; >> - if (pos > i_size_read(inode)) >> - check_caps = ceph_inode_set_size(inode, pos); >> - if (check_caps) >> - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, >> - NULL); >> - } else if (ret != -EOLDSNAPC && written > 0) { >> - ret = written; >> + iocb->ki_pos = pos; >> } >> return ret; >> } >> @@ -772,11 +883,13 @@ retry_snap: >> inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); >> >> if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || >> - (iocb->ki_filp->f_flags & O_DIRECT) || >> - (fi->flags & CEPH_F_SYNC)) { >> + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { >> mutex_unlock(&inode->i_mutex); >> - written = ceph_sync_write(file, iov->iov_base, count, >> - pos, &iocb->ki_pos); >> + if (file->f_flags & O_DIRECT) >> + written = ceph_sync_direct_write(iocb, iov, >> + nr_segs, count); >> + else >> + written = ceph_sync_write(iocb, iov, nr_segs, count); >> if (written == -EOLDSNAPC) { >> dout("aio_write %p %llx.%llx %llu~%u" >> "got EOLDSNAPC, retrying\n", >>