Re: [PATCH bpf-next v2 1/3] skmsg: Support to get the data length in ingress_msg

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



John Fastabend <john.fastabend@xxxxxxxxx> wrote:
> 
> Pengcheng Yang wrote:
> > Currently msg is queued in ingress_msg of the target psock
> > on ingress redirect, without increment rcv_nxt. The size
> > that user can read includes the data in receive_queue and
> > ingress_msg. So we introduce sk_msg_queue_len() helper to
> > get the data length in ingress_msg.
> >
> > Note that the msg_len does not include the data length of
> > msg from receive_queue via SK_PASS, as they increment rcv_nxt
> > when received.
> >
> > Signed-off-by: Pengcheng Yang <yangpc@xxxxxxxxxx>
> > ---
> >  include/linux/skmsg.h | 26 ++++++++++++++++++++++++--
> >  net/core/skmsg.c      | 10 +++++++++-
> >  2 files changed, 33 insertions(+), 3 deletions(-)
> >
> 
> This has two writers under different locks; this looks insufficient
> to ensure correctness of the counter. Likely the consume can be
> moved into where the dequeue_msg happens? But then it's not always
> accurate, which might break some applications doing buffer sizing.
> An example of this would be nginx.
> 
> > diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
> > index c1637515a8a4..423a5c28c606 100644
> > --- a/include/linux/skmsg.h
> > +++ b/include/linux/skmsg.h
> > @@ -47,6 +47,7 @@ struct sk_msg {
> >  	u32				apply_bytes;
> >  	u32				cork_bytes;
> >  	u32				flags;
> > +	bool				ingress_self;
> >  	struct sk_buff			*skb;
> >  	struct sock			*sk_redir;
> >  	struct sock			*sk;
> > @@ -82,6 +83,7 @@ struct sk_psock {
> >  	u32				apply_bytes;
> >  	u32				cork_bytes;
> >  	u32				eval;
> > +	u32				msg_len;
> >  	bool				redir_ingress; /* undefined if sk_redir is null */
> >  	struct sk_msg			*cork;
> >  	struct sk_psock_progs		progs;
> > @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock,
> >  				      struct sk_msg *msg)
> >  {
> >  	spin_lock_bh(&psock->ingress_lock);
> > -	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
> > +	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) {
> >  		list_add_tail(&msg->list, &psock->ingress_msg);
> > -	else {
> > +		if (!msg->ingress_self)
> > +			WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size);
> 
> First writer here can be from
> 
>   sk_psock_backlog()
>     mutex_lock(psock->work_mutex)
>     ...
>     sk_psock_handle_skb()
>       sk_psock_skb_ingress()
>         sk_psock_skb_ingress_enqueue()
>           sk_psock_queue_msg()
>              spin_lock_bh(psock->ingress_lock)
>                WRITE_ONCE(...)
>              spin_unlock_bh()
> 
> > +	} else {
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg)
> >  	kfree(msg);
> >  }
> >
> > +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
> > +{
> > +	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
> > +}
> > +
> > +static inline u32 sk_msg_queue_len(const struct sock *sk)
> > +{
> > +	struct sk_psock *psock;
> > +	u32 len = 0;
> > +
> > +	rcu_read_lock();
> > +	psock = sk_psock(sk);
> > +	if (psock)
> > +		len = READ_ONCE(psock->msg_len);
> > +	rcu_read_unlock();
> > +	return len;
> > +}
> > +
> >  static inline void sk_psock_report_error(struct sk_psock *psock, int err)
> >  {
> >  	struct sock *sk = psock->sk;
> > diff --git a/net/core/skmsg.c b/net/core/skmsg.c
> > index 6c31eefbd777..f46732a8ddc2 100644
> > --- a/net/core/skmsg.c
> > +++ b/net/core/skmsg.c
> > @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  	struct iov_iter *iter = &msg->msg_iter;
> >  	int peek = flags & MSG_PEEK;
> >  	struct sk_msg *msg_rx;
> > -	int i, copied = 0;
> > +	int i, copied = 0, msg_copied = 0;
> >
> >  	msg_rx = sk_psock_peek_msg(psock);
> >  	while (copied != len) {
> > @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  			}
> >
> >  			copied += copy;
> > +			if (!msg_rx->ingress_self)
> > +				msg_copied += copy;
> >  			if (likely(!peek)) {
> >  				sge->offset += copy;
> >  				sge->length -= copy;
> > @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  		msg_rx = sk_psock_peek_msg(psock);
> >  	}
> >  out:
> > +	if (likely(!peek) && msg_copied)
> > +		sk_msg_queue_consumed(psock, msg_copied);
> 
> Second writer,
> 
>    tcp_bpf_recvmsg_parser()
>      lock_sock(sk)
>      sk_msg_recvmsg()
>        sk_psock_peek_msg()
>          spin_lock_bh(ingress_lock); <- lock held from first writer.
>          msg = list...
>          spin_unlock_bh()
>        sk_psock_dequeue_msg(psock)
>          spin_lock_bh(ingress_lock)
>          msg = ....                     <- should call queue_consumed here
>          spin_unlock_bh()
> 
>     out:
>       if (likely(!peek) && msg_copied)
>         sk_msg_queue_consumed(psock, msg_copied); <- here no lock?
> 
> 
> It looks like you could move the queue_consumed up into the dequeue_msg,
> but then you have some issue on partial reads I think? Basically the
> IOCTL might return more bytes than are actually in the ingress queue.
> Also it will look strange if the ioctl is called twice once before a read
> and again after a read and the byte count doesn't change.
> 

Thanks, John, for pointing this out.
Yes, I tried to move queue_consumed into dequeue_msg without
making major changes to sk_msg_recvmsg, but failed.

> Maybe it needs the ingress queue lock wrapped around this queue consumed and
> leave it where it is? Couple ideas anyways, but I don't think it's
> correct as is.

And is it acceptable to just put the ingress_lock around the queue_consumed in
sk_msg_recvmsg? Like the following:

  static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
  {
+         spin_lock_bh(&psock->ingress_lock);
          WRITE_ONCE(psock->msg_len, psock->msg_len - len);
+         spin_unlock_bh(&psock->ingress_lock);
  }

  static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
  {
          struct sk_msg *msg, *tmp;
  
          list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
                  list_del(&msg->list);
                  if (!msg->ingress_self)
~                         WRITE_ONCE(psock->msg_len, psock->msg_len - msg->sg.size);
                  sk_msg_free(psock->sk, msg);
                  kfree(msg);
          }  
          WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
  }


