On Fri, May 4, 2018 at 11:04 PM, Ilya Dryomov <idryomov@xxxxxxxxx> wrote: > dio_get_pagev_size() and dio_get_pages_alloc() introduced in commit > b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD > request") assume that the passed iov_iter is ITER_IOVEC. This isn't > the case with splice where it ends up poking into the guts of ITER_BVEC > or ITER_PIPE iterators, causing lockups and crashes easily reproduced > with generic/095. > > Rather than trying to figure out gap alignment and stuff pages into > a page vector, add a helper for going from iov_iter to a bio_vec array > and make use of the new CEPH_OSD_DATA_TYPE_BVECS code. > > Fixes: b5b98989dc7e ("ceph: combine as many iovec as possile into one OSD request") > Link: http://tracker.ceph.com/issues/18130 > Signed-off-by: Ilya Dryomov <idryomov@xxxxxxxxx> > --- > fs/ceph/file.c | 193 +++++++++++++++++++++++++++++++++------------------------ > 1 file changed, 113 insertions(+), 80 deletions(-) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 8ce7849f3fbd..cb5daff99816 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -69,70 +69,99 @@ static __le32 ceph_flags_sys2wire(u32 flags) > * need to wait for MDS acknowledgement. > */ > > -/* > - * Calculate the length sum of direct io vectors that can > - * be combined into one page vector. > - */ > -static size_t dio_get_pagev_size(const struct iov_iter *it) > +static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize, > + struct bio_vec *bvecs) > { > - const struct iovec *iov = it->iov; > - const struct iovec *iovend = iov + it->nr_segs; > - size_t size; > - > - size = iov->iov_len - it->iov_offset; > - /* > - * An iov can be page vectored when both the current tail > - * and the next base are page aligned. > - */ > - while (PAGE_ALIGNED((iov->iov_base + iov->iov_len)) && > - (++iov < iovend && PAGE_ALIGNED((iov->iov_base)))) { > - size += iov->iov_len; > - } > - dout("dio_get_pagevlen len = %zu\n", size); > - return size; > + size_t size = 0; > + int bvec_idx = 0; > + > + if (maxsize > iov_iter_count(iter)) > + maxsize = iov_iter_count(iter); > + > + while (size < maxsize) { > + struct page *pages[64]; /* DIO_PAGES */ > + ssize_t bytes; > + size_t start; > + int idx = 0; > + > + bytes = iov_iter_get_pages(iter, pages, maxsize - size, 64, > + &start); > + if (bytes < 0) > + return size ?: bytes; > + > + iov_iter_advance(iter, bytes); > + size += bytes; > + > + for ( ; bytes; idx++, bvec_idx++) { > + struct bio_vec bv = { > + .bv_page = pages[idx], > + .bv_len = min_t(int, bytes, PAGE_SIZE - start), > + .bv_offset = start, > + }; > + > + bvecs[bvec_idx] = bv; > + bytes -= bv.bv_len; > + start = 0; > + } > + } > + > + return size; > } > > /* > - * Allocate a page vector based on (@it, @nbytes). > - * The return value is the tuple describing a page vector, > - * that is (@pages, @page_align, @num_pages). > + * iov_iter_get_pages() only considers one iov_iter segment, no matter > + * what maxsize or maxpages are given. For ITER_BVEC that is a single > + * page. > + * > + * Attempt to get up to @maxsize bytes worth of pages from @iter. > + * Return the number of bytes in the created bio_vec array, or an error. > */ > -static struct page ** > -dio_get_pages_alloc(const struct iov_iter *it, size_t nbytes, > - size_t *page_align, int *num_pages) > +static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize, > + struct bio_vec **bvecs, int *num_bvecs) > { > - struct iov_iter tmp_it = *it; > - size_t align; > - struct page **pages; > - int ret = 0, idx, npages; > + struct bio_vec *bv; > + size_t orig_count = iov_iter_count(iter); > + ssize_t bytes; > + int npages; > > - align = (unsigned long)(it->iov->iov_base + it->iov_offset) & > - (PAGE_SIZE - 1); > - npages = calc_pages_for(align, nbytes); > - pages = kvmalloc(sizeof(*pages) * npages, GFP_KERNEL); > - if (!pages) > - return ERR_PTR(-ENOMEM); > + iov_iter_truncate(iter, maxsize); > + npages = iov_iter_npages(iter, INT_MAX); > + iov_iter_reexpand(iter, orig_count); > > - for (idx = 0; idx < npages; ) { > - size_t start; > - ret = iov_iter_get_pages(&tmp_it, pages + idx, nbytes, > - npages - idx, &start); > - if (ret < 0) > - goto fail; > + /* > + * __iter_get_bvecs() may populate only part of the array -- zero it > + * out. > + */ > + bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO); > + if (!bv) > + return -ENOMEM; > > - iov_iter_advance(&tmp_it, ret); > - nbytes -= ret; > - idx += (ret + start + PAGE_SIZE - 1) / PAGE_SIZE; > + bytes = __iter_get_bvecs(iter, maxsize, bv); > + if (bytes < 0) { > + /* > + * No pages were pinned -- just free the array. > + */ > + kvfree(bv); > + return bytes; > } > > - BUG_ON(nbytes != 0); > - *num_pages = npages; > - *page_align = align; > - dout("dio_get_pages_alloc: got %d pages align %zu\n", npages, align); > - return pages; > -fail: > - ceph_put_page_vector(pages, idx, false); > - return ERR_PTR(ret); > + *bvecs = bv; > + *num_bvecs = npages; > + return bytes; > +} > + > +static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty) > +{ > + int i; > + > + for (i = 0; i < num_bvecs; i++) { > + if (bvecs[i].bv_page) { > + if (should_dirty) > + set_page_dirty_lock(bvecs[i].bv_page); > + put_page(bvecs[i].bv_page); > + } > + } > + kvfree(bvecs); > } > > /* > @@ -746,11 +775,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > struct inode *inode = req->r_inode; > struct ceph_aio_request *aio_req = req->r_priv; > struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); > - int num_pages = calc_pages_for((u64)osd_data->alignment, > - osd_data->length); > > - dout("ceph_aio_complete_req %p rc %d bytes %llu\n", > - inode, rc, osd_data->length); > + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); > + BUG_ON(!osd_data->num_bvecs); > + > + dout("ceph_aio_complete_req %p rc %d bytes %u\n", > + inode, rc, osd_data->bvec_pos.iter.bi_size); > > if (rc == -EOLDSNAPC) { > struct ceph_aio_work *aio_work; > @@ -768,9 +798,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > } else if (!aio_req->write) { > if (rc == -ENOENT) > rc = 0; > - if (rc >= 0 && osd_data->length > rc) { > - int zoff = osd_data->alignment + rc; > - int zlen = osd_data->length - rc; > + if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) { > + struct iov_iter i; > + int zlen = osd_data->bvec_pos.iter.bi_size - rc; > + > /* > * If read is satisfied by single OSD request, > * it can pass EOF. Otherwise read is within > @@ -785,13 +816,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) > aio_req->total_len = rc + zlen; > } > > - if (zlen > 0) > - ceph_zero_page_vector_range(zoff, zlen, > - osd_data->pages); > + iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs, > + osd_data->num_bvecs, > + osd_data->bvec_pos.iter.bi_size); > + iov_iter_advance(&i, rc); > + iov_iter_zero(zlen, &i); > } > } > > - ceph_put_page_vector(osd_data->pages, num_pages, aio_req->should_dirty); > + put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs, > + aio_req->should_dirty); > ceph_osdc_put_request(req); > > if (rc < 0) > @@ -879,7 +913,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > struct ceph_vino vino; > struct ceph_osd_request *req; > - struct page **pages; > + struct bio_vec *bvecs; > struct ceph_aio_request *aio_req = NULL; > int num_pages = 0; > int flags; > @@ -914,8 +948,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > } > > while (iov_iter_count(iter) > 0) { > - u64 size = dio_get_pagev_size(iter); > - size_t start = 0; > + u64 size = iov_iter_count(iter); > ssize_t len; > > if (write) > @@ -938,13 +971,14 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > break; > } > > - len = size; > - pages = dio_get_pages_alloc(iter, len, &start, &num_pages); > - if (IS_ERR(pages)) { > + len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages); > + if (len < 0) { > ceph_osdc_put_request(req); > - ret = PTR_ERR(pages); > + ret = len; > break; > } > + if (len != size) > + osd_req_op_extent_update(req, 0, len); > > /* > * To simplify error handling, allow AIO when IO within i_size > @@ -977,8 +1011,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > req->r_mtime = mtime; > } > > - osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, > - false, false); > + osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len); > > if (aio_req) { > aio_req->total_len += len; > @@ -991,7 +1024,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); > > pos += len; > - iov_iter_advance(iter, len); > continue; > } > > @@ -1004,25 +1036,26 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, > if (ret == -ENOENT) > ret = 0; > if (ret >= 0 && ret < len && pos + ret < size) { > + struct iov_iter i; > int zlen = min_t(size_t, len - ret, > size - pos - ret); > - ceph_zero_page_vector_range(start + ret, zlen, > - pages); > + > + iov_iter_bvec(&i, ITER_BVEC, bvecs, num_pages, > + len); > + iov_iter_advance(&i, ret); > + iov_iter_zero(zlen, &i); > ret += zlen; > } > if (ret >= 0) > len = ret; > } > > - ceph_put_page_vector(pages, num_pages, should_dirty); > - > + put_bvecs(bvecs, num_pages, should_dirty); > ceph_osdc_put_request(req); > if (ret < 0) > break; > > pos += len; > - iov_iter_advance(iter, len); > - > if (!write && pos >= size) > break; > > -- > 2.4.3 > Reviewed-by: "Yan, Zheng" <zyan@xxxxxxxxxx> Thanks for fixing this. Yan, Zheng > -- > To unsubscribe from this list: send the line "unsubscribe ceph-devel" in > the body of a message to majordomo@xxxxxxxxxxxxxxx > More majordomo info at http://vger.kernel.org/majordomo-info.html -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html