Hey John, Exiting to see this work :-) On Fri, Oct 09, 2020 at 08:36 PM CEST, John Fastabend wrote: > When we receive an skb and the ingress skb verdict program returns > SK_PASS we currently set the ingress flag and put it on the workqueue > so it can be turned into a sk_msg and put on the sk_msg ingress queue. > Then finally telling userspace with data_ready hook. > > Here we observe that if the workqueue is empty then we can try to > convert into a sk_msg type and call data_ready directly without > bouncing through a workqueue. Its a common pattern to have a recv > verdict program for visibility that always returns SK_PASS. In this > case unless there is an ENOMEM error or we overrun the socket we > can avoid the workqueue completely only using it when we fall back > to error cases caused by memory pressure. > > By doing this we eliminate another case where data may be dropped > if errors occur on memory limits in workqueue. > > Fixes: 51199405f9672 ("bpf: skb_verdict, support SK_PASS on RX BPF path") > Signed-off-by: John Fastabend <john.fastabend@xxxxxxxxx> > --- > net/core/skmsg.c | 17 +++++++++++++++-- > 1 file changed, 15 insertions(+), 2 deletions(-) > > diff --git a/net/core/skmsg.c b/net/core/skmsg.c > index 040ae1d75b65..4b160d97b7f9 100644 > --- a/net/core/skmsg.c > +++ b/net/core/skmsg.c > @@ -773,6 +773,7 @@ static void sk_psock_verdict_apply(struct sk_psock *psock, > { > struct tcp_skb_cb *tcp; > struct sock *sk_other; > + int err = -EIO; > > switch (verdict) { > case __SK_PASS: > @@ -784,8 +785,20 @@ static void sk_psock_verdict_apply(struct sk_psock *psock, > > tcp = TCP_SKB_CB(skb); > tcp->bpf.flags |= BPF_F_INGRESS; > - skb_queue_tail(&psock->ingress_skb, skb); > - schedule_work(&psock->work); > + > + /* If the queue is empty then we can submit directly > + * into the msg queue. If its not empty we have to > + * queue work otherwise we may get OOO data. Otherwise, > + * if sk_psock_skb_ingress errors will be handled by > + * retrying later from workqueue. > + */ > + if (skb_queue_empty(&psock->ingress_skb)) { > + err = sk_psock_skb_ingress(psock, skb); When going through the workqueue (sk_psock_backlog), we will also check if socket didn't get detached from the process, that is if psock->sk->sk_socket != NULL, before queueing into msg queue. Do we need a similar check here? > + } > + if (err < 0) { > + skb_queue_tail(&psock->ingress_skb, skb); > + schedule_work(&psock->work); > + } > break; > case __SK_REDIRECT: > sk_psock_skb_redirect(skb);