
[bpf-next,v2,1/3] skmsg: Support to get the data length in ingress_msg

Message ID 1700565725-2706-2-git-send-email-yangpc@wangsu.com (mailing list archive)
State Changes Requested
Delegated to: BPF
Series skmsg: Add the data length in skmsg to SIOCINQ ioctl and rx_queue

Checks

Context Check Description
netdev/series_format success Posting correctly formatted
netdev/codegen success Generated files up to date
netdev/tree_selection success Clearly marked for bpf-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1175 this patch: 1175
netdev/cc_maintainers warning 1 maintainers not CCed: pabeni@redhat.com
netdev/build_clang success Errors and warnings before: 1162 this patch: 1162
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 1202 this patch: 1202
netdev/checkpatch warning WARNING: line length of 82 exceeds 80 columns
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0
bpf/vmtest-bpf-next-VM_Test-30 success Logs for x86_64-llvm-16 / test (test_progs, false, 360) / test_progs on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-33 success Logs for x86_64-llvm-16 / veristat
bpf/vmtest-bpf-next-VM_Test-31 success Logs for x86_64-llvm-16 / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-32 success Logs for x86_64-llvm-16 / test (test_verifier, false, 360) / test_verifier on x86_64 with llvm-16
bpf/vmtest-bpf-next-PR success PR summary
bpf/vmtest-bpf-next-VM_Test-0 success Logs for Lint
bpf/vmtest-bpf-next-VM_Test-1 success Logs for ShellCheck
bpf/vmtest-bpf-next-VM_Test-2 success Logs for Validate matrix.py
bpf/vmtest-bpf-next-VM_Test-3 success Logs for aarch64-gcc / build / build for aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-8 success Logs for aarch64-gcc / veristat
bpf/vmtest-bpf-next-VM_Test-4 success Logs for aarch64-gcc / test (test_maps, false, 360) / test_maps on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-6 success Logs for aarch64-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-7 success Logs for aarch64-gcc / test (test_verifier, false, 360) / test_verifier on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-5 success Logs for aarch64-gcc / test (test_progs, false, 360) / test_progs on aarch64 with gcc
bpf/vmtest-bpf-next-VM_Test-9 success Logs for s390x-gcc / build / build for s390x with gcc
bpf/vmtest-bpf-next-VM_Test-14 success Logs for s390x-gcc / veristat
bpf/vmtest-bpf-next-VM_Test-15 success Logs for set-matrix
bpf/vmtest-bpf-next-VM_Test-16 success Logs for x86_64-gcc / build / build for x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-17 success Logs for x86_64-gcc / test (test_maps, false, 360) / test_maps on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-20 success Logs for x86_64-gcc / test (test_progs_no_alu32_parallel, true, 30) / test_progs_no_alu32_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-21 success Logs for x86_64-gcc / test (test_progs_parallel, true, 30) / test_progs_parallel on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-22 success Logs for x86_64-gcc / test (test_verifier, false, 360) / test_verifier on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-23 success Logs for x86_64-gcc / veristat / veristat on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-29 success Logs for x86_64-llvm-16 / veristat
bpf/vmtest-bpf-next-VM_Test-24 success Logs for x86_64-llvm-16 / build / build for x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-25 success Logs for x86_64-llvm-16 / test (test_maps, false, 360) / test_maps on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-18 success Logs for x86_64-gcc / test (test_progs, false, 360) / test_progs on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-19 success Logs for x86_64-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with gcc
bpf/vmtest-bpf-next-VM_Test-26 success Logs for x86_64-llvm-16 / test (test_progs, false, 360) / test_progs on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-27 success Logs for x86_64-llvm-16 / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-28 success Logs for x86_64-llvm-16 / test (test_verifier, false, 360) / test_verifier on x86_64 with llvm-16
bpf/vmtest-bpf-next-VM_Test-13 success Logs for s390x-gcc / test (test_verifier, false, 360) / test_verifier on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-12 success Logs for s390x-gcc / test (test_progs_no_alu32, false, 360) / test_progs_no_alu32 on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-11 success Logs for s390x-gcc / test (test_progs, false, 360) / test_progs on s390x with gcc
bpf/vmtest-bpf-next-VM_Test-10 success Logs for s390x-gcc / test (test_maps, false, 360) / test_maps on s390x with gcc

Commit Message

Pengcheng Yang Nov. 21, 2023, 11:22 a.m. UTC
Currently, on ingress redirect, the msg is queued in ingress_msg of
the target psock without incrementing rcv_nxt. The size that the user
can read includes the data in both receive_queue and ingress_msg, so
introduce the sk_msg_queue_len() helper to get the data length in
ingress_msg.

Note that msg_len does not include the data length of msgs coming from
receive_queue via SK_PASS, as those already incremented rcv_nxt when
they were received.

Signed-off-by: Pengcheng Yang <yangpc@wangsu.com>
---
 include/linux/skmsg.h | 26 ++++++++++++++++++++++++--
 net/core/skmsg.c      | 10 +++++++++-
 2 files changed, 33 insertions(+), 3 deletions(-)
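
For reference, a minimal user-space sketch of how the counter is meant to
become visible, assuming the rest of the series wires it into SIOCINQ as
the series title says (the wiring itself is not part of this patch):

  /* Query readable bytes on a sockmap ingress socket. With this series
   * applied, the result should cover receive_queue data plus the
   * ingress_msg bytes tracked in psock->msg_len. */
  #include <stdio.h>
  #include <sys/ioctl.h>
  #include <linux/sockios.h>

  static int sockmap_readable_bytes(int fd)
  {
  	int inq = 0;

  	if (ioctl(fd, SIOCINQ, &inq) < 0) {
  		perror("SIOCINQ");
  		return -1;
  	}
  	return inq;
  }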

Comments

John Fastabend Dec. 5, 2023, 12:23 a.m. UTC | #1
Pengcheng Yang wrote:
> Currently, on ingress redirect, the msg is queued in ingress_msg of
> the target psock without incrementing rcv_nxt. The size that the user
> can read includes the data in both receive_queue and ingress_msg, so
> introduce the sk_msg_queue_len() helper to get the data length in
> ingress_msg.
> 
> Note that msg_len does not include the data length of msgs coming from
> receive_queue via SK_PASS, as those already incremented rcv_nxt when
> they were received.
> 
> Signed-off-by: Pengcheng Yang <yangpc@wangsu.com>
> ---
>  include/linux/skmsg.h | 26 ++++++++++++++++++++++++--
>  net/core/skmsg.c      | 10 +++++++++-
>  2 files changed, 33 insertions(+), 3 deletions(-)
> 

This has two writers under different locks, which looks insufficient
to ensure correctness of the counter. Likely the consume can be moved
into where the dequeue_msg happens? But then it's not always accurate,
which might break applications that do buffer sizing, nginx being one
example.
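
(For illustration only, and not something proposed in this thread: if
psock->msg_len tolerated concurrent updates by itself, e.g. as an
atomic_t, the two writers could stay under their existing, different
locks. A minimal sketch; the type change and helper names are
assumptions, not part of the patch:)

  /* Assumes psock->msg_len is changed from u32 to atomic_t. */
  static inline void sk_msg_queue_charge(struct sk_psock *psock, u32 len)
  {
  	atomic_add(len, &psock->msg_len);	/* enqueue side, under ingress_lock */
  }

  static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
  {
  	atomic_sub(len, &psock->msg_len);	/* recvmsg side, under the socket lock */
  }

  static inline u32 sk_msg_queue_len_read(const struct sk_psock *psock)
  {
  	return atomic_read(&psock->msg_len);	/* ioctl/reader side */
  }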

> diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
> index c1637515a8a4..423a5c28c606 100644
> --- a/include/linux/skmsg.h
> +++ b/include/linux/skmsg.h
> @@ -47,6 +47,7 @@ struct sk_msg {
>  	u32				apply_bytes;
>  	u32				cork_bytes;
>  	u32				flags;
> +	bool				ingress_self;
>  	struct sk_buff			*skb;
>  	struct sock			*sk_redir;
>  	struct sock			*sk;
> @@ -82,6 +83,7 @@ struct sk_psock {
>  	u32				apply_bytes;
>  	u32				cork_bytes;
>  	u32				eval;
> +	u32				msg_len;
>  	bool				redir_ingress; /* undefined if sk_redir is null */
>  	struct sk_msg			*cork;
>  	struct sk_psock_progs		progs;
> @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock,
>  				      struct sk_msg *msg)
>  {
>  	spin_lock_bh(&psock->ingress_lock);
> -	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
> +	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) {
>  		list_add_tail(&msg->list, &psock->ingress_msg);
> -	else {
> +		if (!msg->ingress_self)
> +			WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size);

First writer here can be from

  sk_psock_backlog()
    mutex_lock(psock->work_mutex)
    ...
    sk_psock_handle_skb()
      sk_psock_skb_ingress()
        sk_psock_skb_ingress_enqueue()
          sk_psock_queue_msg()
             spin_lock_bh(psock->ingress_lock)
               WRITE_ONCE(...)
             spin_unlock_bh()

> +	} else {
>  		sk_msg_free(psock->sk, msg);
>  		kfree(msg);
>  	}
> @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg)
>  	kfree(msg);
>  }
>  
> +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
> +{
> +	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
> +}
> +
> +static inline u32 sk_msg_queue_len(const struct sock *sk)
> +{
> +	struct sk_psock *psock;
> +	u32 len = 0;
> +
> +	rcu_read_lock();
> +	psock = sk_psock(sk);
> +	if (psock)
> +		len = READ_ONCE(psock->msg_len);
> +	rcu_read_unlock();
> +	return len;
> +}
> +
>  static inline void sk_psock_report_error(struct sk_psock *psock, int err)
>  {
>  	struct sock *sk = psock->sk;
> diff --git a/net/core/skmsg.c b/net/core/skmsg.c
> index 6c31eefbd777..f46732a8ddc2 100644
> --- a/net/core/skmsg.c
> +++ b/net/core/skmsg.c
> @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
>  	struct iov_iter *iter = &msg->msg_iter;
>  	int peek = flags & MSG_PEEK;
>  	struct sk_msg *msg_rx;
> -	int i, copied = 0;
> +	int i, copied = 0, msg_copied = 0;
>  
>  	msg_rx = sk_psock_peek_msg(psock);
>  	while (copied != len) {
> @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
>  			}
>  
>  			copied += copy;
> +			if (!msg_rx->ingress_self)
> +				msg_copied += copy;
>  			if (likely(!peek)) {
>  				sge->offset += copy;
>  				sge->length -= copy;
> @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
>  		msg_rx = sk_psock_peek_msg(psock);
>  	}
>  out:
> +	if (likely(!peek) && msg_copied)
> +		sk_msg_queue_consumed(psock, msg_copied);

Second writer,

   tcp_bpf_recvmsg_parser()
     lock_sock(sk)
     sk_msg_recvmsg()
       sk_psock_peek_msg()
         spin_lock_bh(ingress_lock); <- lock held from first writer.
         msg = list...
         spin_unlock_bh()
       sk_psock_dequeue_msg(psock)
         spin_lock_bh(ingress_lock)
         msg = ....                     <- should call queue_consumed here
         spin_unlock_bh()

    out:
      if (likely(!peek) && msg_copied)
        sk_msg_queue_consumed(psock, msg_copied); <- here no lock?


It looks like you could move the queue_consumed up into the dequeue_msg,
but then you have some issue on partial reads I think? Basically the
ioctl might report more bytes than are actually left in the ingress
queue. Also it will look strange if the ioctl is called twice, once
before a read and again after a read, and the byte count doesn't change.

Maybe it needs the ingress queue lock wrapped around this queue_consumed,
leaving it where it is? A couple of ideas anyway, but I don't think
it's correct as is.
       
>  	return copied;
>  }
>  EXPORT_SYMBOL_GPL(sk_msg_recvmsg);
> @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb
>  
>  	if (unlikely(!msg))
>  		return -EAGAIN;
> +	msg->ingress_self = true;
>  	skb_set_owner_r(skb, sk);
>  	err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg);
>  	if (err < 0)
> @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
>  

purge doesn't use the ingress_lock because the backlog has been cancelled
and synced and the proto handlers have been swapped back to the original
handlers, so there is no longer any way to get at the ingress queue from
the socket side either.

>  	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
>  		list_del(&msg->list);
> +		if (!msg->ingress_self)
> +			sk_msg_queue_consumed(psock, msg->sg.size);
>  		sk_msg_free(psock->sk, msg);
>  		kfree(msg);
>  	}
> +	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
>  }
>  
>  static void __sk_psock_zap_ingress(struct sk_psock *psock)
> -- 
> 2.38.1
>
Pengcheng Yang Dec. 8, 2023, 11:17 a.m. UTC | #2
John Fastabend <john.fastabend@gmail.com> wrote:
> 
> Pengcheng Yang wrote:
> > Currently, on ingress redirect, the msg is queued in ingress_msg of
> > the target psock without incrementing rcv_nxt. The size that the user
> > can read includes the data in both receive_queue and ingress_msg, so
> > introduce the sk_msg_queue_len() helper to get the data length in
> > ingress_msg.
> >
> > Note that msg_len does not include the data length of msgs coming from
> > receive_queue via SK_PASS, as those already incremented rcv_nxt when
> > they were received.
> >
> > Signed-off-by: Pengcheng Yang <yangpc@wangsu.com>
> > ---
> >  include/linux/skmsg.h | 26 ++++++++++++++++++++++++--
> >  net/core/skmsg.c      | 10 +++++++++-
> >  2 files changed, 33 insertions(+), 3 deletions(-)
> >
> 
> This has two writers under different locks, which looks insufficient
> to ensure correctness of the counter. Likely the consume can be moved
> into where the dequeue_msg happens? But then it's not always accurate,
> which might break applications that do buffer sizing, nginx being one
> example.
> 
> > diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
> > index c1637515a8a4..423a5c28c606 100644
> > --- a/include/linux/skmsg.h
> > +++ b/include/linux/skmsg.h
> > @@ -47,6 +47,7 @@ struct sk_msg {
> >  	u32				apply_bytes;
> >  	u32				cork_bytes;
> >  	u32				flags;
> > +	bool				ingress_self;
> >  	struct sk_buff			*skb;
> >  	struct sock			*sk_redir;
> >  	struct sock			*sk;
> > @@ -82,6 +83,7 @@ struct sk_psock {
> >  	u32				apply_bytes;
> >  	u32				cork_bytes;
> >  	u32				eval;
> > +	u32				msg_len;
> >  	bool				redir_ingress; /* undefined if sk_redir is null */
> >  	struct sk_msg			*cork;
> >  	struct sk_psock_progs		progs;
> > @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock,
> >  				      struct sk_msg *msg)
> >  {
> >  	spin_lock_bh(&psock->ingress_lock);
> > -	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
> > +	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) {
> >  		list_add_tail(&msg->list, &psock->ingress_msg);
> > -	else {
> > +		if (!msg->ingress_self)
> > +			WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size);
> 
> First writer here can be from
> 
>   sk_psock_backlog()
>     mutex_lock(psock->work_mutex)
>     ...
>     sk_psock_handle_skb()
>       sk_psock_skb_ingress()
>         sk_psock_skb_ingress_enqueue()
>           sk_psock_queue_msg()
>              spin_lock_bh(psock->ingress_lock)
>                WRITE_ONCE(...)
>              spin_unlock_bh()
> 
> > +	} else {
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg)
> >  	kfree(msg);
> >  }
> >
> > +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
> > +{
> > +	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
> > +}
> > +
> > +static inline u32 sk_msg_queue_len(const struct sock *sk)
> > +{
> > +	struct sk_psock *psock;
> > +	u32 len = 0;
> > +
> > +	rcu_read_lock();
> > +	psock = sk_psock(sk);
> > +	if (psock)
> > +		len = READ_ONCE(psock->msg_len);
> > +	rcu_read_unlock();
> > +	return len;
> > +}
> > +
> >  static inline void sk_psock_report_error(struct sk_psock *psock, int err)
> >  {
> >  	struct sock *sk = psock->sk;
> > diff --git a/net/core/skmsg.c b/net/core/skmsg.c
> > index 6c31eefbd777..f46732a8ddc2 100644
> > --- a/net/core/skmsg.c
> > +++ b/net/core/skmsg.c
> > @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  	struct iov_iter *iter = &msg->msg_iter;
> >  	int peek = flags & MSG_PEEK;
> >  	struct sk_msg *msg_rx;
> > -	int i, copied = 0;
> > +	int i, copied = 0, msg_copied = 0;
> >
> >  	msg_rx = sk_psock_peek_msg(psock);
> >  	while (copied != len) {
> > @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  			}
> >
> >  			copied += copy;
> > +			if (!msg_rx->ingress_self)
> > +				msg_copied += copy;
> >  			if (likely(!peek)) {
> >  				sge->offset += copy;
> >  				sge->length -= copy;
> > @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> >  		msg_rx = sk_psock_peek_msg(psock);
> >  	}
> >  out:
> > +	if (likely(!peek) && msg_copied)
> > +		sk_msg_queue_consumed(psock, msg_copied);
> 
> Second writer,
> 
>    tcp_bpf_recvmsg_parser()
>      lock_sock(sk)
>      sk_msg_recvmsg()
>        sk_psock_peek_msg()
>          spin_lock_bh(ingress_lock); <- lock held from first writer.
>          msg = list...
>          spin_unlock_bh()
>        sk_psock_dequeue_msg(psock)
>          spin_lock_bh(ingress_lock)
>          msg = ....                     <- should call queue_consumed here
>          spin_unlock_bh()
> 
>     out:
>       if (likely(!peek) && msg_copied)
>         sk_msg_queue_consumed(psock, msg_copied); <- here no lock?
> 
> 
> It looks like you could move the queue_consumed up into the dequeue_msg,
> but then you have some issue on partial reads I think? Basically the
> ioctl might report more bytes than are actually left in the ingress
> queue. Also it will look strange if the ioctl is called twice, once
> before a read and again after a read, and the byte count doesn't change.
> 

Thanks John for pointing this out.
Yes, I tried to move queue_consumed into dequeue_msg without making
major changes to sk_msg_recvmsg, but failed.

> Maybe it needs the ingress queue lock wrapped around this queue_consumed,
> leaving it where it is? A couple of ideas anyway, but I don't think
> it's correct as is.

And is it acceptable to just put the ingress_lock around the queue_consumed
in sk_msg_recvmsg? Like the following:

  static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
  {
+         spin_lock_bh(&psock->ingress_lock);
          WRITE_ONCE(psock->msg_len, psock->msg_len - len);
+         spin_unlock_bh(&psock->ingress_lock);
  }

  static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
  {
          struct sk_msg *msg, *tmp;
  
          list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
                  list_del(&msg->list);
                  if (!msg->ingress_self)
~                         WRITE_ONCE(psock->msg_len, psock->msg_len - msg->sg.size);
                  sk_msg_free(psock->sk, msg);
                  kfree(msg);
          }  
          WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
  }


> 
> >  	return copied;
> >  }
> >  EXPORT_SYMBOL_GPL(sk_msg_recvmsg);
> > @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb
> >
> >  	if (unlikely(!msg))
> >  		return -EAGAIN;
> > +	msg->ingress_self = true;
> >  	skb_set_owner_r(skb, sk);
> >  	err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg);
> >  	if (err < 0)
> > @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
> >
> 
> purge doesn't use the ingress_lock because the backlog has been cancelled
> and synced and the proto handlers have been swapped back to the original
> handlers, so there is no longer any way to get at the ingress queue from
> the socket side either.
> 
> >  	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
> >  		list_del(&msg->list);
> > +		if (!msg->ingress_self)
> > +			sk_msg_queue_consumed(psock, msg->sg.size);
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > +	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
> >  }
> >
> >  static void __sk_psock_zap_ingress(struct sk_psock *psock)
> > --
> > 2.38.1
> >

Patch

diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
index c1637515a8a4..423a5c28c606 100644
--- a/include/linux/skmsg.h
+++ b/include/linux/skmsg.h
@@ -47,6 +47,7 @@  struct sk_msg {
 	u32				apply_bytes;
 	u32				cork_bytes;
 	u32				flags;
+	bool				ingress_self;
 	struct sk_buff			*skb;
 	struct sock			*sk_redir;
 	struct sock			*sk;
@@ -82,6 +83,7 @@  struct sk_psock {
 	u32				apply_bytes;
 	u32				cork_bytes;
 	u32				eval;
+	u32				msg_len;
 	bool				redir_ingress; /* undefined if sk_redir is null */
 	struct sk_msg			*cork;
 	struct sk_psock_progs		progs;
@@ -311,9 +313,11 @@  static inline void sk_psock_queue_msg(struct sk_psock *psock,
 				      struct sk_msg *msg)
 {
 	spin_lock_bh(&psock->ingress_lock);
-	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
+	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) {
 		list_add_tail(&msg->list, &psock->ingress_msg);
-	else {
+		if (!msg->ingress_self)
+			WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size);
+	} else {
 		sk_msg_free(psock->sk, msg);
 		kfree(msg);
 	}
@@ -368,6 +372,24 @@  static inline void kfree_sk_msg(struct sk_msg *msg)
 	kfree(msg);
 }
 
+static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
+{
+	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
+}
+
+static inline u32 sk_msg_queue_len(const struct sock *sk)
+{
+	struct sk_psock *psock;
+	u32 len = 0;
+
+	rcu_read_lock();
+	psock = sk_psock(sk);
+	if (psock)
+		len = READ_ONCE(psock->msg_len);
+	rcu_read_unlock();
+	return len;
+}
+
 static inline void sk_psock_report_error(struct sk_psock *psock, int err)
 {
 	struct sock *sk = psock->sk;
diff --git a/net/core/skmsg.c b/net/core/skmsg.c
index 6c31eefbd777..f46732a8ddc2 100644
--- a/net/core/skmsg.c
+++ b/net/core/skmsg.c
@@ -415,7 +415,7 @@  int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
 	struct iov_iter *iter = &msg->msg_iter;
 	int peek = flags & MSG_PEEK;
 	struct sk_msg *msg_rx;
-	int i, copied = 0;
+	int i, copied = 0, msg_copied = 0;
 
 	msg_rx = sk_psock_peek_msg(psock);
 	while (copied != len) {
@@ -441,6 +441,8 @@  int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
 			}
 
 			copied += copy;
+			if (!msg_rx->ingress_self)
+				msg_copied += copy;
 			if (likely(!peek)) {
 				sge->offset += copy;
 				sge->length -= copy;
@@ -481,6 +483,8 @@  int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
 		msg_rx = sk_psock_peek_msg(psock);
 	}
 out:
+	if (likely(!peek) && msg_copied)
+		sk_msg_queue_consumed(psock, msg_copied);
 	return copied;
 }
 EXPORT_SYMBOL_GPL(sk_msg_recvmsg);
@@ -602,6 +606,7 @@  static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb
 
 	if (unlikely(!msg))
 		return -EAGAIN;
+	msg->ingress_self = true;
 	skb_set_owner_r(skb, sk);
 	err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg);
 	if (err < 0)
@@ -771,9 +776,12 @@  static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
 
 	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
 		list_del(&msg->list);
+		if (!msg->ingress_self)
+			sk_msg_queue_consumed(psock, msg->sg.size);
 		sk_msg_free(psock->sk, msg);
 		kfree(msg);
 	}
+	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
 }
 
 static void __sk_psock_zap_ingress(struct sk_psock *psock)