Use sendmsg() and MSG_SPLICE_PAGES rather than sendpage in ceph when transmitting data. For the moment, this can only transmit one page at a time because of the architecture of net/ceph/, but if write_partial_message_data() can be given a bvec[] at a time by the iteration code, this would allow pages to be sent in a batch. Signed-off-by: David Howells <dhowells@xxxxxxxxxx> cc: Ilya Dryomov <idryomov@xxxxxxxxx> cc: Xiubo Li <xiubli@xxxxxxxxxx> cc: Jeff Layton <jlayton@xxxxxxxxxx> cc: "David S. Miller" <davem@xxxxxxxxxxxxx> cc: Eric Dumazet <edumazet@xxxxxxxxxx> cc: Jakub Kicinski <kuba@xxxxxxxxxx> cc: Paolo Abeni <pabeni@xxxxxxxxxx> cc: Jens Axboe <axboe@xxxxxxxxx> cc: Matthew Wilcox <willy@xxxxxxxxxxxxx> cc: ceph-devel@xxxxxxxxxxxxxxx cc: netdev@xxxxxxxxxxxxxxx --- net/ceph/messenger_v1.c | 58 ++++++++++++++--------------------------- 1 file changed, 19 insertions(+), 39 deletions(-) diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c index d664cb1593a7..b2d801a49122 100644 --- a/net/ceph/messenger_v1.c +++ b/net/ceph/messenger_v1.c @@ -74,37 +74,6 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, return r; } -/* - * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST - */ -static int ceph_tcp_sendpage(struct socket *sock, struct page *page, - int offset, size_t size, int more) -{ - ssize_t (*sendpage)(struct socket *sock, struct page *page, - int offset, size_t size, int flags); - int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more; - int ret; - - /* - * sendpage cannot properly handle pages with page_count == 0, - * we need to fall back to sendmsg if that's the case. - * - * Same goes for slab pages: skb_can_coalesce() allows - * coalescing neighboring slab objects into a single frag which - * triggers one of hardened usercopy checks. - */ - if (sendpage_ok(page)) - sendpage = sock->ops->sendpage; - else - sendpage = sock_no_sendpage; - - ret = sendpage(sock, page, offset, size, flags); - if (ret == -EAGAIN) - ret = 0; - - return ret; -} - static void con_out_kvec_reset(struct ceph_connection *con) { BUG_ON(con->v1.out_skip); @@ -464,7 +433,6 @@ static int write_partial_message_data(struct ceph_connection *con) struct ceph_msg *msg = con->out_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; u32 crc; dout("%s %p msg %p\n", __func__, con, msg); @@ -482,6 +450,10 @@ static int write_partial_message_data(struct ceph_connection *con) */ crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0; while (cursor->total_resid) { + struct bio_vec bvec; + struct msghdr msghdr = { + .msg_flags = MSG_SPLICE_PAGES | MSG_SENDPAGE_NOTLAST, + }; struct page *page; size_t page_offset; size_t length; @@ -494,9 +466,12 @@ static int write_partial_message_data(struct ceph_connection *con) page = ceph_msg_data_next(cursor, &page_offset, &length); if (length == cursor->total_resid) - more = MSG_MORE; - ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, - more); + msghdr.msg_flags |= MSG_MORE; + + bvec_set_page(&bvec, page, length, page_offset); + iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, length); + + ret = sock_sendmsg(con->sock, &msghdr); if (ret <= 0) { if (do_datacrc) msg->footer.data_crc = cpu_to_le32(crc); @@ -526,7 +501,10 @@ static int write_partial_message_data(struct ceph_connection *con) */ static int write_partial_skip(struct ceph_connection *con) { - int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; + struct bio_vec bvec; + struct msghdr msghdr = { + .msg_flags = MSG_SPLICE_PAGES | MSG_SENDPAGE_NOTLAST | MSG_MORE, + }; int ret; dout("%s %p %d left\n", __func__, con, con->v1.out_skip); @@ -534,9 +512,11 @@ static int write_partial_skip(struct ceph_connection *con) size_t size = min(con->v1.out_skip, (int)PAGE_SIZE); if (size == con->v1.out_skip) - more = MSG_MORE; - ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, - more); + msghdr.msg_flags &= ~MSG_SENDPAGE_NOTLAST; + bvec_set_page(&bvec, ZERO_PAGE(0), size, 0); + iov_iter_bvec(&msghdr.msg_iter, ITER_SOURCE, &bvec, 1, size); + + ret = sock_sendmsg(con->sock, &msghdr); if (ret <= 0) goto out; con->v1.out_skip -= ret;