Re: [PATCH 5/6] Bluetooth: Use event-driven approach for handling ERTM receive buffer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi Mat,

* Mat Martineau <mathewm@xxxxxxxxxxxxxx> [2011-07-01 10:29:23 -0700]:

> 
> Hi Gustavo -
> 
> On Thu, 30 Jun 2011, Gustavo F. Padovan wrote:
> 
> >Hi Mat,
> >
> >* Mat Martineau <mathewm@xxxxxxxxxxxxxx> [2011-06-29 14:35:23 -0700]:
> >
> >>This change moves most L2CAP ERTM receive buffer handling out of the
> >>L2CAP core and in to the socket code.  It's up to the higher layer
> >>(the socket code, in this case) to tell the core when its buffer is
> >>full or has space available.  The recv op should always accept
> >>incoming ERTM data or else the connection will go down.
> >>
> >>Within the socket layer, an skb that does not fit in the socket
> >>receive buffer will be temporarily stored.  When the socket is read
> >>from, that skb will be placed in the receive buffer if possible.  Once
> >>adequate buffer space becomes available, the L2CAP core is informed
> >>and the ERTM local busy state is cleared.
> >>
> >>Receive buffer management for non-ERTM modes is unchanged.
> >>
> >>Signed-off-by: Mat Martineau <mathewm@xxxxxxxxxxxxxx>
> >>---
> >> include/net/bluetooth/l2cap.h |    3 ++
> >> net/bluetooth/l2cap_core.c    |   41 ++++++++++++++++---------
> >> net/bluetooth/l2cap_sock.c    |   67 +++++++++++++++++++++++++++++++++++++++--
> >> 3 files changed, 93 insertions(+), 18 deletions(-)
> >>
> >>diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h
> >>index 9c18e55..82387c5 100644
> >>--- a/include/net/bluetooth/l2cap.h
> >>+++ b/include/net/bluetooth/l2cap.h
> >>@@ -422,6 +422,8 @@ struct l2cap_conn {
> >> struct l2cap_pinfo {
> >> 	struct bt_sock	bt;
> >> 	struct l2cap_chan	*chan;
> >>+	struct sk_buff	*rx_busy_skb;
> >>+	int rx_busy;
> >> };
> >>
> >> enum {
> >>@@ -498,5 +500,6 @@ void l2cap_chan_close(struct l2cap_chan *chan, int reason);
> >> void l2cap_chan_destroy(struct l2cap_chan *chan);
> >> int l2cap_chan_connect(struct l2cap_chan *chan);
> >> int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len);
> >>+void l2cap_chan_busy(struct l2cap_chan *chan, int busy);
> >>
> >> #endif /* __L2CAP_H */
> >>diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c
> >>index 4b764b1..f0e7ba7 100644
> >>--- a/net/bluetooth/l2cap_core.c
> >>+++ b/net/bluetooth/l2cap_core.c
> >>@@ -3350,21 +3350,21 @@ static int l2cap_push_rx_skb(struct l2cap_chan *chan, struct sk_buff *skb, u16 c
> >> 	}
> >>
> >> 	err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>-	if (err >= 0) {
> >>-		chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> >>-		return err;
> >>-	}
> >>-
> >>-	l2cap_ertm_enter_local_busy(chan);
> >>-
> >>-	bt_cb(skb)->sar = control >> L2CAP_CTRL_SAR_SHIFT;
> >>-	__skb_queue_tail(&chan->busy_q, skb);
> >>-
> >>-	queue_work(_busy_wq, &chan->busy_work);
> >>+	chan->buffer_seq = (chan->buffer_seq + 1) % 64;
> >>
> >> 	return err;
> >> }
> >>
> >>+void l2cap_chan_busy(struct l2cap_chan *chan, int busy)
> >>+{
> >>+	if (chan->mode == L2CAP_MODE_ERTM) {
> >>+		if (busy)
> >>+			l2cap_ertm_enter_local_busy(chan);
> >>+		else
> >>+			l2cap_ertm_exit_local_busy(chan);
> >>+	}
> >>+}
> >>+
> >> static int l2cap_streaming_reassembly_sdu(struct l2cap_chan *chan, struct sk_buff *skb, u16 control)
> >> {
> >> 	struct sk_buff *_skb;
> >>@@ -3463,13 +3463,22 @@ static void l2cap_check_srej_gap(struct l2cap_chan *chan, u8 tx_seq)
> >> 	struct sk_buff *skb;
> >> 	u16 control;
> >>
> >>-	while ((skb = skb_peek(&chan->srej_q))) {
> >>+	while ((skb = skb_peek(&chan->srej_q)) &&
> >>+			!test_bit(CONN_LOCAL_BUSY, &chan->conn_state)) {
> >>+		int err;
> >>+
> >> 		if (bt_cb(skb)->tx_seq != tx_seq)
> >> 			break;
> >>
> >> 		skb = skb_dequeue(&chan->srej_q);
> >> 		control = bt_cb(skb)->sar << L2CAP_CTRL_SAR_SHIFT;
> >>-		l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>+		err = l2cap_ertm_reassembly_sdu(chan, skb, control);
> >>+
> >>+		if (err < 0) {
> >>+			l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> >>+			break;
> >>+		}
> >>+
> >> 		chan->buffer_seq_srej =
> >> 			(chan->buffer_seq_srej + 1) % 64;
> >> 		tx_seq = (tx_seq + 1) % 64;
> >>@@ -3625,8 +3634,10 @@ expected:
> >> 	}
> >>
> >> 	err = l2cap_push_rx_skb(chan, skb, rx_control);
> >>-	if (err < 0)
> >>-		return 0;
> >>+	if (err < 0) {
> >>+		l2cap_send_disconn_req(chan->conn, chan, ECONNRESET);
> >>+		return err;
> >>+	}
> >>
> >> 	if (rx_control & L2CAP_CTRL_FINAL) {
> >> 		if (!test_and_clear_bit(CONN_REJ_ACT, &chan->conn_state))
> >>diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c
> >>index 39082d4..ab0494b 100644
> >>--- a/net/bluetooth/l2cap_sock.c
> >>+++ b/net/bluetooth/l2cap_sock.c
> >>@@ -711,6 +711,8 @@ static int l2cap_sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct ms
> >> static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t len, int flags)
> >> {
> >> 	struct sock *sk = sock->sk;
> >>+	struct l2cap_pinfo *pi = l2cap_pi(sk);
> >>+	int err;
> >>
> >> 	lock_sock(sk);
> >>
> >>@@ -725,9 +727,40 @@ static int l2cap_sock_recvmsg(struct kiocb *iocb, struct socket *sock, struct ms
> >> 	release_sock(sk);
> >>
> >> 	if (sock->type == SOCK_STREAM)
> >>-		return bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> >>+		err = bt_sock_stream_recvmsg(iocb, sock, msg, len, flags);
> >>+	else
> >>+		err = bt_sock_recvmsg(iocb, sock, msg, len, flags);
> >>+
> >>+	if (pi->chan->mode == L2CAP_MODE_ERTM) {
> >>+		int queue_err;
> >>+		int threshold;
> >>+
> >>+		/* Attempt to put pending rx data in the socket buffer */
> >>+
> >>+		lock_sock(sk);
> >>+		if (pi->rx_busy_skb) {
> >>+			queue_err = sock_queue_rcv_skb(sk, pi->rx_busy_skb);
> >>+
> >>+			if (!queue_err)
> >>+				pi->rx_busy_skb = NULL;
> >>+		}
> >>+
> >>+		/* Restore data flow when half of the receive buffer is
> >>+		 * available.  This avoids resending large numbers of
> >>+		 * frames.
> >>+		 */
> >>+		threshold = sk->sk_rcvbuf >> 1;
> >>+		if (pi->rx_busy && !pi->rx_busy_skb &&
> >>+				atomic_read(&sk->sk_rmem_alloc) <= threshold) {
> >>+
> >>+			pi->rx_busy = 0;
> >>+			l2cap_chan_busy(l2cap_pi(sk)->chan, pi->rx_busy);
> >>+		}
> >>+
> >>+		release_sock(sk);
> >
> >This is too much core logic outside of the core. I didn't think hard about
> >this, but it can be simplified to just read the threshold and report to the
> >core if we are not busy anymore, and then get rid of rx_busy and rx_busy_skb.
> 
> 
> The main purpose of this patch is to change the behavior of
> l2cap_ops->recv() to better match what ERTM needs.
> 
> The way sock_queue_rcv_skb() works is not a good match for ERTM.
> ERTM can't put data in to the buffer until after it is reassembled,
> so when sock_queue_rcv_skb() tells the core there's no room, the
> core needs to keep track of the skb that didn't fit.
> 
> The core code is simpler if we have a recv_cb function that tells
> the core it is busy *before* it will start rejecting skbs.  Then the
> core does not have to keep track of reassembled and acked skbs. The
> core knows that it gave the skb to the socket (or AMP manager) and
> does not have to worry about it any more.
> 
> This patch hides socket-specific behavior in socket-specific code,
> which seems like a good fit.  The core should not have to care about
> socket-specific behavior.
> 
> It also delays exit of the local busy state until there is receive
> buffer space available.  This results in fewer transitions in & out
> of local busy.
> 
> 
> I could get rid of rx_busy by querying the LOCAL_BUSY bit from the
> core.
> 
> 
> >
> >>+	}
> >>
> >>-	return bt_sock_recvmsg(iocb, sock, msg, len, flags);
> >>+	return err;
> >> }
> >>
> >> /* Kill socket (only if zapped and orphan)
> >>@@ -811,9 +844,32 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(void *data)
> >>
> >> static int l2cap_sock_recv_cb(void *data, struct sk_buff *skb)
> >> {
> >>+	int err;
> >> 	struct sock *sk = data;
> >>+	struct l2cap_pinfo *pi = l2cap_pi(sk);
> >>+
> >>+	if (pi->rx_busy_skb)
> >>+		return -ENOMEM;
> >>+
> >>+	err = sock_queue_rcv_skb(sk, skb);
> >>+
> >>+	/* For ERTM, handle one skb that doesn't fit into the recv
> >>+	 * buffer.  This is important to do because the data frames
> >>+	 * have already been acked, so the skb cannot be discarded.
> >>+	 *
> >>+	 * Notify the l2cap core that the buffer is full, so the
> >>+	 * LOCAL_BUSY state is entered and no more frames are
> >>+	 * acked and reassembled until there is buffer space
> >>+	 * available.
> >>+	 */
> >>+	if (err < 0 && pi->chan->mode == L2CAP_MODE_ERTM) {
> >>+		pi->rx_busy = 1;
> >>+		pi->rx_busy_skb = skb;
> >>+		l2cap_chan_busy(pi->chan, pi->rx_busy);
> >>+		err = 0;
> >>+	}
> >
> >This can totally be in l2cap_core.c. The idea is to move Core stuff to
> >l2cap_core.c and not let the L2CAP users handle this. Just enter LOCAL_BUSY
> >when recv_cb returns less than 0.
> 
> 
> I see the core's responsibility as handling the ERTM protocol - it
> receives, reassembles, and acks the data.  When it has finished that
> job, and has no indication that it should be "busy", it passes that
> data to the higher layer (socket or AMP manager) and is done with
> it. This means the socket layer must keep the one skb that didn't
> fit in the receive buffer, and it puts that data in the socket
> rcvbuf in a way that is invisible to the core.
> 
> This changes the local busy behavior from:
> 
>  * skb arrives, core reassembles it
>  * Core calls socket recv_cb, error returned
>  * Core puts 'end' PDU or 'unsegmented' SDU on the busy_q
>  * Core sends RNR
>  * Core polls socket recv_cb [up to 12 calls from core to socket]
>    * Reassembly code is executed each time (but data not re-copied)
>  * Core sends RR though there may be very little space in the socket
>    recv buffer
> 
> and instead does:
> 
>  * skb arrives, core reassembles it exactly once
>  * Core calls socket recv_cb
>  * Socket tells the core it is busy
>  * Core sends RNR
>  * Application calls recvmsg() until buffer is half full (no core
> interaction) [no limit other than socket timeout]
>  * Socket tells the core it is not busy
>  * Core sends RR, and there is socket buffer space to accept
> incoming data
> 
> The new way has no iteration in the core, and all of the work that
> happens during a recvmsg() stays in the socket code until there is
> buffer space available.
> 
> 
> The change is also motivated by trying to remove the busy
> workqueue, which is a good thing to do for several reasons:
> 
>  * Gets rid of an extra thread and associated scheduling and memory
>  * The current implementation of l2cap_busy_work() does not operate
> well with multiple sockets (the workqueue thread is blocked until
> l2cap_busy_work() returns - so no)
>  * The socket layer knows when the receive buffer was read from, so
> it makes sense to only attempt reassembly after recvmsg and not poll
> every 200ms.
>  * Data gets to the socket layer faster if there's no wait for the
> next polling attempt.
>  * Channel is disconnected after a couple of seconds, which isn't
> necessary and may break some apps

This is actually a good idea in the end; getting rid of the workqueue thread
and the polling every 200ms is a good thing to do. I didn't grok the recv path
totally when I first wrote this code. ;)

So let's remove rx_busy and go for a -v2 patch. Removing rx_busy may reduce the
logic inside the socket code.

	Gustavo
--
To unsubscribe from this list: send the line "unsubscribe linux-bluetooth" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html


[Index of Archives]     [Bluez Devel]     [Linux Wireless Networking]     [Linux Wireless Personal Area Networking]     [Linux ATH6KL]     [Linux USB Devel]     [Linux Media Drivers]     [Linux Audio Users]     [Linux Kernel]     [Linux SCSI]     [Big List of Linux Books]

  Powered by Linux