tsh_size was added to accommodate transports that send a preamble before each RPC message. However, this assumes the preamble is fixed in size, which isn't true for some transports. That makes tsh_size not very generic. Also, I'd like to make the estimation of RPC send and receive buffer sizes more precise. tsh_size doesn't currently appear to be accounted for at all by call_allocate. Therefore let's just remove the tsh_size concept, and make the only transports that have a non-zero tsh_size employ a direct approach. Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx> --- I've taken an approach where only the transports that need a record marker need to pay the expense. I don't want to add another indirect call in the generic path. However, I've found that during heavy multi-threaded workloads, occasionally this mechanism inserts a record marker in the middle of a Call message. include/linux/sunrpc/xprt.h | 7 --- net/sunrpc/auth_gss/auth_gss.c | 3 - net/sunrpc/clnt.c | 1 net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 1 net/sunrpc/xprtrdma/transport.c | 1 net/sunrpc/xprtsock.c | 69 +++++++++++++++++----------- 6 files changed, 43 insertions(+), 39 deletions(-) diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index a4ab4f8..17fc651 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -197,8 +197,6 @@ struct rpc_xprt { size_t max_payload; /* largest RPC payload size, in bytes */ - unsigned int tsh_size; /* size of transport specific - header */ struct rpc_wait_queue binding; /* requests waiting on rpcbind */ struct rpc_wait_queue sending; /* requests waiting to send */ @@ -363,11 +361,6 @@ struct rpc_xprt * xprt_alloc(struct net *net, size_t size, unsigned int max_req); void xprt_free(struct rpc_xprt *); -static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p) -{ - return p + xprt->tsh_size; -} - static inline int xprt_enable_swap(struct rpc_xprt *xprt) { diff --git 
a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 4735a06..13508e4 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1577,8 +1577,7 @@ static void gss_pipe_free(struct gss_pipe *p) /* We compute the checksum for the verifier over the xdr-encoded bytes * starting with the xid and ending at the end of the credential: */ - iov.iov_base = xprt_skip_transport_header(req->rq_xprt, - req->rq_snd_buf.head[0].iov_base); + iov.iov_base = req->rq_snd_buf.head[0].iov_base; iov.iov_len = (u8 *)p - (u8 *)iov.iov_base; xdr_buf_from_iov(&iov, &verf_buf); diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 1ee04e0..4f88062 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -2330,7 +2330,6 @@ void rpc_force_rebind(struct rpc_clnt *clnt) /* FIXME: check buffer size? */ - p = xprt_skip_transport_header(req->rq_xprt, p); *p++ = req->rq_xid; /* XID */ *p++ = htonl(RPC_CALL); /* CALL */ *p++ = htonl(RPC_VERSION); /* RPC version */ diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index b908f2c..907464c 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -304,7 +304,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; xprt->prot = XPRT_TRANSPORT_BC_RDMA; - xprt->tsh_size = 0; xprt->ops = &xprt_rdma_bc_procs; memcpy(&xprt->addr, args->dstaddr, args->addrlen); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 9a7bb58..2a704d5 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -332,7 +332,6 @@ xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; xprt->resvport = 0; /* privileged port not needed */ - xprt->tsh_size = 0; /* RPC-RDMA handles framing */ xprt->ops = &xprt_rdma_procs; /* diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index d5ce1a8..66b08aa 100644 --- a/net/sunrpc/xprtsock.c +++ 
b/net/sunrpc/xprtsock.c @@ -678,6 +678,31 @@ static void xs_stream_data_receive_workfn(struct work_struct *work) #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) +static int xs_send_record_marker(struct sock_xprt *transport, + const struct rpc_rqst *req) +{ + static struct msghdr msg = { + .msg_name = NULL, + .msg_namelen = 0, + .msg_flags = (XS_SENDMSG_FLAGS | MSG_MORE), + }; + rpc_fraghdr marker; + struct kvec iov = { + .iov_base = &marker, + .iov_len = sizeof(marker), + }; + u32 reclen; + + if (unlikely(!transport->sock)) + return -ENOTSOCK; + if (req->rq_bytes_sent) + return 0; + + reclen = req->rq_snd_buf.len; + marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen); + return kernel_sendmsg(transport->sock, &msg, &iov, 1, iov.iov_len); +} + static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) { struct msghdr msg = { @@ -851,16 +876,6 @@ static int xs_nospace(struct rpc_rqst *req) return transport->xmit.offset != 0 && req->rq_bytes_sent == 0; } -/* - * Construct a stream transport record marker in @buf. 
- */ -static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) -{ - u32 reclen = buf->len - sizeof(rpc_fraghdr); - rpc_fraghdr *base = buf->head[0].iov_base; - *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen); -} - /** * xs_local_send_request - write an RPC request to an AF_LOCAL socket * @req: pointer to RPC request @@ -887,18 +902,20 @@ static int xs_local_send_request(struct rpc_rqst *req) return -ENOTCONN; } - xs_encode_stream_record_marker(&req->rq_snd_buf); - xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); req->rq_xtime = ktime_get(); + status = xs_send_record_marker(transport, req); + if (status < 0) + goto marker_fail; status = xs_sendpages(transport->sock, NULL, 0, xdr, transport->xmit.offset, true, &sent); dprintk("RPC: %s(%u) = %d\n", __func__, xdr->len - transport->xmit.offset, status); +marker_fail: if (status == -EAGAIN && sock_writeable(transport->inet)) status = -ENOBUFS; @@ -1039,8 +1056,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req) return -ENOTCONN; } - xs_encode_stream_record_marker(&req->rq_snd_buf); - xs_pktdump("packet data:", req->rq_svec->iov_base, req->rq_svec->iov_len); @@ -1058,6 +1073,9 @@ static int xs_tcp_send_request(struct rpc_rqst *req) * to cope with writespace callbacks arriving _after_ we have * called sendmsg(). 
*/ req->rq_xtime = ktime_get(); + status = xs_send_record_marker(transport, req); + if (status < 0) + goto marker_fail; while (1) { sent = 0; status = xs_sendpages(transport->sock, NULL, 0, xdr, @@ -1106,6 +1124,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req) vm_wait = false; } +marker_fail: switch (status) { case -ENOTSOCK: status = -ENOTCONN; @@ -2530,24 +2549,24 @@ static int bc_sendto(struct rpc_rqst *req) struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct socket *sock = transport->sock; unsigned long headoff; unsigned long tailoff; - xs_encode_stream_record_marker(xbufp); + len = xs_send_record_marker(transport, req); + if (len < 0) + goto out_fail; tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK; headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK; - len = svc_send_common(sock, xbufp, + len = svc_send_common(transport->sock, xbufp, virt_to_page(xbufp->head[0].iov_base), headoff, xbufp->tail[0].iov_base, tailoff); - - if (len != xbufp->len) { - printk(KERN_NOTICE "Error sending entire callback!\n"); - len = -EAGAIN; - } - + if (len != xbufp->len) + goto out_fail; return len; +out_fail: + pr_info("Error sending entire callback!\n"); + return -EAGAIN; } /* @@ -2787,7 +2806,6 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = 0; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->bind_timeout = XS_BIND_TO; @@ -2856,7 +2874,6 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_UDP; - xprt->tsh_size = 0; /* XXX: header size can vary due to auth type, IPv6, etc. 
*/ xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); @@ -2936,7 +2953,6 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_TCP; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->bind_timeout = XS_BIND_TO; @@ -3009,7 +3025,6 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_TCP; - xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->timeout = &xs_tcp_default_timeout;