Re: [PATCH v4 06/30] xprtrdma: Don't wake pending tasks until disconnect is done

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




> On Dec 17, 2018, at 12:28 PM, Trond Myklebust <trondmy@xxxxxxxxxxxxxxx> wrote:
> 
> On Mon, 2018-12-17 at 11:39 -0500, Chuck Lever wrote:
>> Transport disconnect processing does a "wake pending tasks" at
>> various points.
>> 
>> Suppose an RPC Reply is being processed. The RPC task that Reply
>> goes with is waiting on the pending queue. If a disconnect wake-up
>> happens before reply processing is done, that reply, even if it is
>> good, is thrown away, and the RPC has to be sent again.
>> 
>> This window apparently does not exist for socket transports because
>> there is a lock held while a reply is being received which prevents
>> the wake-up call until after reply processing is done.
>> 
>> To resolve this, all RPC replies being processed on an RPC-over-RDMA
>> transport have to complete before pending tasks are awoken due to a
>> transport disconnect.
>> 
>> Callers that already hold the transport write lock may invoke
>> ->ops->close directly. Others use a generic helper that schedules
>> a close when the write lock can be taken safely.
>> 
>> Signed-off-by: Chuck Lever <chuck.lever@xxxxxxxxxx>
>> ---
>> include/linux/sunrpc/xprt.h                |    1 +
>> net/sunrpc/xprt.c                          |   19
>> +++++++++++++++++++
>> net/sunrpc/xprtrdma/backchannel.c          |   13 +++++++------
>> net/sunrpc/xprtrdma/svc_rdma_backchannel.c |    8 +++++---
>> net/sunrpc/xprtrdma/transport.c            |   16 ++++++++++------
>> net/sunrpc/xprtrdma/verbs.c                |    5 ++---
>> 6 files changed, 44 insertions(+), 18 deletions(-)
>> 
>> diff --git a/include/linux/sunrpc/xprt.h
>> b/include/linux/sunrpc/xprt.h
>> index a4ab4f8..ee94ed0 100644
>> --- a/include/linux/sunrpc/xprt.h
>> +++ b/include/linux/sunrpc/xprt.h
>> @@ -401,6 +401,7 @@ static inline __be32
>> *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *
>> bool			xprt_request_get_cong(struct rpc_xprt *xprt,
>> struct rpc_rqst *req);
>> void			xprt_disconnect_done(struct rpc_xprt *xprt);
>> void			xprt_force_disconnect(struct rpc_xprt *xprt);
>> +void			xprt_disconnect_nowake(struct rpc_xprt *xprt);
>> void			xprt_conditional_disconnect(struct rpc_xprt
>> *xprt, unsigned int cookie);
>> 
>> bool			xprt_lock_connect(struct rpc_xprt *, struct
>> rpc_task *, void *);
>> diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
>> index ce92700..afe412e 100644
>> --- a/net/sunrpc/xprt.c
>> +++ b/net/sunrpc/xprt.c
>> @@ -685,6 +685,25 @@ void xprt_force_disconnect(struct rpc_xprt
>> *xprt)
>> }
>> EXPORT_SYMBOL_GPL(xprt_force_disconnect);
>> 
>> +/**
>> + * xprt_disconnect_nowake - force a call to xprt->ops->close
>> + * @xprt: transport to disconnect
>> + *
>> + * The caller must ensure that xprt_wake_pending_tasks() is
>> + * called later.
>> + */
>> +void xprt_disconnect_nowake(struct rpc_xprt *xprt)
>> +{
>> +       /* Don't race with the test_bit() in xprt_clear_locked() */
>> +       spin_lock_bh(&xprt->transport_lock);
>> +       set_bit(XPRT_CLOSE_WAIT, &xprt->state);
>> +       /* Try to schedule an autoclose RPC call */
>> +       if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
>> +               queue_work(xprtiod_workqueue, &xprt->task_cleanup);
>> +       spin_unlock_bh(&xprt->transport_lock);
>> +}
>> +EXPORT_SYMBOL_GPL(xprt_disconnect_nowake);
>> +
> 
> We shouldn't need both xprt_disconnect_nowake() and
> xprt_force_disconnect() to be exported given that you can build the
> latter from the former + xprt_wake_pending_tasks() (which is also
> already exported).

Thanks for your review!

I can get rid of xprt_disconnect_nowake. There are some variations,
depending on why wake_pending_tasks is protected by xprt->transport_lock.

