For databuf/iter data blobs, don't use the messenger-v1 Tx loop, which sends page fragments individually; instead, pass the entire iterator to the socket in one go. This uses the loop inside tcp_sendmsg() to do the work and allows TCP to make better choices. Because the data is no longer walked page by page, the data CRC (when enabled) is calculated by advancing a second copy of the iterator, crc_iter, over the bytes that were actually sent. Signed-off-by: David Howells <dhowells@xxxxxxxxxx> cc: Viacheslav Dubeyko <slava@xxxxxxxxxxx> cc: Alex Markuze <amarkuze@xxxxxxxxxx> cc: Ilya Dryomov <idryomov@xxxxxxxxx> cc: ceph-devel@xxxxxxxxxxxxxxx cc: linux-fsdevel@xxxxxxxxxxxxxxx --- include/linux/ceph/messenger.h | 1 + net/ceph/messenger.c | 1 + net/ceph/messenger_v1.c | 76 ++++++++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 13 deletions(-) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 864aad369c91..1b646d0dff39 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -255,6 +255,7 @@ struct ceph_msg_data_cursor { }; struct { struct iov_iter iov_iter; + struct iov_iter crc_iter; unsigned int lastlen; }; }; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 02439b38ec94..dc8082575e4f 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -975,6 +975,7 @@ static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor, struct ceph_msg_data *data = cursor->data; cursor->iov_iter = data->iter; + cursor->crc_iter = data->iter; cursor->lastlen = 0; iov_iter_truncate(&cursor->iov_iter, length); cursor->resid = iov_iter_count(&cursor->iov_iter); diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c index 0cb61c76b9b8..d6464ac62b09 100644 --- a/net/ceph/messenger_v1.c +++ b/net/ceph/messenger_v1.c @@ -3,6 +3,7 @@ #include <linux/bvec.h> #include <linux/crc32c.h> +#include <linux/iov_iter.h> #include <linux/net.h> #include <linux/socket.h> #include <net/sock.h> @@ -74,6 +75,21 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, return r; } +static int ceph_tcp_sock_sendmsg(struct socket *sock, struct iov_iter *iter, +
unsigned int flags) +{ + struct msghdr msg = { + .msg_iter = *iter, + .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | flags, + }; + int r; + + r = sock_sendmsg(sock, &msg); + if (r == -EAGAIN) + r = 0; + return r; +} + /* * @more: MSG_MORE or 0. */ @@ -455,6 +471,24 @@ static int write_partial_kvec(struct ceph_connection *con) return ret; /* done! */ } +static size_t ceph_crc_from_iter(void *iter_from, size_t progress, + size_t len, void *priv, void *priv2) +{ + u32 *crc = priv; + + *crc = crc32c(*crc, iter_from, len); + return 0; +} + +static void ceph_calc_crc(struct iov_iter *iter, size_t count, u32 *crc) +{ + size_t done; + + done = iterate_and_advance_kernel(iter, count, crc, NULL, + ceph_crc_from_iter); + WARN_ON(done != count); +} + /* * Write as much message data payload as we can. If we finish, queue * up the footer. @@ -467,7 +501,7 @@ static int write_partial_message_data(struct ceph_connection *con) struct ceph_msg *msg = con->out_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - u32 crc; + u32 crc = 0; dout("%s %p msg %p\n", __func__, con, msg); @@ -484,9 +518,6 @@ static int write_partial_message_data(struct ceph_connection *con) */ crc = do_datacrc ? 
le32_to_cpu(msg->footer.data_crc) : 0; while (cursor->total_resid) { - struct page *page; - size_t page_offset; - size_t length; int ret; if (!cursor->resid) { @@ -494,17 +525,36 @@ static int write_partial_message_data(struct ceph_connection *con) continue; } - page = ceph_msg_data_next(cursor, &page_offset, &length); - ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, - MSG_MORE); - if (ret <= 0) { - if (do_datacrc) - msg->footer.data_crc = cpu_to_le32(crc); + if (cursor->data->type == CEPH_MSG_DATA_DATABUF || + cursor->data->type == CEPH_MSG_DATA_ITER) { + ret = ceph_tcp_sock_sendmsg(con->sock, &cursor->iov_iter, + MSG_MORE); + if (ret <= 0) { + if (do_datacrc) + msg->footer.data_crc = cpu_to_le32(crc); - return ret; + return ret; + } + if (do_datacrc && cursor->need_crc) + ceph_calc_crc(&cursor->crc_iter, ret, &crc); + } else { + struct page *page; + size_t page_offset; + size_t length; + + page = ceph_msg_data_next(cursor, &page_offset, &length); + ret = ceph_tcp_sendpage(con->sock, page, page_offset, + length, MSG_MORE); + if (ret <= 0) { + if (do_datacrc) + msg->footer.data_crc = cpu_to_le32(crc); + + return ret; + } + if (do_datacrc && cursor->need_crc) + crc = ceph_crc32c_page(crc, page, page_offset, + length); } - if (do_datacrc && cursor->need_crc) - crc = ceph_crc32c_page(crc, page, page_offset, length); ceph_msg_data_advance(cursor, (size_t)ret); }