[PATCH v3 40/44] SUNRPC: Simplify TCP receive code by switching to using iterators

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Most of this code should also be reusable with other socket types.

Signed-off-by: Trond Myklebust <trond.myklebust@xxxxxxxxxxxxxxx>
---
 include/linux/sunrpc/xprtsock.h |  19 +-
 include/trace/events/sunrpc.h   |  15 +-
 net/sunrpc/xprtsock.c           | 694 +++++++++++++++-----------------
 3 files changed, 335 insertions(+), 393 deletions(-)

diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 005cfb6e7238..458bfe0137f5 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -31,15 +31,16 @@ struct sock_xprt {
 	 * State of TCP reply receive
 	 */
 	struct {
-		__be32		fraghdr,
+		struct {
+			__be32	fraghdr,
 				xid,
 				calldir;
+		} __attribute__((packed));
 
 		u32		offset,
 				len;
 
-		unsigned long	copied,
-				flags;
+		unsigned long	copied;
 	} recv;
 
 	/*
@@ -76,21 +77,9 @@ struct sock_xprt {
 	void			(*old_error_report)(struct sock *);
 };
 
-/*
- * TCP receive state flags
- */
-#define TCP_RCV_LAST_FRAG	(1UL << 0)
-#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
-#define TCP_RCV_COPY_XID	(1UL << 2)
-#define TCP_RCV_COPY_DATA	(1UL << 3)
-#define TCP_RCV_READ_CALLDIR	(1UL << 4)
-#define TCP_RCV_COPY_CALLDIR	(1UL << 5)
-
 /*
  * TCP RPC flags
  */
-#define TCP_RPC_REPLY		(1UL << 6)
-
 #define XPRT_SOCK_CONNECTING	1U
 #define XPRT_SOCK_DATA_READY	(2)
 #define XPRT_SOCK_UPD_TIMEOUT	(3)
diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
index 0aa347194e0f..19e08d12696c 100644
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -497,16 +497,6 @@ TRACE_EVENT(xs_tcp_data_ready,
 			__get_str(port), __entry->err, __entry->total)
 );
 
-#define rpc_show_sock_xprt_flags(flags) \
-	__print_flags(flags, "|", \
-		{ TCP_RCV_LAST_FRAG, "TCP_RCV_LAST_FRAG" }, \
-		{ TCP_RCV_COPY_FRAGHDR, "TCP_RCV_COPY_FRAGHDR" }, \
-		{ TCP_RCV_COPY_XID, "TCP_RCV_COPY_XID" }, \
-		{ TCP_RCV_COPY_DATA, "TCP_RCV_COPY_DATA" }, \
-		{ TCP_RCV_READ_CALLDIR, "TCP_RCV_READ_CALLDIR" }, \
-		{ TCP_RCV_COPY_CALLDIR, "TCP_RCV_COPY_CALLDIR" }, \
-		{ TCP_RPC_REPLY, "TCP_RPC_REPLY" })
-
 TRACE_EVENT(xs_tcp_data_recv,
 	TP_PROTO(struct sock_xprt *xs),
 
@@ -516,7 +506,6 @@ TRACE_EVENT(xs_tcp_data_recv,
 		__string(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR])
 		__string(port, xs->xprt.address_strings[RPC_DISPLAY_PORT])
 		__field(u32, xid)
-		__field(unsigned long, flags)
 		__field(unsigned long, copied)
 		__field(unsigned int, reclen)
 		__field(unsigned long, offset)
@@ -526,15 +515,13 @@ TRACE_EVENT(xs_tcp_data_recv,
 		__assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]);
 		__assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]);
 		__entry->xid = be32_to_cpu(xs->recv.xid);
-		__entry->flags = xs->recv.flags;
 		__entry->copied = xs->recv.copied;
 		__entry->reclen = xs->recv.len;
 		__entry->offset = xs->recv.offset;
 	),
 
-	TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu",
+	TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%lu",
 			__get_str(addr), __get_str(port), __entry->xid,
-			rpc_show_sock_xprt_flags(__entry->flags),
 			__entry->copied, __entry->reclen, __entry->offset)
 );
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index f16406228ead..5269ad98bb08 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -47,13 +47,13 @@
 #include <net/checksum.h>
 #include <net/udp.h>
 #include <net/tcp.h>
+#include <linux/bvec.h>
+#include <linux/uio.h>
 
 #include <trace/events/sunrpc.h>
 
 #include "sunrpc.h"
 