> 
> >  	return copied;
> >  }
> >  EXPORT_SYMBOL_GPL(sk_msg_recvmsg);
> > @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb
> >
> >  	if (unlikely(!msg))
> >  		return -EAGAIN;
> > +	msg->ingress_self = true;
> >  	skb_set_owner_r(skb, sk);
> >  	err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg);
> >  	if (err < 0)
> > @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
> >
> 
> purge doesn't use the ingress_lock because its cancelled and syncd the
> backlog and proto handlers have been swapped back to original handlers
> so there is no longer any way to get at the ingress queue from the socket
> side either.
> 
> >  	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
> >  		list_del(&msg->list);
> > +		if (!msg->ingress_self)
> > +			sk_msg_queue_consumed(psock, msg->sg.size);
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > +	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
> >  }
> >
> >  static void __sk_psock_zap_ingress(struct sk_psock *psock)
> > --
> > 2.38.1
> >





[Index of Archives]     [Linux Samsung SoC]     [Linux Rockchip SoC]     [Linux Actions SoC]     [Linux for Synopsys ARC Processors]     [Linux NFS]     [Linux NILFS]     [Linux USB Devel]     [Video for Linux]     [Linux Audio Users]     [Yosemite News]     [Linux Kernel]     [Linux SCSI]


  Powered by Linux