Hi Sebastian, On Mon, Apr 8, 2024 at 9:13 AM Sebastian Urban <surban@xxxxxxxxxx> wrote: > > Previously LE flow credits were returned to the > sender even if the socket's receive buffer was > full. This meant that no back-pressure > was applied to the sender, thus it continued to > send data, resulting in data loss without any > error being reported. Furthermore, the amount > of credits was essentially fixed to a small amount, > leading to reduced performance. > > This is fixed by computing the number of returned > LE flow credits based on the available space in the > receive buffer of an L2CAP socket. Consequently, > if the receive buffer is full, no credits are returned > until the buffer is read and thus cleared by user-space. > > Since the computation of available > receive buffer space can only be performed > approximately, i.e. sk_buff overhead is ignored, > and the receive buffer size may be changed by > user-space after flow credits have been sent, > superfluous received data is temporarily stored within > l2cap_pinfo. This is necessary because Bluetooth LE > provides no retransmission mechanism once the > data has been acked by the physical layer. > > Signed-off-by: Sebastian Urban <surban@xxxxxxxxxx> > --- > include/net/bluetooth/l2cap.h | 10 +++++- > net/bluetooth/l2cap_core.c | 51 ++++++++++++++++++++++---- > net/bluetooth/l2cap_sock.c | 67 +++++++++++++++++++++++++---------- > 3 files changed, 103 insertions(+), 25 deletions(-) > > diff --git a/include/net/bluetooth/l2cap.h b/include/net/bluetooth/l2cap.h > index 3f4057ced971..bc6ff40ebc9f 100644 > --- a/include/net/bluetooth/l2cap.h > +++ b/include/net/bluetooth/l2cap.h > @@ -547,6 +547,8 @@ struct l2cap_chan { > > __u16 tx_credits; > __u16 rx_credits; > + int rx_avail; > + int rx_staged; I'd probably use size_t for the two fields above, and add comments describing what they refer to, e.g. rx_staged is what has been received already, right? Couldn't we use chan->sdu->len instead? 
> > __u8 tx_state; > __u8 rx_state; > @@ -682,10 +684,15 @@ struct l2cap_user { > /* ----- L2CAP socket info ----- */ > #define l2cap_pi(sk) ((struct l2cap_pinfo *) sk) > > +struct l2cap_rx_busy { > + struct list_head list; > + struct sk_buff *skb; > +}; > + > struct l2cap_pinfo { > struct bt_sock bt; > struct l2cap_chan *chan; > - struct sk_buff *rx_busy_skb; > + struct list_head rx_busy; > }; > > enum { > @@ -944,6 +951,7 @@ int l2cap_chan_reconfigure(struct l2cap_chan *chan, __u16 mtu); > int l2cap_chan_send(struct l2cap_chan *chan, struct msghdr *msg, size_t len, > const struct sockcm_cookie *sockc); > void l2cap_chan_busy(struct l2cap_chan *chan, int busy); > +void l2cap_chan_rx_avail(struct l2cap_chan *chan, int rx_avail); > int l2cap_chan_check_security(struct l2cap_chan *chan, bool initiator); > void l2cap_chan_set_defaults(struct l2cap_chan *chan); > int l2cap_ertm_init(struct l2cap_chan *chan); > diff --git a/net/bluetooth/l2cap_core.c b/net/bluetooth/l2cap_core.c > index b0970462a689..7bad232d40ba 100644 > --- a/net/bluetooth/l2cap_core.c > +++ b/net/bluetooth/l2cap_core.c > @@ -454,6 +454,9 @@ struct l2cap_chan *l2cap_chan_create(void) > /* Set default lock nesting level */ > atomic_set(&chan->nesting, L2CAP_NESTING_NORMAL); > > + /* Available receive buffer space is initially unknown */ > + chan->rx_avail = -1; > + > write_lock(&chan_list_lock); > list_add(&chan->global_l, &chan_list); > write_unlock(&chan_list_lock); > @@ -535,6 +538,26 @@ void l2cap_chan_set_defaults(struct l2cap_chan *chan) > } > EXPORT_SYMBOL_GPL(l2cap_chan_set_defaults); > > +static __u16 l2cap_le_rx_credits(struct l2cap_chan *chan) > +{ > + if (chan->mps == 0) > + return 0; > + > + /* If we don't know the available space in the receiver buffer, give > + * enough credits for a full packet. 
> + */ > + if (chan->rx_avail == -1) > + return (chan->imtu / chan->mps) + 1; > + > + /* If we know how much space is available in the receive buffer, give > + * out as many credits as would fill the buffer. > + */ > + if (chan->rx_avail <= chan->rx_staged) > + return 0; Missing space. > + return min_t(int, U16_MAX, > + (chan->rx_avail - chan->rx_staged) / chan->mps); We probably need to use DIV_ROUND_UP here, since the division can return 0, or is that intentional, meaning that we cannot store another full mps? I think I'd still give it a credit: not handing out credits when only a few more bytes are needed may impact throughput, and in any case any bytes beyond rx_avail would be stored in the rx_busy list. > +} > + > static void l2cap_le_flowctl_init(struct l2cap_chan *chan, u16 tx_credits) > { > chan->sdu = NULL; > @@ -543,8 +566,7 @@ static void l2cap_le_flowctl_init(struct l2cap_chan *chan, u16 tx_credits) > chan->tx_credits = tx_credits; > /* Derive MPS from connection MTU to stop HCI fragmentation */ > chan->mps = min_t(u16, chan->imtu, chan->conn->mtu - L2CAP_HDR_SIZE); > - /* Give enough credits for a full packet */ > - chan->rx_credits = (chan->imtu / chan->mps) + 1; > + chan->rx_credits = l2cap_le_rx_credits(chan); > > skb_queue_head_init(&chan->tx_q); > } > @@ -556,7 +578,7 @@ static void l2cap_ecred_init(struct l2cap_chan *chan, u16 tx_credits) > /* L2CAP implementations shall support a minimum MPS of 64 octets */ > if (chan->mps < L2CAP_ECRED_MIN_MPS) { > chan->mps = L2CAP_ECRED_MIN_MPS; > - chan->rx_credits = (chan->imtu / chan->mps) + 1; > + chan->rx_credits = l2cap_le_rx_credits(chan); > } > } > > @@ -6520,9 +6542,7 @@ static void l2cap_chan_le_send_credits(struct l2cap_chan *chan) > { > struct l2cap_conn *conn = chan->conn; > struct l2cap_le_credits pkt; > - u16 return_credits; > - > - return_credits = (chan->imtu / chan->mps) + 1; > + u16 return_credits = l2cap_le_rx_credits(chan); > > if (chan->rx_credits >= 
return_credits) > return; > @@ -6541,6 +6561,16 @@ static void l2cap_chan_le_send_credits(struct l2cap_chan *chan) > l2cap_send_cmd(conn, chan->ident, L2CAP_LE_CREDITS, sizeof(pkt), &pkt); > } > > +void l2cap_chan_rx_avail(struct l2cap_chan *chan, int rx_avail) > +{ > + BT_DBG("chan %p has %d bytes avail for rx", chan, rx_avail); We should probably check whether the value has actually changed, or is this only meant to be called when the buffer changes? > + chan->rx_avail = rx_avail; > + > + if (chan->state == BT_CONNECTED) > + l2cap_chan_le_send_credits(chan); > +} > + > static int l2cap_ecred_recv(struct l2cap_chan *chan, struct sk_buff *skb) > { > int err; > @@ -6550,6 +6580,14 @@ static int l2cap_ecred_recv(struct l2cap_chan *chan, struct sk_buff *skb) > /* Wait recv to confirm reception before updating the credits */ > err = chan->ops->recv(chan, skb); > > + chan->rx_staged = 0; > + > + if (err < 0 && chan->rx_avail != -1) { > + BT_ERR("Queueing received LE L2CAP data failed"); > + l2cap_send_disconn_req(chan, ECONNRESET); > + return err; > + } > + > /* Update credits whenever an SDU is received */ > l2cap_chan_le_send_credits(chan); > > @@ -6571,6 +6609,7 @@ static int l2cap_ecred_data_rcv(struct l2cap_chan *chan, struct sk_buff *skb) > return -ENOBUFS; > } > > + chan->rx_staged += skb->len; > chan->rx_credits--; > BT_DBG("rx_credits %u -> %u", chan->rx_credits + 1, chan->rx_credits); > > diff --git a/net/bluetooth/l2cap_sock.c b/net/bluetooth/l2cap_sock.c > index 7846a068bf60..46603605cb69 100644 > --- a/net/bluetooth/l2cap_sock.c > +++ b/net/bluetooth/l2cap_sock.c > @@ -1157,6 +1157,7 @@ static int l2cap_sock_recvmsg(struct socket *sock, struct msghdr *msg, > { > struct sock *sk = sock->sk; > struct l2cap_pinfo *pi = l2cap_pi(sk); > + int avail; > int err; > > if (unlikely(flags & MSG_ERRQUEUE)) > @@ -1192,28 +1193,34 @@ static int l2cap_sock_recvmsg(struct socket *sock, struct msghdr *msg, > else > err = bt_sock_recvmsg(sock, msg, len, flags); > > - if (pi->chan->mode 
!= L2CAP_MODE_ERTM) > + if (pi->chan->mode != L2CAP_MODE_ERTM && > + pi->chan->mode != L2CAP_MODE_LE_FLOWCTL && > + pi->chan->mode != L2CAP_MODE_EXT_FLOWCTL) > return err; > > - /* Attempt to put pending rx data in the socket buffer */ > - > lock_sock(sk); > > - if (!test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state)) > - goto done; > + avail = max(0, sk->sk_rcvbuf - atomic_read(&sk->sk_rmem_alloc)); > + l2cap_chan_rx_avail(pi->chan, avail); > > - if (pi->rx_busy_skb) { > - if (!__sock_queue_rcv_skb(sk, pi->rx_busy_skb)) > - pi->rx_busy_skb = NULL; > - else > + /* Attempt to put pending rx data in the socket buffer */ > + while (!list_empty(&pi->rx_busy)) { > + struct l2cap_rx_busy *rx_busy = > + list_first_entry(&pi->rx_busy, > + struct l2cap_rx_busy, > + list); > + if (__sock_queue_rcv_skb(sk, rx_busy->skb) < 0) > goto done; > + list_del(&rx_busy->list); > + kfree(rx_busy); > } > > /* Restore data flow when half of the receive buffer is > * available. This avoids resending large numbers of > * frames. 
> */ > - if (atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf >> 1) > + if (test_bit(CONN_LOCAL_BUSY, &pi->chan->conn_state) && > + atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf >> 1) > l2cap_chan_busy(pi->chan, 0); > > done: > @@ -1474,17 +1481,21 @@ static struct l2cap_chan *l2cap_sock_new_connection_cb(struct l2cap_chan *chan) > static int l2cap_sock_recv_cb(struct l2cap_chan *chan, struct sk_buff *skb) > { > struct sock *sk = chan->data; > + struct l2cap_pinfo *pi = l2cap_pi(sk); > + int avail; > int err; > > lock_sock(sk); > > - if (l2cap_pi(sk)->rx_busy_skb) { > + if (chan->mode == L2CAP_MODE_ERTM && !list_empty(&pi->rx_busy)) { > err = -ENOMEM; > goto done; > } > > if (chan->mode != L2CAP_MODE_ERTM && > - chan->mode != L2CAP_MODE_STREAMING) { > + chan->mode != L2CAP_MODE_STREAMING && > + chan->mode != L2CAP_MODE_LE_FLOWCTL && > + chan->mode != L2CAP_MODE_EXT_FLOWCTL) { > /* Even if no filter is attached, we could potentially > * get errors from security modules, etc. > */ > @@ -1495,7 +1506,10 @@ static int l2cap_sock_recv_cb(struct l2cap_chan *chan, struct sk_buff *skb) > > err = __sock_queue_rcv_skb(sk, skb); > > - /* For ERTM, handle one skb that doesn't fit into the recv > + avail = max(0, sk->sk_rcvbuf - atomic_read(&sk->sk_rmem_alloc)); > + l2cap_chan_rx_avail(chan, avail); > + > + /* For ERTM and LE, handle a skb that doesn't fit into the recv > * buffer. This is important to do because the data frames > * have already been acked, so the skb cannot be discarded. > * > @@ -1504,8 +1518,18 @@ static int l2cap_sock_recv_cb(struct l2cap_chan *chan, struct sk_buff *skb) > * acked and reassembled until there is buffer space > * available. 
> */ > - if (err < 0 && chan->mode == L2CAP_MODE_ERTM) { > - l2cap_pi(sk)->rx_busy_skb = skb; > + if (err < 0 && > + (chan->mode == L2CAP_MODE_ERTM || > + chan->mode == L2CAP_MODE_LE_FLOWCTL || > + chan->mode == L2CAP_MODE_EXT_FLOWCTL)) { > + struct l2cap_rx_busy *rx_busy = > + kmalloc(sizeof(*rx_busy), GFP_KERNEL); > + if (!rx_busy) { > + err = -ENOMEM; > + goto done; > + } > + rx_busy->skb = skb; > + list_add_tail(&rx_busy->list, &pi->rx_busy); > l2cap_chan_busy(chan, 1); > err = 0; > } > @@ -1731,6 +1755,8 @@ static const struct l2cap_ops l2cap_chan_ops = { > > static void l2cap_sock_destruct(struct sock *sk) > { > + struct l2cap_rx_busy *rx_busy, *next; > + > BT_DBG("sk %p", sk); > > if (l2cap_pi(sk)->chan) { > @@ -1738,9 +1764,10 @@ static void l2cap_sock_destruct(struct sock *sk) > l2cap_chan_put(l2cap_pi(sk)->chan); > } > > - if (l2cap_pi(sk)->rx_busy_skb) { > - kfree_skb(l2cap_pi(sk)->rx_busy_skb); > - l2cap_pi(sk)->rx_busy_skb = NULL; > + list_for_each_entry_safe(rx_busy, next, &l2cap_pi(sk)->rx_busy, list) { > + kfree_skb(rx_busy->skb); > + list_del(&rx_busy->list); > + kfree(rx_busy); > } > > skb_queue_purge(&sk->sk_receive_queue); > @@ -1845,6 +1872,8 @@ static struct sock *l2cap_sock_alloc(struct net *net, struct socket *sock, > sk->sk_destruct = l2cap_sock_destruct; > sk->sk_sndtimeo = L2CAP_CONN_TIMEOUT; > > + INIT_LIST_HEAD(&l2cap_pi(sk)->rx_busy); > + > chan = l2cap_chan_create(); > if (!chan) { > sk_free(sk); > @@ -1853,6 +1882,8 @@ static struct sock *l2cap_sock_alloc(struct net *net, struct socket *sock, > > l2cap_chan_hold(chan); > > + l2cap_chan_rx_avail(chan, sk->sk_rcvbuf); > + > l2cap_pi(sk)->chan = chan; > > return sk; > -- > 2.34.1 > -- Luiz Augusto von Dentz