-#define RPC_TCP_READ_CHUNK_SZ	(3*512*1024)
-
 static void xs_close(struct rpc_xprt *xprt);
 static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
 		struct socket *sock);
@@ -325,6 +325,320 @@ static void xs_free_peer_addresses(struct rpc_xprt *xprt)
 		}
 }
 
+static size_t
+xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
+{
+	size_t i,n;
+
+	if (!(buf->flags & XDRBUF_SPARSE_PAGES))
+		return want;
+	if (want > buf->page_len)
+		want = buf->page_len;
+	n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
+	for (i = 0; i < n; i++) {
+		if (buf->pages[i])
+			continue;
+		buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
+		if (!buf->pages[i]) {
+			buf->page_len = (i * PAGE_SIZE) - buf->page_base;
+			return buf->page_len;
+		}
+	}
+	return want;
+}
+
+static ssize_t
+xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
+{
+	ssize_t ret;
+	if (seek != 0)
+		iov_iter_advance(&msg->msg_iter, seek);
+	ret = sock_recvmsg(sock, msg, flags);
+	return ret > 0 ? ret + seek : ret;
+}
+
+static ssize_t
+xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
+		struct kvec *kvec, size_t count, size_t seek)
+{
+	iov_iter_kvec(&msg->msg_iter, READ | ITER_KVEC, kvec, 1, count);
+	return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
+		struct bio_vec *bvec, unsigned long nr, size_t count,
+		size_t seek)
+{
+	iov_iter_bvec(&msg->msg_iter, READ | ITER_BVEC, bvec, nr, count);
+	return xs_sock_recvmsg(sock, msg, flags, seek);
+}
+
+static ssize_t
+xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
+		size_t count)
+{
+	struct kvec kvec = { 0 };
+	return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, 0);
+}
+
+static ssize_t
+xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
+		struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
+{
+	size_t want, seek_init = seek, offset = 0;
+	ssize_t ret;
+
+	if (seek < buf->head[0].iov_len) {
+		want = min_t(size_t, count, buf->head[0].iov_len);
+		ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto eagain;
+		seek = 0;
+	} else {
+		seek -= buf->head[0].iov_len;
+		offset += buf->head[0].iov_len;
+	}
+	if (buf->page_len && seek < buf->page_len) {
+		want = min_t(size_t, count - offset, buf->page_len);
+		want = xs_alloc_sparse_pages(buf, want, GFP_NOWAIT);
+		ret = xs_read_bvec(sock, msg, flags, buf->bvec,
+				xdr_buf_pagecount(buf),
+				want + buf->page_base,
+				seek + buf->page_base);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto eagain;
+		seek = 0;
+	} else {
+		seek -= buf->page_len;
+		offset += buf->page_len;
+	}
+	if (buf->tail[0].iov_len && seek < buf->tail[0].iov_len) {
+		want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+		ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
+		if (ret <= 0)
+			goto sock_err;
+		offset += ret;
+		if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+			goto out;
+		if (ret != want)
+			goto eagain;
+	} else
+		offset += buf->tail[0].iov_len;
+	ret = -EMSGSIZE;
+	msg->msg_flags |= MSG_TRUNC;
+out:
+	*read = offset - seek_init;
+	return ret;
+eagain:
+	ret = -EAGAIN;
+	goto out;
+sock_err:
+	offset += seek;
+	goto out;
+}
+
+static void
+xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
+{
+	if (!transport->recv.copied) {
+		if (buf->head[0].iov_len >= transport->recv.offset)
+			memcpy(buf->head[0].iov_base,
+					&transport->recv.xid,
+					transport->recv.offset);
+		transport->recv.copied = transport->recv.offset;
+	}
+}
+
+static bool
+xs_read_stream_request_done(struct sock_xprt *transport)
+{
+	return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
+}
+
+static ssize_t
+xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
+		int flags, struct rpc_rqst *req)
+{
+	struct xdr_buf *buf = &req->rq_private_buf;
+	size_t want, read;
+	ssize_t ret;
+
+	xs_read_header(transport, buf);
+
+	want = transport->recv.len - transport->recv.offset;
+	ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
+			transport->recv.copied + want, transport->recv.copied,
+			&read);
+	transport->recv.offset += read;
+	transport->recv.copied += read;
+	if (transport->recv.offset == transport->recv.len) {
+		if (xs_read_stream_request_done(transport))
+			msg->msg_flags |= MSG_EOR;
+		return transport->recv.copied;
+	}
+
+	switch (ret) {
+	case -EMSGSIZE:
+		return transport->recv.copied;
+	case 0:
+		return -ESHUTDOWN;
+	default:
+		if (ret < 0)
+			return ret;
+	}
+	return -EAGAIN;
+}
+
+static size_t
+xs_read_stream_headersize(bool isfrag)
+{
+	if (isfrag)
+		return sizeof(__be32);
+	return 3 * sizeof(__be32);
+}
+
+static ssize_t
+xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
+		int flags, size_t want, size_t seek)
+{
+	struct kvec kvec = {
+		.iov_base = &transport->recv.fraghdr,
+		.iov_len = want,
+	};
+	return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
+}
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct rpc_rqst *req;
+	ssize_t ret;
+
+	/* Look up and lock the request corresponding to the given XID */
+	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
+	if (!req) {
+		printk(KERN_WARNING "Callback slot table overflowed\n");
+		return -ESHUTDOWN;
+	}
+
+	ret = xs_read_stream_request(transport, msg, flags, req);
+	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+		xprt_complete_bc_request(req, ret);
+
+	return ret;
+}
+#else /* CONFIG_SUNRPC_BACKCHANNEL */
+static ssize_t
+xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	return -ESHUTDOWN;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+static ssize_t
+xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
+{
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct rpc_rqst *req;
+	ssize_t ret = 0;
+
+	/* Look up and lock the request corresponding to the given XID */
+	spin_lock(&xprt->queue_lock);
+	req = xprt_lookup_rqst(xprt, transport->recv.xid);
+	if (!req) {
+		msg->msg_flags |= MSG_TRUNC;
+		goto out;
+	}
+	xprt_pin_rqst(req);
+	spin_unlock(&xprt->queue_lock);
+
+	ret = xs_read_stream_request(transport, msg, flags, req);
+
+	spin_lock(&xprt->queue_lock);
+	if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
+		xprt_complete_rqst(req->rq_task, ret);
+	xprt_unpin_rqst(req);
+out:
+	spin_unlock(&xprt->queue_lock);
+	return ret;
+}
+
+static ssize_t
+xs_read_stream(struct sock_xprt *transport, int flags)
+{
+	struct msghdr msg = { 0 };
+	size_t want, read = 0;
+	ssize_t ret = 0;
+
+	if (transport->recv.len == 0) {
+		want = xs_read_stream_headersize(transport->recv.copied != 0);
+		ret = xs_read_stream_header(transport, &msg, flags, want,
+				transport->recv.offset);
+		if (ret <= 0)
+			goto out_err;
+		transport->recv.offset = ret;
+		if (ret != want) {
+			ret = -EAGAIN;
+			goto out_err;
+		}
+		transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
+			RPC_FRAGMENT_SIZE_MASK;
+		transport->recv.offset -= sizeof(transport->recv.fraghdr);
+		read = ret;
+	}
+
+	switch (be32_to_cpu(transport->recv.calldir)) {
+	case RPC_CALL:
+		ret = xs_read_stream_call(transport, &msg, flags);
+		break;
+	case RPC_REPLY:
+		ret = xs_read_stream_reply(transport, &msg, flags);
+	}
+	if (msg.msg_flags & MSG_TRUNC) {
+		transport->recv.calldir = cpu_to_be32(-1);
+		transport->recv.copied = -1;
+	}
+	if (ret < 0)
+		goto out_err;
+	read += ret;
+	if (transport->recv.offset < transport->recv.len) {
+		ret = xs_read_discard(transport->sock, &msg, flags,
+				transport->recv.len - transport->recv.offset);
+		if (ret <= 0)
+			goto out_err;
+		transport->recv.offset += ret;
+		read += ret;
+	}
+	if (xs_read_stream_request_done(transport)) {
+		trace_xs_tcp_data_recv(transport);
+		transport->recv.copied = 0;
+	}
+	transport->recv.offset = 0;
+	transport->recv.len = 0;
+	return read;
+out_err:
+	switch (ret) {
+	case 0:
+	case -ESHUTDOWN:
+		xprt_force_disconnect(&transport->xprt);
+		return -ESHUTDOWN;
+	}
+	return ret;
+}
+
 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
 
 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
