On Mon, 2018-09-17 at 09:03 -0400, Trond Myklebust wrote: > Most of this code should also be reusable with other socket types. > > Signed-off-by: Trond Myklebust <trond.myklebust@xxxxxxxxxxxxxxx> > --- > include/linux/sunrpc/xprtsock.h | 19 +- > include/trace/events/sunrpc.h | 15 +- > net/sunrpc/xprtsock.c | 694 +++++++++++++++--------------- > -- > 3 files changed, 335 insertions(+), 393 deletions(-) > > diff --git a/include/linux/sunrpc/xprtsock.h > b/include/linux/sunrpc/xprtsock.h > index 005cfb6e7238..458bfe0137f5 100644 > --- a/include/linux/sunrpc/xprtsock.h > +++ b/include/linux/sunrpc/xprtsock.h > @@ -31,15 +31,16 @@ struct sock_xprt { > * State of TCP reply receive > */ > struct { > - __be32 fraghdr, > + struct { > + __be32 fraghdr, > xid, > calldir; > + } __attribute__((packed)); > > u32 offset, > len; > > - unsigned long copied, > - flags; > + unsigned long copied; > } recv; > > /* > @@ -76,21 +77,9 @@ struct sock_xprt { > void (*old_error_report)(struct sock *); > }; > > -/* > - * TCP receive state flags > - */ > -#define TCP_RCV_LAST_FRAG (1UL << 0) > -#define TCP_RCV_COPY_FRAGHDR (1UL << 1) > -#define TCP_RCV_COPY_XID (1UL << 2) > -#define TCP_RCV_COPY_DATA (1UL << 3) > -#define TCP_RCV_READ_CALLDIR (1UL << 4) > -#define TCP_RCV_COPY_CALLDIR (1UL << 5) > - > /* > * TCP RPC flags > */ > -#define TCP_RPC_REPLY (1UL << 6) > - > #define XPRT_SOCK_CONNECTING 1U > #define XPRT_SOCK_DATA_READY (2) > #define XPRT_SOCK_UPD_TIMEOUT (3) > diff --git a/include/trace/events/sunrpc.h > b/include/trace/events/sunrpc.h > index 0aa347194e0f..19e08d12696c 100644 > --- a/include/trace/events/sunrpc.h > +++ b/include/trace/events/sunrpc.h > @@ -497,16 +497,6 @@ TRACE_EVENT(xs_tcp_data_ready, > __get_str(port), __entry->err, __entry->total) > ); > > -#define rpc_show_sock_xprt_flags(flags) \ > - __print_flags(flags, "|", \ > - { TCP_RCV_LAST_FRAG, "TCP_RCV_LAST_FRAG" }, \ > - { TCP_RCV_COPY_FRAGHDR, "TCP_RCV_COPY_FRAGHDR" }, \ > - { TCP_RCV_COPY_XID, "TCP_RCV_COPY_XID" 
}, \ > - { TCP_RCV_COPY_DATA, "TCP_RCV_COPY_DATA" }, \ > - { TCP_RCV_READ_CALLDIR, "TCP_RCV_READ_CALLDIR" }, \ > - { TCP_RCV_COPY_CALLDIR, "TCP_RCV_COPY_CALLDIR" }, \ > - { TCP_RPC_REPLY, "TCP_RPC_REPLY" }) > - > TRACE_EVENT(xs_tcp_data_recv, > TP_PROTO(struct sock_xprt *xs), > > @@ -516,7 +506,6 @@ TRACE_EVENT(xs_tcp_data_recv, > __string(addr, xs- > >xprt.address_strings[RPC_DISPLAY_ADDR]) > __string(port, xs- > >xprt.address_strings[RPC_DISPLAY_PORT]) > __field(u32, xid) > - __field(unsigned long, flags) > __field(unsigned long, copied) > __field(unsigned int, reclen) > __field(unsigned long, offset) > @@ -526,15 +515,13 @@ TRACE_EVENT(xs_tcp_data_recv, > __assign_str(addr, xs- > >xprt.address_strings[RPC_DISPLAY_ADDR]); > __assign_str(port, xs- > >xprt.address_strings[RPC_DISPLAY_PORT]); > __entry->xid = be32_to_cpu(xs->recv.xid); > - __entry->flags = xs->recv.flags; > __entry->copied = xs->recv.copied; > __entry->reclen = xs->recv.len; > __entry->offset = xs->recv.offset; > ), > > - TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu > reclen=%u offset=%lu", > + TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u > offset=%lu", > __get_str(addr), __get_str(port), __entry->xid, > - rpc_show_sock_xprt_flags(__entry->flags), > __entry->copied, __entry->reclen, __entry- > >offset) > ); > > diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c > index f16406228ead..5269ad98bb08 100644 > --- a/net/sunrpc/xprtsock.c > +++ b/net/sunrpc/xprtsock.c > @@ -47,13 +47,13 @@ > #include <net/checksum.h> > #include <net/udp.h> > #include <net/tcp.h> > +#include <linux/bvec.h> > +#include <linux/uio.h> > > #include <trace/events/sunrpc.h> > > #include "sunrpc.h" > > -#define RPC_TCP_READ_CHUNK_SZ (3*512*1024) > - > static void xs_close(struct rpc_xprt *xprt); > static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, > struct socket *sock); > @@ -325,6 +325,320 @@ static void xs_free_peer_addresses(struct > rpc_xprt *xprt) > } > } > > +static size_t > 
+xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp) > +{ > + size_t i,n; > + > + if (!(buf->flags & XDRBUF_SPARSE_PAGES)) > + return want; > + if (want > buf->page_len) > + want = buf->page_len; > + n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT; > + for (i = 0; i < n; i++) { > + if (buf->pages[i]) > + continue; > + buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp); > + if (!buf->pages[i]) { > + buf->page_len = (i * PAGE_SIZE) - buf- > >page_base; > + return buf->page_len; > + } > + } > + return want; > +} > + > +static ssize_t > +xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, > size_t seek) > +{ > + ssize_t ret; > + if (seek != 0) > + iov_iter_advance(&msg->msg_iter, seek); > + ret = sock_recvmsg(sock, msg, flags); > + return ret > 0 ? ret + seek : ret; > +} > + > +static ssize_t > +xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags, > + struct kvec *kvec, size_t count, size_t seek) > +{ > + iov_iter_kvec(&msg->msg_iter, READ | ITER_KVEC, kvec, 1, > count); > + return xs_sock_recvmsg(sock, msg, flags, seek); > +} > + > +static ssize_t > +xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags, > + struct bio_vec *bvec, unsigned long nr, size_t count, > + size_t seek) > +{ > + iov_iter_bvec(&msg->msg_iter, READ | ITER_BVEC, bvec, nr, > count); > + return xs_sock_recvmsg(sock, msg, flags, seek); > +} > + > +static ssize_t > +xs_read_discard(struct socket *sock, struct msghdr *msg, int flags, > + size_t count) > +{ > + struct kvec kvec = { 0 }; > + return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, > 0); > +} > + > +static ssize_t > +xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, > + struct xdr_buf *buf, size_t count, size_t seek, size_t > *read) > +{ > + size_t want, seek_init = seek, offset = 0; > + ssize_t ret; > + > + if (seek < buf->head[0].iov_len) { > + want = min_t(size_t, count, buf->head[0].iov_len); > + ret = xs_read_kvec(sock, msg, flags, 
&buf->head[0], > want, seek); > + if (ret <= 0) > + goto sock_err; > + offset += ret; > + if (offset == count || msg->msg_flags & > (MSG_EOR|MSG_TRUNC)) > + goto out; > + if (ret != want) > + goto eagain; > + seek = 0; > + } else { > + seek -= buf->head[0].iov_len; > + offset += buf->head[0].iov_len; > + } > + if (buf->page_len && seek < buf->page_len) { > + want = min_t(size_t, count - offset, buf->page_len); > + want = xs_alloc_sparse_pages(buf, want, GFP_NOWAIT); > + ret = xs_read_bvec(sock, msg, flags, buf->bvec, > + xdr_buf_pagecount(buf), > + want + buf->page_base, > + seek + buf->page_base); > + if (ret <= 0) > + goto sock_err; > + offset += ret; There is a bug here, which has been fixed in the linux-nfs.org testing branch: xs_sock_recvmsg() returns the bytes read plus the seek offset, and for the page data that seek (and the iterator length) includes buf->page_base. As a result, 'offset' is over-counted by buf->page_base, and the 'ret != want' check below fails whenever page_base is non-zero. The fix is to do 'ret -= buf->page_base;' before adding ret to offset and comparing it with want. > + if (offset == count || msg->msg_flags & > (MSG_EOR|MSG_TRUNC)) > + goto out; > + if (ret != want) > + goto eagain; > + seek = 0; > + } else { > + seek -= buf->page_len; > + offset += buf->page_len; > + } > + if (buf->tail[0].iov_len && seek < buf->tail[0].iov_len) { > + want = min_t(size_t, count - offset, buf- > >tail[0].iov_len); > + ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], > want, seek); > + if (ret <= 0) > + goto sock_err; > + offset += ret; > + if (offset == count || msg->msg_flags & > (MSG_EOR|MSG_TRUNC)) > + goto out; > + if (ret != want) > + goto eagain; > + } else > + offset += buf->tail[0].iov_len; > + ret = -EMSGSIZE; > + msg->msg_flags |= MSG_TRUNC; > +out: > + *read = offset - seek_init; > + return ret; > +eagain: > + ret = -EAGAIN; > + goto out; > +sock_err: > + offset += seek; > + goto out; > +} > + > +static void > +xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf) > +{ > + if (!transport->recv.copied) { > + if (buf->head[0].iov_len >= transport->recv.offset) > + memcpy(buf->head[0].iov_base, > + &transport->recv.xid, > + transport->recv.offset); > + transport->recv.copied = transport->recv.offset; > + } > +} > + > +static bool > +xs_read_stream_request_done(struct sock_xprt *transport) > +{
> + return transport->recv.fraghdr & > cpu_to_be32(RPC_LAST_STREAM_FRAGMENT); > +} > + > +static ssize_t > +xs_read_stream_request(struct sock_xprt *transport, struct msghdr > *msg, > + int flags, struct rpc_rqst *req) > +{ > + struct xdr_buf *buf = &req->rq_private_buf; > + size_t want, read; > + ssize_t ret; > + > + xs_read_header(transport, buf); > + > + want = transport->recv.len - transport->recv.offset; > + ret = xs_read_xdr_buf(transport->sock, msg, flags, buf, > + transport->recv.copied + want, transport- > >recv.copied, > + &read); > + transport->recv.offset += read; > + transport->recv.copied += read; > + if (transport->recv.offset == transport->recv.len) { > + if (xs_read_stream_request_done(transport)) > + msg->msg_flags |= MSG_EOR; > + return transport->recv.copied; > + } > + > + switch (ret) { > + case -EMSGSIZE: > + return transport->recv.copied; > + case 0: > + return -ESHUTDOWN; > + default: > + if (ret < 0) > + return ret; > + } > + return -EAGAIN; > +} > + > +static size_t > +xs_read_stream_headersize(bool isfrag) > +{ > + if (isfrag) > + return sizeof(__be32); > + return 3 * sizeof(__be32); > +} > + > +static ssize_t > +xs_read_stream_header(struct sock_xprt *transport, struct msghdr > *msg, > + int flags, size_t want, size_t seek) > +{ > + struct kvec kvec = { > + .iov_base = &transport->recv.fraghdr, > + .iov_len = want, > + }; > + return xs_read_kvec(transport->sock, msg, flags, &kvec, want, > seek); > +} > + > +#if defined(CONFIG_SUNRPC_BACKCHANNEL) > +static ssize_t > +xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, > int flags) > +{ > + struct rpc_xprt *xprt = &transport->xprt; > + struct rpc_rqst *req; > + ssize_t ret; > + > + /* Look up and lock the request corresponding to the given XID > */ > + req = xprt_lookup_bc_request(xprt, transport->recv.xid); > + if (!req) { > + printk(KERN_WARNING "Callback slot table > overflowed\n"); > + return -ESHUTDOWN; > + } > + > + ret = xs_read_stream_request(transport, msg, flags, 
req); > + if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) > + xprt_complete_bc_request(req, ret); > + > + return ret; > +} > +#else /* CONFIG_SUNRPC_BACKCHANNEL */ > +static ssize_t > +xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, > int flags) > +{ > + return -ESHUTDOWN; > +} > +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ > + > +static ssize_t > +xs_read_stream_reply(struct sock_xprt *transport, struct msghdr > *msg, int flags) > +{ > + struct rpc_xprt *xprt = &transport->xprt; > + struct rpc_rqst *req; > + ssize_t ret = 0; > + > + /* Look up and lock the request corresponding to the given XID > */ > + spin_lock(&xprt->queue_lock); > + req = xprt_lookup_rqst(xprt, transport->recv.xid); > + if (!req) { > + msg->msg_flags |= MSG_TRUNC; > + goto out; > + } > + xprt_pin_rqst(req); > + spin_unlock(&xprt->queue_lock); > + > + ret = xs_read_stream_request(transport, msg, flags, req); > + > + spin_lock(&xprt->queue_lock); > + if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) > + xprt_complete_rqst(req->rq_task, ret); > + xprt_unpin_rqst(req); > +out: > + spin_unlock(&xprt->queue_lock); > + return ret; > +} > + > +static ssize_t > +xs_read_stream(struct sock_xprt *transport, int flags) > +{ > + struct msghdr msg = { 0 }; > + size_t want, read = 0; > + ssize_t ret = 0; > + > + if (transport->recv.len == 0) { > + want = xs_read_stream_headersize(transport->recv.copied > != 0); > + ret = xs_read_stream_header(transport, &msg, flags, > want, > + transport->recv.offset); > + if (ret <= 0) > + goto out_err; > + transport->recv.offset = ret; > + if (ret != want) { > + ret = -EAGAIN; > + goto out_err; > + } > + transport->recv.len = be32_to_cpu(transport- > >recv.fraghdr) & > + RPC_FRAGMENT_SIZE_MASK; > + transport->recv.offset -= sizeof(transport- > >recv.fraghdr); > + read = ret; > + } > + > + switch (be32_to_cpu(transport->recv.calldir)) { > + case RPC_CALL: > + ret = xs_read_stream_call(transport, &msg, flags); > + break; > + case RPC_REPLY: > + ret = 
xs_read_stream_reply(transport, &msg, flags); > + } > + if (msg.msg_flags & MSG_TRUNC) { > + transport->recv.calldir = cpu_to_be32(-1); > + transport->recv.copied = -1; > + } > + if (ret < 0) > + goto out_err; > + read += ret; > + if (transport->recv.offset < transport->recv.len) { > + ret = xs_read_discard(transport->sock, &msg, flags, > + transport->recv.len - transport- > >recv.offset); > + if (ret <= 0) > + goto out_err; > + transport->recv.offset += ret; > + read += ret; ...and there is another bug here. If xs_read_discard() only consumes part of the remainder of the fragment, we fall through and reset recv.offset and recv.len to 0 anyway. The receive code then loses track of the record boundary and misparses the rest of the fragment as a new header. After the discard we need 'if (transport->recv.offset != transport->recv.len) return -EAGAIN;'. > + } > + if (xs_read_stream_request_done(transport)) { > + trace_xs_tcp_data_recv(transport); > + transport->recv.copied = 0; > + } > + transport->recv.offset = 0; > + transport->recv.len = 0; > + return read; > +out_err: > + switch (ret) { > + case 0: > + case -ESHUTDOWN: > + xprt_force_disconnect(&transport->xprt); > + return -ESHUTDOWN; > + } > + return ret; > +} > + > #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) > > static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, > int addrlen, struct kvec *vec, unsigned int base, int more) > @@ -484,6 +798,12 @@ static int xs_nospace(struct rpc_rqst *req) > return ret; > } > > +static void > +xs_stream_prepare_request(struct rpc_rqst *req) > +{ > + req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, > GFP_NOIO); > +} > + > /* > * Determine if the previous message in the stream was aborted > before it > * could complete transmission.
> @@ -1157,263 +1477,7 @@ static void xs_tcp_force_close(struct > rpc_xprt *xprt) > xprt_force_disconnect(xprt); > } > > -static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct > xdr_skb_reader *desc) > -{ > - struct sock_xprt *transport = container_of(xprt, struct > sock_xprt, xprt); > - size_t len, used; > - char *p; > - > - p = ((char *) &transport->recv.fraghdr) + transport- > >recv.offset; > - len = sizeof(transport->recv.fraghdr) - transport->recv.offset; > - used = xdr_skb_read_bits(desc, p, len); > - transport->recv.offset += used; > - if (used != len) > - return; > - > - transport->recv.len = ntohl(transport->recv.fraghdr); > - if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT) > - transport->recv.flags |= TCP_RCV_LAST_FRAG; > - else > - transport->recv.flags &= ~TCP_RCV_LAST_FRAG; > - transport->recv.len &= RPC_FRAGMENT_SIZE_MASK; > - > - transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR; > - transport->recv.offset = 0; > - > - /* Sanity check of the record length */ > - if (unlikely(transport->recv.len < 8)) { > - dprintk("RPC: invalid TCP record fragment > length\n"); > - xs_tcp_force_close(xprt); > - return; > - } > - dprintk("RPC: reading TCP record fragment of length > %d\n", > - transport->recv.len); > -} > - > -static void xs_tcp_check_fraghdr(struct sock_xprt *transport) > -{ > - if (transport->recv.offset == transport->recv.len) { > - transport->recv.flags |= TCP_RCV_COPY_FRAGHDR; > - transport->recv.offset = 0; > - if (transport->recv.flags & TCP_RCV_LAST_FRAG) { > - transport->recv.flags &= ~TCP_RCV_COPY_DATA; > - transport->recv.flags |= TCP_RCV_COPY_XID; > - transport->recv.copied = 0; > - } > - } > -} > - > -static inline void xs_tcp_read_xid(struct sock_xprt *transport, > struct xdr_skb_reader *desc) > -{ > - size_t len, used; > - char *p; > - > - len = sizeof(transport->recv.xid) - transport->recv.offset; > - dprintk("RPC: reading XID (%zu bytes)\n", len); > - p = ((char *) &transport->recv.xid) + transport->recv.offset; > - 
used = xdr_skb_read_bits(desc, p, len); > - transport->recv.offset += used; > - if (used != len) > - return; > - transport->recv.flags &= ~TCP_RCV_COPY_XID; > - transport->recv.flags |= TCP_RCV_READ_CALLDIR; > - transport->recv.copied = 4; > - dprintk("RPC: reading %s XID %08x\n", > - (transport->recv.flags & TCP_RPC_REPLY) ? > "reply for" > - : > "request with", > - ntohl(transport->recv.xid)); > - xs_tcp_check_fraghdr(transport); > -} > - > -static inline void xs_tcp_read_calldir(struct sock_xprt *transport, > - struct xdr_skb_reader *desc) > -{ > - size_t len, used; > - u32 offset; > - char *p; > - > - /* > - * We want transport->recv.offset to be 8 at the end of this > routine > - * (4 bytes for the xid and 4 bytes for the call/reply flag). > - * When this function is called for the first time, > - * transport->recv.offset is 4 (after having already read the > xid). > - */ > - offset = transport->recv.offset - sizeof(transport->recv.xid); > - len = sizeof(transport->recv.calldir) - offset; > - dprintk("RPC: reading CALL/REPLY flag (%zu bytes)\n", > len); > - p = ((char *) &transport->recv.calldir) + offset; > - used = xdr_skb_read_bits(desc, p, len); > - transport->recv.offset += used; > - if (used != len) > - return; > - transport->recv.flags &= ~TCP_RCV_READ_CALLDIR; > - /* > - * We don't yet have the XDR buffer, so we will write the > calldir > - * out after we get the buffer from the 'struct rpc_rqst' > - */ > - switch (ntohl(transport->recv.calldir)) { > - case RPC_REPLY: > - transport->recv.flags |= TCP_RCV_COPY_CALLDIR; > - transport->recv.flags |= TCP_RCV_COPY_DATA; > - transport->recv.flags |= TCP_RPC_REPLY; > - break; > - case RPC_CALL: > - transport->recv.flags |= TCP_RCV_COPY_CALLDIR; > - transport->recv.flags |= TCP_RCV_COPY_DATA; > - transport->recv.flags &= ~TCP_RPC_REPLY; > - break; > - default: > - dprintk("RPC: invalid request message type\n"); > - xs_tcp_force_close(&transport->xprt); > - } > - xs_tcp_check_fraghdr(transport); > -} > - > 
-static inline void xs_tcp_read_common(struct rpc_xprt *xprt, > - struct xdr_skb_reader *desc, > - struct rpc_rqst *req) > -{ > - struct sock_xprt *transport = > - container_of(xprt, struct sock_xprt, > xprt); > - struct xdr_buf *rcvbuf; > - size_t len; > - ssize_t r; > - > - rcvbuf = &req->rq_private_buf; > - > - if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) { > - /* > - * Save the RPC direction in the XDR buffer > - */ > - memcpy(rcvbuf->head[0].iov_base + transport- > >recv.copied, > - &transport->recv.calldir, > - sizeof(transport->recv.calldir)); > - transport->recv.copied += sizeof(transport- > >recv.calldir); > - transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR; > - } > - > - len = desc->count; > - if (len > transport->recv.len - transport->recv.offset) > - desc->count = transport->recv.len - transport- > >recv.offset; > - r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied, > - desc, xdr_skb_read_bits); > - > - if (desc->count) { > - /* Error when copying to the receive buffer, > - * usually because we weren't able to allocate > - * additional buffer pages. All we can do now > - * is turn off TCP_RCV_COPY_DATA, so the request > - * will not receive any additional updates, > - * and time out. > - * Any remaining data from this record will > - * be discarded. 
> - */ > - transport->recv.flags &= ~TCP_RCV_COPY_DATA; > - dprintk("RPC: XID %08x truncated request\n", > - ntohl(transport->recv.xid)); > - dprintk("RPC: xprt = %p, recv.copied = %lu, " > - "recv.offset = %u, recv.len = %u\n", > - xprt, transport->recv.copied, > - transport->recv.offset, transport- > >recv.len); > - return; > - } > - > - transport->recv.copied += r; > - transport->recv.offset += r; > - desc->count = len - r; > - > - dprintk("RPC: XID %08x read %zd bytes\n", > - ntohl(transport->recv.xid), r); > - dprintk("RPC: xprt = %p, recv.copied = %lu, recv.offset = > %u, " > - "recv.len = %u\n", xprt, transport- > >recv.copied, > - transport->recv.offset, transport->recv.len); > - > - if (transport->recv.copied == req->rq_private_buf.buflen) > - transport->recv.flags &= ~TCP_RCV_COPY_DATA; > - else if (transport->recv.offset == transport->recv.len) { > - if (transport->recv.flags & TCP_RCV_LAST_FRAG) > - transport->recv.flags &= ~TCP_RCV_COPY_DATA; > - } > -} > - > -/* > - * Finds the request corresponding to the RPC xid and invokes the > common > - * tcp read code to read the data. 
> - */ > -static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, > - struct xdr_skb_reader *desc) > -{ > - struct sock_xprt *transport = > - container_of(xprt, struct sock_xprt, > xprt); > - struct rpc_rqst *req; > - > - dprintk("RPC: read reply XID %08x\n", ntohl(transport- > >recv.xid)); > - > - /* Find and lock the request corresponding to this xid */ > - spin_lock(&xprt->queue_lock); > - req = xprt_lookup_rqst(xprt, transport->recv.xid); > - if (!req) { > - dprintk("RPC: XID %08x request not found!\n", > - ntohl(transport->recv.xid)); > - spin_unlock(&xprt->queue_lock); > - return -1; > - } > - xprt_pin_rqst(req); > - spin_unlock(&xprt->queue_lock); > - > - xs_tcp_read_common(xprt, desc, req); > - > - spin_lock(&xprt->queue_lock); > - if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) > - xprt_complete_rqst(req->rq_task, transport- > >recv.copied); > - xprt_unpin_rqst(req); > - spin_unlock(&xprt->queue_lock); > - return 0; > -} > - > #if defined(CONFIG_SUNRPC_BACKCHANNEL) > -/* > - * Obtains an rpc_rqst previously allocated and invokes the common > - * tcp read code to read the data. The result is placed in the > callback > - * queue. > - * If we're unable to obtain the rpc_rqst we schedule the closing of > the > - * connection and return -1. 
> - */ > -static int xs_tcp_read_callback(struct rpc_xprt *xprt, > - struct xdr_skb_reader *desc) > -{ > - struct sock_xprt *transport = > - container_of(xprt, struct sock_xprt, > xprt); > - struct rpc_rqst *req; > - > - /* Look up the request corresponding to the given XID */ > - req = xprt_lookup_bc_request(xprt, transport->recv.xid); > - if (req == NULL) { > - printk(KERN_WARNING "Callback slot table > overflowed\n"); > - xprt_force_disconnect(xprt); > - return -1; > - } > - > - dprintk("RPC: read callback XID %08x\n", ntohl(req- > >rq_xid)); > - xs_tcp_read_common(xprt, desc, req); > - > - if (!(transport->recv.flags & TCP_RCV_COPY_DATA)) > - xprt_complete_bc_request(req, transport->recv.copied); > - > - return 0; > -} > - > -static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, > - struct xdr_skb_reader *desc) > -{ > - struct sock_xprt *transport = > - container_of(xprt, struct sock_xprt, > xprt); > - > - return (transport->recv.flags & TCP_RPC_REPLY) ? > - xs_tcp_read_reply(xprt, desc) : > - xs_tcp_read_callback(xprt, desc); > -} > - > static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) > { > int ret; > @@ -1429,106 +1493,14 @@ static size_t xs_tcp_bc_maxpayload(struct > rpc_xprt *xprt) > { > return PAGE_SIZE; > } > -#else > -static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, > - struct xdr_skb_reader *desc) > -{ > - return xs_tcp_read_reply(xprt, desc); > -} > #endif /* CONFIG_SUNRPC_BACKCHANNEL */ > > -/* > - * Read data off the transport. This can be either an RPC_CALL or > an > - * RPC_REPLY. Relay the processing to helper functions. > - */ > -static void xs_tcp_read_data(struct rpc_xprt *xprt, > - struct xdr_skb_reader *desc) > -{ > - struct sock_xprt *transport = > - container_of(xprt, struct sock_xprt, > xprt); > - > - if (_xs_tcp_read_data(xprt, desc) == 0) > - xs_tcp_check_fraghdr(transport); > - else { > - /* > - * The transport_lock protects the request handling. > - * There's no need to hold it to update the recv.flags. 
> - */ > - transport->recv.flags &= ~TCP_RCV_COPY_DATA; > - } > -} > - > -static inline void xs_tcp_read_discard(struct sock_xprt *transport, > struct xdr_skb_reader *desc) > -{ > - size_t len; > - > - len = transport->recv.len - transport->recv.offset; > - if (len > desc->count) > - len = desc->count; > - desc->count -= len; > - desc->offset += len; > - transport->recv.offset += len; > - dprintk("RPC: discarded %zu bytes\n", len); > - xs_tcp_check_fraghdr(transport); > -} > - > -static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct > sk_buff *skb, unsigned int offset, size_t len) > -{ > - struct rpc_xprt *xprt = rd_desc->arg.data; > - struct sock_xprt *transport = container_of(xprt, struct > sock_xprt, xprt); > - struct xdr_skb_reader desc = { > - .skb = skb, > - .offset = offset, > - .count = len, > - }; > - size_t ret; > - > - dprintk("RPC: xs_tcp_data_recv started\n"); > - do { > - trace_xs_tcp_data_recv(transport); > - /* Read in a new fragment marker if necessary */ > - /* Can we ever really expect to get completely empty > fragments? 
*/ > - if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) { > - xs_tcp_read_fraghdr(xprt, &desc); > - continue; > - } > - /* Read in the xid if necessary */ > - if (transport->recv.flags & TCP_RCV_COPY_XID) { > - xs_tcp_read_xid(transport, &desc); > - continue; > - } > - /* Read in the call/reply flag */ > - if (transport->recv.flags & TCP_RCV_READ_CALLDIR) { > - xs_tcp_read_calldir(transport, &desc); > - continue; > - } > - /* Read in the request data */ > - if (transport->recv.flags & TCP_RCV_COPY_DATA) { > - xs_tcp_read_data(xprt, &desc); > - continue; > - } > - /* Skip over any trailing bytes on short reads */ > - xs_tcp_read_discard(transport, &desc); > - } while (desc.count); > - ret = len - desc.count; > - if (ret < rd_desc->count) > - rd_desc->count -= ret; > - else > - rd_desc->count = 0; > - trace_xs_tcp_data_recv(transport); > - dprintk("RPC: xs_tcp_data_recv done\n"); > - return ret; > -} > - > static void xs_tcp_data_receive(struct sock_xprt *transport) > { > struct rpc_xprt *xprt = &transport->xprt; > struct sock *sk; > - read_descriptor_t rd_desc = { > - .arg.data = xprt, > - }; > - unsigned long total = 0; > - int read = 0; > + size_t read = 0; > + ssize_t ret = 0; > > restart: > mutex_lock(&transport->recv_mutex); > @@ -1536,18 +1508,12 @@ static void xs_tcp_data_receive(struct > sock_xprt *transport) > if (sk == NULL) > goto out; > > - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ > for (;;) { > - rd_desc.count = RPC_TCP_READ_CHUNK_SZ; > - lock_sock(sk); > - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); > - if (rd_desc.count != 0 || read < 0) { > - clear_bit(XPRT_SOCK_DATA_READY, &transport- > >sock_state); > - release_sock(sk); > + clear_bit(XPRT_SOCK_DATA_READY, &transport- > >sock_state); > + ret = xs_read_stream(transport, MSG_DONTWAIT | > MSG_NOSIGNAL); > + if (ret < 0) > break; > - } > - release_sock(sk); > - total += read; > + read += ret; > if (need_resched()) { > mutex_unlock(&transport->recv_mutex); > 
cond_resched(); > @@ -1558,7 +1524,7 @@ static void xs_tcp_data_receive(struct > sock_xprt *transport) > queue_work(xprtiod_workqueue, &transport->recv_worker); > out: > mutex_unlock(&transport->recv_mutex); > - trace_xs_tcp_data_ready(xprt, read, total); > + trace_xs_tcp_data_ready(xprt, ret, read); > } > > static void xs_tcp_data_receive_workfn(struct work_struct *work) > @@ -2380,7 +2346,6 @@ static int xs_tcp_finish_connecting(struct > rpc_xprt *xprt, struct socket *sock) > transport->recv.offset = 0; > transport->recv.len = 0; > transport->recv.copied = 0; > - transport->recv.flags = TCP_RCV_COPY_FRAGHDR | > TCP_RCV_COPY_XID; > transport->xmit.offset = 0; > > /* Tell the socket layer to start connecting... */ > @@ -2802,6 +2767,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = { > .connect = xs_connect, > .buf_alloc = rpc_malloc, > .buf_free = rpc_free, > + .prepare_request = xs_stream_prepare_request, > .send_request = xs_tcp_send_request, > .set_retrans_timeout = xprt_set_retrans_timeout_def, > .close = xs_tcp_shutdown, -- Trond Myklebust Linux NFS client maintainer, Hammerspace trond.myklebust@xxxxxxxxxxxxxxx