/*
 * Variant 1: force a disconnect on @xprt, then wake the tasks pending
 * on it with -EAGAIN. The wake-up is issued without taking
 * xprt->transport_lock.
 *
 * NOTE(review): presumably xprt_force_disconnect() no longer wakes
 * pending tasks itself in this proposal -- confirm against the final
 * patch.
 */
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
        xprt_force_disconnect(xprt);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
}    

Or,

/*
 * Variant 2: force a disconnect on @xprt, then wake the tasks pending
 * on it with -EAGAIN while holding xprt->transport_lock.
 *
 * The lock is taken only after xprt_force_disconnect() returns, and
 * is released again once the wake-up has been issued.
 */
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
        xprt_force_disconnect(xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_wake_pending_tasks(xprt, -EAGAIN);
        /* Pair with the spin_lock_bh() above; a second lock here would self-deadlock */
        spin_unlock_bh(&xprt->transport_lock);
}

Or,

/*
 * Variant 3: fold the optional wake-up into xprt_force_disconnect().
 *
 * @xprt: transport to disconnect
 * @wake: when true, also wake pending tasks with -EAGAIN
 *
 * Everything below runs under xprt->transport_lock: XPRT_CLOSE_WAIT is
 * set, xprt->task_cleanup is queued on xprtiod_workqueue if XPRT_LOCKED
 * was not already set, and the optional wake-up is issued last.
 */
void xprt_force_disconnect(struct rpc_xprt *xprt, bool wake)
{
        /* Holding the lock avoids racing with the test_bit() in xprt_clear_locked() */
        spin_lock_bh(&xprt->transport_lock);

        set_bit(XPRT_CLOSE_WAIT, &xprt->state);

        /* Only the caller that wins XPRT_LOCKED schedules the autoclose work */
        if (!test_and_set_bit(XPRT_LOCKED, &xprt->state))
                queue_work(xprtiod_workqueue, &xprt->task_cleanup);

        if (wake)
                xprt_wake_pending_tasks(xprt, -EAGAIN);

        spin_unlock_bh(&xprt->transport_lock);
}

Which do you prefer?