@@ -484,6 +798,12 @@ static int xs_nospace(struct rpc_rqst *req)
 	return ret;
 }
 
+static void
+xs_stream_prepare_request(struct rpc_rqst *req)
+{
+	req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
+}
+
 /*
  * Determine if the previous message in the stream was aborted before it
  * could complete transmission.
@@ -1157,263 +1477,7 @@ static void xs_tcp_force_close(struct rpc_xprt *xprt)
 	xprt_force_disconnect(xprt);
 }
 
-static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-	size_t len, used;
-	char *p;
-
-	p = ((char *) &transport->recv.fraghdr) + transport->recv.offset;
-	len = sizeof(transport->recv.fraghdr) - transport->recv.offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->recv.offset += used;
-	if (used != len)
-		return;
-
-	transport->recv.len = ntohl(transport->recv.fraghdr);
-	if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT)
-		transport->recv.flags |= TCP_RCV_LAST_FRAG;
-	else
-		transport->recv.flags &= ~TCP_RCV_LAST_FRAG;
-	transport->recv.len &= RPC_FRAGMENT_SIZE_MASK;
-
-	transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR;
-	transport->recv.offset = 0;
-
-	/* Sanity check of the record length */
-	if (unlikely(transport->recv.len < 8)) {
-		dprintk("RPC:       invalid TCP record fragment length\n");
-		xs_tcp_force_close(xprt);
-		return;
-	}
-	dprintk("RPC:       reading TCP record fragment of length %d\n",
-			transport->recv.len);
-}
-
-static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
-{
-	if (transport->recv.offset == transport->recv.len) {
-		transport->recv.flags |= TCP_RCV_COPY_FRAGHDR;
-		transport->recv.offset = 0;
-		if (transport->recv.flags & TCP_RCV_LAST_FRAG) {
-			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-			transport->recv.flags |= TCP_RCV_COPY_XID;
-			transport->recv.copied = 0;
-		}
-	}
-}
-
-static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-	size_t len, used;
-	char *p;
-
-	len = sizeof(transport->recv.xid) - transport->recv.offset;
-	dprintk("RPC:       reading XID (%zu bytes)\n", len);
-	p = ((char *) &transport->recv.xid) + transport->recv.offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->recv.offset += used;
-	if (used != len)
-		return;
-	transport->recv.flags &= ~TCP_RCV_COPY_XID;
-	transport->recv.flags |= TCP_RCV_READ_CALLDIR;
-	transport->recv.copied = 4;
-	dprintk("RPC:       reading %s XID %08x\n",
-			(transport->recv.flags & TCP_RPC_REPLY) ? "reply for"
-							      : "request with",
-			ntohl(transport->recv.xid));
-	xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
-				       struct xdr_skb_reader *desc)
-{
-	size_t len, used;
-	u32 offset;
-	char *p;
-
-	/*
-	 * We want transport->recv.offset to be 8 at the end of this routine
-	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
-	 * When this function is called for the first time,
-	 * transport->recv.offset is 4 (after having already read the xid).
-	 */
-	offset = transport->recv.offset - sizeof(transport->recv.xid);
-	len = sizeof(transport->recv.calldir) - offset;
-	dprintk("RPC:       reading CALL/REPLY flag (%zu bytes)\n", len);
-	p = ((char *) &transport->recv.calldir) + offset;
-	used = xdr_skb_read_bits(desc, p, len);
-	transport->recv.offset += used;
-	if (used != len)
-		return;
-	transport->recv.flags &= ~TCP_RCV_READ_CALLDIR;
-	/*
-	 * We don't yet have the XDR buffer, so we will write the calldir
-	 * out after we get the buffer from the 'struct rpc_rqst'
-	 */
-	switch (ntohl(transport->recv.calldir)) {
-	case RPC_REPLY:
-		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
-		transport->recv.flags |= TCP_RCV_COPY_DATA;
-		transport->recv.flags |= TCP_RPC_REPLY;
-		break;
-	case RPC_CALL:
-		transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
-		transport->recv.flags |= TCP_RCV_COPY_DATA;
-		transport->recv.flags &= ~TCP_RPC_REPLY;
-		break;
-	default:
-		dprintk("RPC:       invalid request message type\n");
-		xs_tcp_force_close(&transport->xprt);
-	}
-	xs_tcp_check_fraghdr(transport);
-}
-
-static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
-				     struct xdr_skb_reader *desc,
-				     struct rpc_rqst *req)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct xdr_buf *rcvbuf;
-	size_t len;
-	ssize_t r;
-
-	rcvbuf = &req->rq_private_buf;
-
-	if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) {
-		/*
-		 * Save the RPC direction in the XDR buffer
-		 */
-		memcpy(rcvbuf->head[0].iov_base + transport->recv.copied,
-			&transport->recv.calldir,
-			sizeof(transport->recv.calldir));
-		transport->recv.copied += sizeof(transport->recv.calldir);
-		transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR;
-	}
-
-	len = desc->count;
-	if (len > transport->recv.len - transport->recv.offset)
-		desc->count = transport->recv.len - transport->recv.offset;
-	r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied,
-					  desc, xdr_skb_read_bits);
-
-	if (desc->count) {
-		/* Error when copying to the receive buffer,
-		 * usually because we weren't able to allocate
-		 * additional buffer pages. All we can do now
-		 * is turn off TCP_RCV_COPY_DATA, so the request
-		 * will not receive any additional updates,
-		 * and time out.
-		 * Any remaining data from this record will
-		 * be discarded.
-		 */
-		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-		dprintk("RPC:       XID %08x truncated request\n",
-				ntohl(transport->recv.xid));
-		dprintk("RPC:       xprt = %p, recv.copied = %lu, "
-				"recv.offset = %u, recv.len = %u\n",
-				xprt, transport->recv.copied,
-				transport->recv.offset, transport->recv.len);
-		return;
-	}
-
-	transport->recv.copied += r;
-	transport->recv.offset += r;
-	desc->count = len - r;
-
-	dprintk("RPC:       XID %08x read %zd bytes\n",
-			ntohl(transport->recv.xid), r);
-	dprintk("RPC:       xprt = %p, recv.copied = %lu, recv.offset = %u, "
-			"recv.len = %u\n", xprt, transport->recv.copied,
-			transport->recv.offset, transport->recv.len);
-
-	if (transport->recv.copied == req->rq_private_buf.buflen)
-		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-	else if (transport->recv.offset == transport->recv.len) {
-		if (transport->recv.flags & TCP_RCV_LAST_FRAG)
-			transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-	}
-}
-
-/*
- * Finds the request corresponding to the RPC xid and invokes the common
- * tcp read code to read the data.
- */
-static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
-				    struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct rpc_rqst *req;
-
-	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->recv.xid));
-
-	/* Find and lock the request corresponding to this xid */
-	spin_lock(&xprt->queue_lock);
-	req = xprt_lookup_rqst(xprt, transport->recv.xid);
-	if (!req) {
-		dprintk("RPC:       XID %08x request not found!\n",
-				ntohl(transport->recv.xid));
-		spin_unlock(&xprt->queue_lock);
-		return -1;
-	}
-	xprt_pin_rqst(req);
-	spin_unlock(&xprt->queue_lock);
-
-	xs_tcp_read_common(xprt, desc, req);
-
-	spin_lock(&xprt->queue_lock);
-	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
-		xprt_complete_rqst(req->rq_task, transport->recv.copied);
-	xprt_unpin_rqst(req);
-	spin_unlock(&xprt->queue_lock);
-	return 0;
-}
-
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
-/*
- * Obtains an rpc_rqst previously allocated and invokes the common
- * tcp read code to read the data.  The result is placed in the callback
- * queue.
- * If we're unable to obtain the rpc_rqst we schedule the closing of the
- * connection and return -1.
- */
-static int xs_tcp_read_callback(struct rpc_xprt *xprt,
-				       struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct rpc_rqst *req;
-
-	/* Look up the request corresponding to the given XID */
-	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
-	if (req == NULL) {
-		printk(KERN_WARNING "Callback slot table overflowed\n");
-		xprt_force_disconnect(xprt);
-		return -1;
-	}
-
-	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
-	xs_tcp_read_common(xprt, desc, req);
-
-	if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
-		xprt_complete_bc_request(req, transport->recv.copied);
-
-	return 0;
-}
-
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-					struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-
-	return (transport->recv.flags & TCP_RPC_REPLY) ?
-		xs_tcp_read_reply(xprt, desc) :
-		xs_tcp_read_callback(xprt, desc);
-}
-
 static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
 {
 	int ret;
@@ -1429,106 +1493,14 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
 {
 	return PAGE_SIZE;
 }
-#else
-static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
-					struct xdr_skb_reader *desc)
-{
-	return xs_tcp_read_reply(xprt, desc);
-}
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
-/*
- * Read data off the transport.  This can be either an RPC_CALL or an
- * RPC_REPLY.  Relay the processing to helper functions.
- */
-static void xs_tcp_read_data(struct rpc_xprt *xprt,
-				    struct xdr_skb_reader *desc)
-{
-	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-
-	if (_xs_tcp_read_data(xprt, desc) == 0)
-		xs_tcp_check_fraghdr(transport);
-	else {
-		/*
-		 * The transport_lock protects the request handling.
-		 * There's no need to hold it to update the recv.flags.
-		 */
-		transport->recv.flags &= ~TCP_RCV_COPY_DATA;
-	}
-}
-
-static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
-{
-	size_t len;
-
-	len = transport->recv.len - transport->recv.offset;
-	if (len > desc->count)
-		len = desc->count;
-	desc->count -= len;
-	desc->offset += len;
-	transport->recv.offset += len;
-	dprintk("RPC:       discarded %zu bytes\n", len);
-	xs_tcp_check_fraghdr(transport);
-}
-
-static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
-{
-	struct rpc_xprt *xprt = rd_desc->arg.data;
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-	struct xdr_skb_reader desc = {
-		.skb	= skb,
-		.offset	= offset,
-		.count	= len,
-	};
-	size_t ret;
-
-	dprintk("RPC:       xs_tcp_data_recv started\n");
-	do {
-		trace_xs_tcp_data_recv(transport);
-		/* Read in a new fragment marker if necessary */
-		/* Can we ever really expect to get completely empty fragments? */
-		if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) {
-			xs_tcp_read_fraghdr(xprt, &desc);
-			continue;
-		}
-		/* Read in the xid if necessary */
-		if (transport->recv.flags & TCP_RCV_COPY_XID) {
-			xs_tcp_read_xid(transport, &desc);
-			continue;
-		}
-		/* Read in the call/reply flag */
-		if (transport->recv.flags & TCP_RCV_READ_CALLDIR) {
-			xs_tcp_read_calldir(transport, &desc);
-			continue;
-		}
-		/* Read in the request data */
-		if (transport->recv.flags & TCP_RCV_COPY_DATA) {
-			xs_tcp_read_data(xprt, &desc);
-			continue;
-		}
-		/* Skip over any trailing bytes on short reads */
-		xs_tcp_read_discard(transport, &desc);
-	} while (desc.count);
-	ret = len - desc.count;
-	if (ret < rd_desc->count)
-		rd_desc->count -= ret;
-	else
-		rd_desc->count = 0;
-	trace_xs_tcp_data_recv(transport);
-	dprintk("RPC:       xs_tcp_data_recv done\n");
-	return ret;
-}
-
 static void xs_tcp_data_receive(struct sock_xprt *transport)
 {
 	struct rpc_xprt *xprt = &transport->xprt;
 	struct sock *sk;
-	read_descriptor_t rd_desc = {
-		.arg.data = xprt,
-	};
-	unsigned long total = 0;
-	int read = 0;
+	size_t read = 0;
+	ssize_t ret = 0;
 
 restart:
 	mutex_lock(&transport->recv_mutex);
@@ -1536,18 +1508,12 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 	if (sk == NULL)
 		goto out;
 
-	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
 	for (;;) {
-		rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
-		lock_sock(sk);
-		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-		if (rd_desc.count != 0 || read < 0) {
-			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
-			release_sock(sk);
+		clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+		ret = xs_read_stream(transport, MSG_DONTWAIT | MSG_NOSIGNAL);
+		if (ret < 0)
 			break;
-		}
-		release_sock(sk);
-		total += read;
+		read += ret;
 		if (need_resched()) {
 			mutex_unlock(&transport->recv_mutex);
 			cond_resched();
@@ -1558,7 +1524,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 		queue_work(xprtiod_workqueue, &transport->recv_worker);
 out:
 	mutex_unlock(&transport->recv_mutex);
-	trace_xs_tcp_data_ready(xprt, read, total);
+	trace_xs_tcp_data_ready(xprt, ret, read);
 }
 
 static void xs_tcp_data_receive_workfn(struct work_struct *work)
@@ -2380,7 +2346,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 	transport->recv.offset = 0;
 	transport->recv.len = 0;
 	transport->recv.copied = 0;
-	transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
 	transport->xmit.offset = 0;
 
 	/* Tell the socket layer to start connecting... */
@@ -2802,6 +2767,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
 	.connect		= xs_connect,
 	.buf_alloc		= rpc_malloc,
 	.buf_free		= rpc_free,
+	.prepare_request	= xs_stream_prepare_request,
 	.send_request		= xs_tcp_send_request,
 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
 	.close			= xs_tcp_shutdown,
-- 
2.17.1




[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux