John Fastabend <john.fastabend@xxxxxxxxx> wrote:
> Pengcheng Yang wrote:
> > Currently msg is queued in ingress_msg of the target psock
> > on ingress redirect, without incrementing rcv_nxt. The size
> > that user can read includes the data in receive_queue and
> > ingress_msg. So we introduce sk_msg_queue_len() helper to
> > get the data length in ingress_msg.
> >
> > Note that the msg_len does not include the data length of
> > msg from receive_queue via SK_PASS, as they increment rcv_nxt
> > when received.
> >
> > Signed-off-by: Pengcheng Yang <yangpc@xxxxxxxxxx>
> > ---
> >  include/linux/skmsg.h | 26 ++++++++++++++++++++++++--
> >  net/core/skmsg.c      | 10 +++++++++-
> >  2 files changed, 33 insertions(+), 3 deletions(-)
> >
>
> This has two writers under different locks, so this looks insufficient
> to ensure correctness of the counter. Likely the consume can be
> moved into where the dequeue_msg happens? But then it's not always
> accurate, which might break some applications doing buffer sizing.
> An example of this would be nginx.
>
> > diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
> > index c1637515a8a4..423a5c28c606 100644
> > --- a/include/linux/skmsg.h
> > +++ b/include/linux/skmsg.h
> > @@ -47,6 +47,7 @@ struct sk_msg {
> >  	u32 apply_bytes;
> >  	u32 cork_bytes;
> >  	u32 flags;
> > +	bool ingress_self;
> >  	struct sk_buff *skb;
> >  	struct sock *sk_redir;
> >  	struct sock *sk;
> > @@ -82,6 +83,7 @@ struct sk_psock {
> >  	u32 apply_bytes;
> >  	u32 cork_bytes;
> >  	u32 eval;
> > +	u32 msg_len;
> >  	bool redir_ingress; /* undefined if sk_redir is null */
> >  	struct sk_msg *cork;
> >  	struct sk_psock_progs progs;
> > @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock,
> >  				      struct sk_msg *msg)
> >  {
> >  	spin_lock_bh(&psock->ingress_lock);
> > -	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
> > +	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) {
> >  		list_add_tail(&msg->list, &psock->ingress_msg);
> > -	else {
> > +		if (!msg->ingress_self)
> > +			WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size);
>
> First writer here can be from
>
>  sk_psock_backlog()
>   mutex_lock(psock->work_mutex)
>   ...
>    sk_psock_handle_skb()
>     sk_psock_skb_ingress()
>      sk_psock_skb_ingress_enqueue()
>       sk_psock_queue_msg()
>        spin_lock_bh(psock->ingress_lock)
>         WRITE_ONCE(...)
>         spin_unlock_bh()
>
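To make sure I follow the problem: the counter update itself is a plain
read-modify-write, and the two writers serialize on different locks, so an
increment and a decrement can interleave and one of them is lost. A minimal
userspace sketch of that lost-update pattern (plain pthreads with made-up
queue_side/recv_side names, not the kernel code):

/*
 * Two writers update the same counter, each under a *different* lock, so
 * their read-modify-write sequences are not serialized against each other.
 * Build with: gcc -O2 -pthread race.c
 */
#include <pthread.h>
#include <stdio.h>

#define ITERS 1000000

static unsigned int msg_len;	/* stands in for psock->msg_len */
static pthread_mutex_t lock_a = PTHREAD_MUTEX_INITIALIZER;	/* "ingress_lock" */
static pthread_mutex_t lock_b = PTHREAD_MUTEX_INITIALIZER;	/* "socket lock" */

static void *queue_side(void *arg)	/* increments, as the backlog path would */
{
	for (int i = 0; i < ITERS; i++) {
		pthread_mutex_lock(&lock_a);
		msg_len = msg_len + 1;	/* not serialized against recv_side */
		pthread_mutex_unlock(&lock_a);
	}
	return NULL;
}

static void *recv_side(void *arg)	/* decrements, as the recvmsg path would */
{
	for (int i = 0; i < ITERS; i++) {
		pthread_mutex_lock(&lock_b);
		msg_len = msg_len - 1;
		pthread_mutex_unlock(&lock_b);
	}
	return NULL;
}

int main(void)
{
	pthread_t a, b;

	pthread_create(&a, NULL, queue_side, NULL);
	pthread_create(&b, NULL, recv_side, NULL);
	pthread_join(a, NULL);
	pthread_join(b, NULL);
	/* With a single shared lock this always prints 0. */
	printf("msg_len = %u (expected 0)\n", msg_len);
	return 0;
}

With one shared lock (or an atomic) the final value is always 0; with two
different locks it usually is not, and that drift is what the ioctl would
end up reporting.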
> > +	} else {
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg)
> >  	kfree(msg);
> >  }
> >
> > +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
> > +{
> > +	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
> > +}
> > +
> > +static inline u32 sk_msg_queue_len(const struct sock *sk)
> > +{
> > +	struct sk_psock *psock;
> > +	u32 len = 0;
> > +
> > +	rcu_read_lock();
> > +	psock = sk_psock(sk);
> > +	if (psock)
> > +		len = READ_ONCE(psock->msg_len);
> > +	rcu_read_unlock();
> > +	return len;
> > +}
> > +
> >  static inline void sk_psock_report_error(struct sk_psock *psock, int err)
> >  {
> >  	struct sock *sk = psock->sk;
> > diff --git a/net/core/skmsg.c b/net/core/skmsg.c
> > index 6c31eefbd777..f46732a8ddc2 100644
> > --- a/net/core/skmsg.c
> > +++ b/net/core/skmsg.c
> > @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  	struct iov_iter *iter = &msg->msg_iter;
> >  	int peek = flags & MSG_PEEK;
> >  	struct sk_msg *msg_rx;
> > -	int i, copied = 0;
> > +	int i, copied = 0, msg_copied = 0;
> >
> >  	msg_rx = sk_psock_peek_msg(psock);
> >  	while (copied != len) {
> > @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  		}
> >
> >  		copied += copy;
> > +		if (!msg_rx->ingress_self)
> > +			msg_copied += copy;
> >  		if (likely(!peek)) {
> >  			sge->offset += copy;
> >  			sge->length -= copy;
> > @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  		msg_rx = sk_psock_peek_msg(psock);
> >  	}
> >  out:
> > +	if (likely(!peek) && msg_copied)
> > +		sk_msg_queue_consumed(psock, msg_copied);
>
> Second writer,
>
>  tcp_bpf_recvmsg_parser()
>   lock_sock(sk)
>    sk_msg_recvmsg()
>     sk_psock_peek_msg()
>      spin_lock_bh(ingress_lock);   <- lock held from first writer.
>       msg = list...
>      spin_unlock_bh()
>    sk_psock_dequeue_msg(psock)
>     spin_lock_bh(ingress_lock)
>      msg = ....                    <- should call queue_consumed here
>     spin_unlock_bh()
>
>  out:
>  	if (likely(!peek) && msg_copied)
>  		sk_msg_queue_consumed(psock, msg_copied);   <- here no lock?
>
> It looks like you could move the queue_consumed up into the dequeue_msg,
> but then you have some issue on partial reads, I think? Basically the
> IOCTL might return more bytes than are actually in the ingress queue.
> Also it will look strange if the ioctl is called twice, once before a read
> and again after a read, and the byte count doesn't change.
>

Thanks John for pointing this out.

Yes, I tried to move queue_consumed into dequeue_msg without making major
changes to sk_msg_recvmsg, but failed.
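For the record, the buffer-sizing pattern in question looks roughly like
this from userspace (illustrative sketch only, not nginx source): ask
FIONREAD/SIOCINQ how much is pending, then expect the following recv() to
return exactly that much. If msg_len over-reports, the recv() comes back
short even though nothing more will arrive for that read.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

/* Size a read buffer from FIONREAD and read that many bytes. */
static ssize_t read_pending(int fd)
{
	ssize_t n;
	int pending = 0;
	char *buf;

	if (ioctl(fd, FIONREAD, &pending) < 0 || pending <= 0)
		return 0;

	buf = malloc(pending);
	if (!buf)
		return -1;

	n = recv(fd, buf, pending, 0);	/* application expects n == pending */
	free(buf);
	return n;
}

int main(void)
{
	int sv[2];
	const char msg[] = "hello, world";

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return 1;
	send(sv[0], msg, strlen(msg), 0);
	printf("read %zd bytes\n", read_pending(sv[1]));
	close(sv[0]);
	close(sv[1]);
	return 0;
}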
> Maybe it needs the ingress queue lock wrapped around this queue_consumed,
> leaving it where it is? Couple of ideas anyway, but I don't think it's
> correct as is.

And is it acceptable to just put the ingress_lock around the
queue_consumed in sk_msg_recvmsg? Like the following ('+' marks the added
lines; the '~' line open-codes the update instead of calling
sk_msg_queue_consumed, since purge runs without ingress_lock):

static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
{
+	spin_lock_bh(&psock->ingress_lock);
	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
+	spin_unlock_bh(&psock->ingress_lock);
}

static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
{
	struct sk_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
		list_del(&msg->list);
		if (!msg->ingress_self)
~			WRITE_ONCE(psock->msg_len, psock->msg_len - msg->sg.size);
		sk_msg_free(psock->sk, msg);
		kfree(msg);
	}
	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
}

> >  	return copied;
> >  }
> >  EXPORT_SYMBOL_GPL(sk_msg_recvmsg);
> > @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb
> >
> >  	if (unlikely(!msg))
> >  		return -EAGAIN;
> > +	msg->ingress_self = true;
> >  	skb_set_owner_r(skb, sk);
> >  	err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg);
> >  	if (err < 0)
> > @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
>
> purge doesn't use the ingress_lock because it's cancelled and synced the
> backlog, and the proto handlers have been swapped back to the original
> handlers, so there is no longer any way to get at the ingress queue from
> the socket side either.
>
> >  	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
> >  		list_del(&msg->list);
> > +		if (!msg->ingress_self)
> > +			sk_msg_queue_consumed(psock, msg->sg.size);
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > +	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
> >  }
> >
> >  static void __sk_psock_zap_ingress(struct sk_psock *psock)
> > --
> > 2.38.1
> >