[RFC PATCH 25/28] rds: Use sendmsg(MSG_SPLICE_PAGES) rather than sendpage

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



When transmitting data, call down into TCP using a single sendmsg with
MSG_SPLICE_PAGES to indicate that the content should be spliced, rather
than performing several sendmsg and sendpage calls to transmit the header
and data pages.

To make this work, the data is assembled in a bio_vec array and attached to
a BVEC-type iterator.  The header is copied into memory acquired from
zcopy_alloc(), which just breaks a page up into small pieces that can be
freed with put_page().

Signed-off-by: David Howells <dhowells@xxxxxxxxxx>
cc: Santosh Shilimkar <santosh.shilimkar@xxxxxxxxxx>
cc: "David S. Miller" <davem@xxxxxxxxxxxxx>
cc: Eric Dumazet <edumazet@xxxxxxxxxx>
cc: Jakub Kicinski <kuba@xxxxxxxxxx>
cc: Paolo Abeni <pabeni@xxxxxxxxxx>
cc: Jens Axboe <axboe@xxxxxxxxx>
cc: Matthew Wilcox <willy@xxxxxxxxxxxxx>
cc: linux-rdma@xxxxxxxxxxxxxxx
cc: rds-devel@xxxxxxxxxxxxxx
cc: netdev@xxxxxxxxxxxxxxx
---
 net/rds/tcp_send.c | 80 ++++++++++++++++++++--------------------------
 1 file changed, 35 insertions(+), 45 deletions(-)

diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 8c4d1d6e9249..0d6eb85a930d 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -32,6 +32,7 @@
  */
 #include <linux/kernel.h>
 #include <linux/in.h>
+#include <linux/zcopy_alloc.h>
 #include <net/tcp.h>
 
 #include "rds_single_path.h"
@@ -52,29 +53,24 @@ void rds_tcp_xmit_path_complete(struct rds_conn_path *cp)
 	tcp_sock_set_cork(tc->t_sock->sk, false);
 }
 
-/* the core send_sem serializes this with other xmit and shutdown */
-static int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len)
-{
-	struct kvec vec = {
-		.iov_base = data,
-		.iov_len = len,
-	};
-	struct msghdr msg = {
-		.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
-	};
-
-	return kernel_sendmsg(sock, &msg, &vec, 1, vec.iov_len);
-}
-
 /* the core send_sem serializes this with other xmit and shutdown */
 int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 		 unsigned int hdr_off, unsigned int sg, unsigned int off)
 {
 	struct rds_conn_path *cp = rm->m_inc.i_conn_path;
 	struct rds_tcp_connection *tc = cp->cp_transport_data;
+	struct msghdr msg = {
+		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
+	};
+	struct bio_vec *bvec;
+	unsigned int i, size = 0, ix = 0;
+	bool free_hdr = false;
 	int done = 0;
-	int ret = 0;
-	int more;
+	int ret = -ENOMEM;
+
+	bvec = kmalloc_array(1 + sg, sizeof(struct bio_vec), GFP_KERNEL);
+	if (!bvec)
+		goto out;
 
 	if (hdr_off == 0) {
 		/*
@@ -101,41 +97,30 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 		/* see rds_tcp_write_space() */
 		set_bit(SOCK_NOSPACE, &tc->t_sock->sk->sk_socket->flags);
 
-		ret = rds_tcp_sendmsg(tc->t_sock,
-				      (void *)&rm->m_inc.i_hdr + hdr_off,
-				      sizeof(rm->m_inc.i_hdr) - hdr_off);
+		ret = zcopy_memdup(sizeof(rm->m_inc.i_hdr) - hdr_off,
+				   (void *)&rm->m_inc.i_hdr + hdr_off,
+				   &bvec[ix], GFP_KERNEL);
 		if (ret < 0)
 			goto out;
-		done += ret;
-		if (hdr_off + done != sizeof(struct rds_header))
-			goto out;
+		free_hdr = true;
+		size += bvec[ix].bv_len;
+		ix++;
 	}
 
-	more = rm->data.op_nents > 1 ? (MSG_MORE | MSG_SENDPAGE_NOTLAST) : 0;
-	while (sg < rm->data.op_nents) {
-		int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
-
-		ret = tc->t_sock->ops->sendpage(tc->t_sock,
-						sg_page(&rm->data.op_sg[sg]),
-						rm->data.op_sg[sg].offset + off,
-						rm->data.op_sg[sg].length - off,
-						flags);
-		rdsdebug("tcp sendpage %p:%u:%u ret %d\n", (void *)sg_page(&rm->data.op_sg[sg]),
-			 rm->data.op_sg[sg].offset + off, rm->data.op_sg[sg].length - off,
-			 ret);
-		if (ret <= 0)
-			break;
-
-		off += ret;
-		done += ret;
-		if (off == rm->data.op_sg[sg].length) {
-			off = 0;
-			sg++;
-		}
-		if (sg == rm->data.op_nents - 1)
-			more = 0;
+	for (i = sg; i < rm->data.op_nents; i++) {
+		bvec_set_page(&bvec[ix],
+			      sg_page(&rm->data.op_sg[i]),
+			      rm->data.op_sg[i].length - off,
+			      rm->data.op_sg[i].offset + off);
+		off = 0;
+		size += bvec[ix].bv_len;
+		ix++;
 	}
 
+	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, bvec, ix, size);
+	ret = sock_sendmsg(tc->t_sock, &msg);
+	rdsdebug("tcp sendmsg-splice %u,%u ret %d\n", ix, size, ret);
+
 out:
 	if (ret <= 0) {
 		/* write_space will hit after EAGAIN, all else fatal */
@@ -158,6 +143,11 @@ int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
 	}
 	if (done == 0)
 		done = ret;
+	if (bvec) {
+		if (free_hdr)
+			put_page(bvec[0].bv_page);
+		kfree(bvec);
+	}
 	return done;
 }
 





[Index of Archives]     [Linux ARM Kernel]     [Linux ARM]     [Linux Omap]     [Fedora ARM]     [IETF Announce]     [Bugtraq]     [Linux OMAP]     [Linux MIPS]     [eCos]     [Asterisk Internet PBX]     [Linux API]

  Powered by Linux