>> static unsigned int
>> xprt_connect_cookie(struct rpc_xprt *xprt)
>> {
>> diff --git a/net/sunrpc/xprtrdma/backchannel.c
>> b/net/sunrpc/xprtrdma/backchannel.c
>> index 2cb07a3..5d462e8 100644
>> --- a/net/sunrpc/xprtrdma/backchannel.c
>> +++ b/net/sunrpc/xprtrdma/backchannel.c
>> @@ -193,14 +193,15 @@ static int rpcrdma_bc_marshal_reply(struct
>> rpc_rqst *rqst)
>>  */
>> int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
>> {
>> -	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
>> +	struct rpc_xprt *xprt = rqst->rq_xprt;
>> +	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
>> 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
>> 	int rc;
>> 
>> -	if (!xprt_connected(rqst->rq_xprt))
>> -		goto drop_connection;
>> +	if (!xprt_connected(xprt))
>> +		return -ENOTCONN;
>> 
>> -	if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
>> +	if (!xprt_request_get_cong(xprt, rqst))
>> 		return -EBADSLT;
>> 
>> 	rc = rpcrdma_bc_marshal_reply(rqst);
>> @@ -215,7 +216,7 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst
>> *rqst)
>> 	if (rc != -ENOTCONN)
>> 		return rc;
>> drop_connection:
>> -	xprt_disconnect_done(rqst->rq_xprt);
>> +	xprt->ops->close(xprt);
> 
> Why use an indirect call here? Is this ever going to be different to
> xprt_rdma_close()?
> 
>> 	return -ENOTCONN;
>> }
>> 
>> @@ -338,7 +339,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt
>> *r_xprt,
>> 
>> out_overflow:
>> 	pr_warn("RPC/RDMA backchannel overflow\n");
>> -	xprt_disconnect_done(xprt);
>> +	xprt_disconnect_nowake(xprt);
>> 	/* This receive buffer gets reposted automatically
>> 	 * when the connection is re-established.
>> 	 */
>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> index f3c147d..b908f2c 100644
>> --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
>> @@ -200,11 +200,10 @@ static int svc_rdma_bc_sendto(struct
>> svcxprt_rdma *rdma,
>> 		svc_rdma_send_ctxt_put(rdma, ctxt);
>> 		goto drop_connection;
>> 	}
>> -	return rc;
>> +	return 0;
>> 
>> drop_connection:
>> 	dprintk("svcrdma: failed to send bc call\n");
>> -	xprt_disconnect_done(xprt);
>> 	return -ENOTCONN;
>> }
>> 
>> @@ -225,8 +224,11 @@ static int svc_rdma_bc_sendto(struct
>> svcxprt_rdma *rdma,
>> 
>> 	ret = -ENOTCONN;
>> 	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
>> -	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
>> +	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
>> 		ret = rpcrdma_bc_send_request(rdma, rqst);
>> +		if (ret == -ENOTCONN)
>> +			svc_close_xprt(sxprt);
>> +	}
>> 
>> 	mutex_unlock(&sxprt->xpt_mutex);
>> 
>> diff --git a/net/sunrpc/xprtrdma/transport.c
>> b/net/sunrpc/xprtrdma/transport.c
>> index 91c476a..a16296b 100644
>> --- a/net/sunrpc/xprtrdma/transport.c
>> +++ b/net/sunrpc/xprtrdma/transport.c
>> @@ -453,13 +453,13 @@
>> 
>> 	if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
>> 		rpcrdma_ia_remove(ia);
>> -		return;
>> +		goto out;
>> 	}
>> +
>> 	if (ep->rep_connected == -ENODEV)
>> 		return;
>> 	if (ep->rep_connected > 0)
>> 		xprt->reestablish_timeout = 0;
>> -	xprt_disconnect_done(xprt);
>> 	rpcrdma_ep_disconnect(ep, ia);
>> 
>> 	/* Prepare @xprt for the next connection by reinitializing
>> @@ -467,6 +467,10 @@
>> 	 */
>> 	r_xprt->rx_buf.rb_credits = 1;
>> 	xprt->cwnd = RPC_CWNDSHIFT;
>> +
>> +out:
>> +	++xprt->connect_cookie;
>> +	xprt_disconnect_done(xprt);
>> }
>> 
>> /**
>> @@ -515,7 +519,7 @@
>> static void
>> xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task)
>> {
>> -	xprt_force_disconnect(xprt);
>> +	xprt_disconnect_nowake(xprt);
>> }
>> 
>> /**
>> @@ -717,7 +721,7 @@
>> #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
>> 
>> 	if (!xprt_connected(xprt))
>> -		goto drop_connection;
>> +		return -ENOTCONN;
>> 
>> 	if (!xprt_request_get_cong(xprt, rqst))
>> 		return -EBADSLT;
>> @@ -749,8 +753,8 @@
>> 	if (rc != -ENOTCONN)
>> 		return rc;
>> drop_connection:
>> -	xprt_disconnect_done(xprt);
>> -	return -ENOTCONN;	/* implies disconnect */
>> +	xprt_rdma_close(xprt);
>> +	return -ENOTCONN;
>> }
>> 
>> void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file
>> *seq)
>> diff --git a/net/sunrpc/xprtrdma/verbs.c
>> b/net/sunrpc/xprtrdma/verbs.c
>> index 9a0a765..38a757c 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -252,7 +252,7 @@ static void rpcrdma_xprt_drain(struct
>> rpcrdma_xprt *r_xprt)
>> #endif
>> 		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
>> 		ep->rep_connected = -ENODEV;
>> -		xprt_force_disconnect(xprt);
>> +		xprt_disconnect_nowake(xprt);
>> 		wait_for_completion(&ia->ri_remove_done);
>> 
>> 		ia->ri_id = NULL;
>> @@ -280,10 +280,9 @@ static void rpcrdma_xprt_drain(struct
>> rpcrdma_xprt *r_xprt)
>> 			ep->rep_connected = -EAGAIN;
>> 		goto disconnected;
>> 	case RDMA_CM_EVENT_DISCONNECTED:
>> -		++xprt->connect_cookie;
>> 		ep->rep_connected = -ECONNABORTED;
>> disconnected:
>> -		xprt_force_disconnect(xprt);
>> +		xprt_disconnect_nowake(xprt);
>> 		wake_up_all(&ep->rep_connect_wait);
>> 		break;
>> 	default:
>> 
> 
> -- 
> Trond Myklebust
> Linux NFS client maintainer, Hammerspace
> trond.myklebust@xxxxxxxxxxxxxxx

--
Chuck Lever







[Index of Archives]     [Linux Filesystem Development]     [Linux USB Development]     [Linux Media Development]     [Video for Linux]     [Linux NILFS]     [Linux Audio Users]     [Yosemite Info]     [Linux SCSI]

  Powered by Linux