On Fri, 2009-05-01 at 02:20 +0300, Benny Halevy wrote: > From: Ricardo Labiaga <Ricardo.Labiaga@xxxxxxxxxx> > > Handles RPC replies and backchannel callbacks. Traditionally the NFS > client has expected only RPC replies on its open connections. With > NFSv4.1, callbacks can arrive over an existing open connection. > > This patch refactors the old xs_tcp_read_request() into an RPC reply handler: > xs_tcp_read_reply(), a new backchannel callback handler: xs_tcp_read_callback(), > and a common routine to read the data off the transport: xs_tcp_read_common(). > The new xs_tcp_read_callback() queues callback requests onto a queue where > the callback service (a separate thread) is listening for the processing. > > This patch incorporates work and suggestions from Rahul Iyer (iyer@xxxxxxxxxx) > and Benny Halevy (bhalevy@xxxxxxxxxxx). > > [nfs41: Keep track of RPC call/reply direction with a flag] > [nfs41: Preallocate rpc_rqst receive buffer for handling callbacks] > Signed-off-by: Ricardo Labiaga <ricardo.labiaga@xxxxxxxxxx> > Signed-off-by: Benny Halevy <bhalevy@xxxxxxxxxxx> > --- > net/sunrpc/xprtsock.c | 141 ++++++++++++++++++++++++++++++++++++++++++------ > 1 files changed, 123 insertions(+), 18 deletions(-) > > diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c > index 692b517..403ebda 100644 > --- a/net/sunrpc/xprtsock.c > +++ b/net/sunrpc/xprtsock.c > @@ -34,6 +34,9 @@ > #include <linux/sunrpc/sched.h> > #include <linux/sunrpc/xprtsock.h> > #include <linux/file.h> > +#ifdef CONFIG_NFS_V4_1 > +#include <linux/sunrpc/bc_xprt.h> > +#endif > > #include <net/sock.h> > #include <net/checksum.h> > @@ -1028,25 +1031,16 @@ static inline void xs_tcp_read_calldir(struct sock_xprt *transport, > xs_tcp_check_fraghdr(transport); > } > > -static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) > +static inline void xs_tcp_read_common(struct rpc_xprt *xprt, > + struct xdr_skb_reader *desc, > + struct rpc_rqst *req) > { > - struct 
sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); > - struct rpc_rqst *req; > + struct sock_xprt *transport = > + container_of(xprt, struct sock_xprt, xprt); > struct xdr_buf *rcvbuf; > size_t len; > ssize_t r; > > - /* Find and lock the request corresponding to this xid */ > - spin_lock(&xprt->transport_lock); > - req = xprt_lookup_rqst(xprt, transport->tcp_xid); > - if (!req) { > - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; > - dprintk("RPC: XID %08x request not found!\n", > - ntohl(transport->tcp_xid)); > - spin_unlock(&xprt->transport_lock); > - return; > - } > - > rcvbuf = &req->rq_private_buf; > len = desc->count; > if (len > transport->tcp_reclen - transport->tcp_offset) { > @@ -1084,7 +1078,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea > "tcp_offset = %u, tcp_reclen = %u\n", > xprt, transport->tcp_copied, > transport->tcp_offset, transport->tcp_reclen); > - goto out; > + return; > } > > dprintk("RPC: XID %08x read %Zd bytes\n", > @@ -1100,11 +1094,122 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea > transport->tcp_flags &= ~TCP_RCV_COPY_DATA; > } > > -out: > + return; > +} > + > +/* > + * Finds the request corresponding to the RPC xid and invokes the common > + * tcp read code to read the data. 
> + */ > +static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, > + struct xdr_skb_reader *desc) > +{ > + struct sock_xprt *transport = > + container_of(xprt, struct sock_xprt, xprt); > + struct rpc_rqst *req; > + > + dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); > + > + /* Find and lock the request corresponding to this xid */ > + spin_lock(&xprt->transport_lock); > + req = xprt_lookup_rqst(xprt, transport->tcp_xid); > + if (!req) { > + dprintk("RPC: XID %08x request not found!\n", > + ntohl(transport->tcp_xid)); > + spin_unlock(&xprt->transport_lock); > + return -1; > + } > + > + xs_tcp_read_common(xprt, desc, req); > + > if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) > xprt_complete_rqst(req->rq_task, transport->tcp_copied); > + > spin_unlock(&xprt->transport_lock); > - xs_tcp_check_fraghdr(transport); > + return 0; > +} > + > +#if defined(CONFIG_NFS_V4_1) > +/* > + * Obtains an rpc_rqst previously allocated and invokes the common > + * tcp read code to read the data. The result is placed in the callback > + * queue. > + * If we're unable to obtain the rpc_rqst we schedule the closing of the > + * connection and return -1. 
> + */ > +static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, > + struct xdr_skb_reader *desc) > +{ > + struct sock_xprt *transport = > + container_of(xprt, struct sock_xprt, xprt); > + struct rpc_rqst *req; > + > + req = xprt_alloc_bc_request(xprt); > + if (req == NULL) { > + /* > + * Schedule an autoclose RPC call > + */ > + printk(KERN_WARNING "Callback slot table overflowed\n"); > + set_bit(XPRT_CLOSE_WAIT, &xprt->state); > + if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) > + queue_work(rpciod_workqueue, &xprt->task_cleanup); > + return -1; > + } > + > + req->rq_xid = transport->tcp_xid; > + dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); > + xs_tcp_read_common(xprt, desc, req); > + > + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { > + struct svc_serv *bc_serv = xprt->bc_serv; > + > + /* > + * Add callback request to callback list. The callback > + * service sleeps on the sv_cb_waitq waiting for new > + * requests. Wake it up after enqueuing the > + * request. > + */ > + dprintk("RPC: add callback request to list\n"); > + spin_lock(&bc_serv->sv_cb_lock); > + list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); > + spin_unlock(&bc_serv->sv_cb_lock); > + wake_up(&bc_serv->sv_cb_waitq); > + } > + > + req->rq_private_buf.len = transport->tcp_copied; > + > + return 0; > +} > +#endif /* CONFIG_NFS_V4_1 */ > + > +/* > + * Read data off the transport. This can be either an RPC_CALL or an > + * RPC_REPLY. Relay the processing to helper functions. > + */ > +static void xs_tcp_read_data(struct rpc_xprt *xprt, > + struct xdr_skb_reader *desc) > +{ > + struct sock_xprt *transport = > + container_of(xprt, struct sock_xprt, xprt); > + int status; > + > +#if defined(CONFIG_NFS_V4_1) > + status = (transport->tcp_flags & TCP_RPC_REPLY) ? > + xs_tcp_read_reply(xprt, desc) : > + xs_tcp_read_callback(xprt, desc); > +#else > + status = xs_tcp_read_reply(xprt, desc); > +#endif /* CONFIG_NFS_V4_1 */ Ugh...
Please don't embed ifdefs in functions like this... > + > + if (status == 0) > + xs_tcp_check_fraghdr(transport); > + else { > + /* > + * The transport_lock protects the request handling. > + * There's no need to hold it to update the tcp_flags. > + */ > + transport->tcp_flags &= ~TCP_RCV_COPY_DATA; > + } > } > > static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) > @@ -1151,7 +1256,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns > } > /* Read in the request data */ > if (transport->tcp_flags & TCP_RCV_COPY_DATA) { > - xs_tcp_read_request(xprt, &desc); > + xs_tcp_read_data(xprt, &desc); > continue; > } > /* Skip over any trailing bytes on short reads */ -- Trond Myklebust Linux NFS client maintainer NetApp Trond.Myklebust@xxxxxxxxxx www.netapp.com -- To unsubscribe from this list: send the line "unsubscribe linux-nfs" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html