Hi Trond- On Jun 4, 2015, at 5:26 PM, Trond Myklebust <trond.myklebust@xxxxxxxxxxxxxxx> wrote: > We need to allow the server to send a new request immediately after we've > replied to the previous one. Right now, there is a window between the > send and the release of the old request in rpc_put_task(), where the > server could send us a new backchannel RPC call, and we have no > request to service it. > > Signed-off-by: Trond Myklebust <trond.myklebust@xxxxxxxxxxxxxxx> > --- > include/linux/sunrpc/xprt.h | 3 ++- > net/sunrpc/backchannel_rqst.c | 37 ++++++++++++++++++++++++------------- > net/sunrpc/svc.c | 6 +++++- > 3 files changed, 31 insertions(+), 15 deletions(-) > > diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h > index 8b93ef53df3c..bc9c39d6d30d 100644 > --- a/include/linux/sunrpc/xprt.h > +++ b/include/linux/sunrpc/xprt.h > @@ -212,7 +212,8 @@ struct rpc_xprt { > #if defined(CONFIG_SUNRPC_BACKCHANNEL) > struct svc_serv *bc_serv; /* The RPC service which will */ > /* process the callback */ > - unsigned int bc_alloc_count; /* Total number of preallocs */ > + int bc_alloc_count; /* Total number of preallocs */ > + atomic_t bc_free_slots; > spinlock_t bc_pa_lock; /* Protects the preallocated > * items */ > struct list_head bc_pa_list; /* List of preallocated > diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c > index 1baa41099469..9825ff0f91d6 100644 > --- a/net/sunrpc/backchannel_rqst.c > +++ b/net/sunrpc/backchannel_rqst.c > @@ -37,16 +37,18 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. > */ > static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) > { > - return xprt->bc_alloc_count > 0; > + return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots); > } > > static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n) > { > + atomic_add(n, &xprt->bc_free_slots); > xprt->bc_alloc_count += n; > } > > static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n) > { > + atomic_sub(n, &xprt->bc_free_slots); > return xprt->bc_alloc_count -= n; > } > > @@ -232,9 +234,15 @@ static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid) > struct rpc_rqst *req = NULL; > > dprintk("RPC: allocate a backchannel request\n"); > - if (list_empty(&xprt->bc_pa_list)) > + if (atomic_read(&xprt->bc_free_slots) <= 0) > goto not_found; > - > + if (list_empty(&xprt->bc_pa_list)) { > + req = xprt_alloc_bc_req(xprt, GFP_ATOMIC); > + if (!req) > + goto not_found; > + /* Note: this 'free' request adds it to xprt->bc_pa_list */ > + xprt_free_bc_request(req); xprt_alloc_bc_request() already holds xprt->bc_pa_lock. xprt_free_bc_request() will take it again to add this fresh request to xprt->bc_pa_list. One of our testers hit this. Can you simply open-code the list_add_tail() here? > + } > req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, > rq_bc_pa_list); > req->rq_reply_bytes_recvd = 0; > @@ -260,11 +268,21 @@ void xprt_free_bc_request(struct rpc_rqst *req) > > req->rq_connect_cookie = xprt->connect_cookie - 1; > smp_mb__before_atomic(); > - WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); > clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); > smp_mb__after_atomic(); > > - if (!xprt_need_to_requeue(xprt)) { > + /* > + * Return it to the list of preallocations so that it > + * may be reused by a new callback request. > + */ > + spin_lock_bh(&xprt->bc_pa_lock); > + if (xprt_need_to_requeue(xprt)) { > + list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); > + xprt->bc_alloc_count++; > + req = NULL; > + } > + spin_unlock_bh(&xprt->bc_pa_lock); > + if (req != NULL) { > /* > * The last remaining session was destroyed while this > * entry was in use. Free the entry and don't attempt > @@ -275,14 +293,6 @@ void xprt_free_bc_request(struct rpc_rqst *req) > xprt_free_allocation(req); > return; > } > - > - /* > - * Return it to the list of preallocations so that it > - * may be reused by a new callback request. > - */ > - spin_lock_bh(&xprt->bc_pa_lock); > - list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); > - spin_unlock_bh(&xprt->bc_pa_lock); > } > > /* > @@ -326,6 +336,7 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) > > spin_lock(&xprt->bc_pa_lock); > list_del(&req->rq_bc_pa_list); > + xprt->bc_alloc_count--; > spin_unlock(&xprt->bc_pa_lock); > > req->rq_private_buf.len = copied; > diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c > index 51c8cad5e765..b47f02f60b9c 100644 > --- a/net/sunrpc/svc.c > +++ b/net/sunrpc/svc.c > @@ -1351,6 +1351,7 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, > struct kvec *argv = &rqstp->rq_arg.head[0]; > struct kvec *resv = &rqstp->rq_res.head[0]; > struct rpc_task *task; > + int proc_error; > int error; > > dprintk("svc: %s(%p)\n", __func__, req); > @@ -1382,7 +1383,10 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, > svc_getnl(argv); /* CALLDIR */ > > /* Parse and execute the bc call */ > - if (!svc_process_common(rqstp, argv, resv)) { > + proc_error = svc_process_common(rqstp, argv, resv); > + > + atomic_inc(&req->rq_xprt->bc_free_slots); > + if (!proc_error) { > /* Processing error: drop the request */ > xprt_free_bc_request(req); > return 0; > -- > 2.4.2 > > -- > To unsubscribe from this list: send the line "unsubscribe linux-nfs" in > the body of a message to majordomo@xxxxxxxxxxxxxxx > More majordomo info at http://vger.kernel.org/majordomo-info.html -- Chuck Lever -- To unsubscribe from this list: send the line "unsubscribe linux-nfs" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html