Message ID | 1700565725-2706-2-git-send-email-yangpc@wangsu.com (mailing list archive) |
---|---|
State | Changes Requested |
Delegated to: | BPF |
Headers | show |
Series | skmsg: Add the data length in skmsg to SIOCINQ ioctl and rx_queue | expand |
Pengcheng Yang wrote: > Currently msg is queued in ingress_msg of the target psock > on ingress redirect, without increment rcv_nxt. The size > that user can read includes the data in receive_queue and > ingress_msg. So we introduce sk_msg_queue_len() helper to > get the data length in ingress_msg. > > Note that the msg_len does not include the data length of > msg from recevive_queue via SK_PASS, as they increment rcv_nxt > when received. > > Signed-off-by: Pengcheng Yang <yangpc@wangsu.com> > --- > include/linux/skmsg.h | 26 ++++++++++++++++++++++++-- > net/core/skmsg.c | 10 +++++++++- > 2 files changed, 33 insertions(+), 3 deletions(-) > This has two writers under different locks this looks insufficient to ensure correctness of the counter. Likely the consume can be moved into where the dequeue_msg happens? But, then its not always accurate which might break some applications doing buffer sizing. An example of this would be nginx. > diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h > index c1637515a8a4..423a5c28c606 100644 > --- a/include/linux/skmsg.h > +++ b/include/linux/skmsg.h > @@ -47,6 +47,7 @@ struct sk_msg { > u32 apply_bytes; > u32 cork_bytes; > u32 flags; > + bool ingress_self; > struct sk_buff *skb; > struct sock *sk_redir; > struct sock *sk; > @@ -82,6 +83,7 @@ struct sk_psock { > u32 apply_bytes; > u32 cork_bytes; > u32 eval; > + u32 msg_len; > bool redir_ingress; /* undefined if sk_redir is null */ > struct sk_msg *cork; > struct sk_psock_progs progs; > @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock, > struct sk_msg *msg) > { > spin_lock_bh(&psock->ingress_lock); > - if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) > + if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) { > list_add_tail(&msg->list, &psock->ingress_msg); > - else { > + if (!msg->ingress_self) > + WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size); First writer here can be from sk_psock_backlog() mutex_lock(psock->work_mutex) ... sk_psock_handle_skb() sk_psock_skb_ingress() sk_psock_skb_ingress_enqueue() sk_psock_queue_msg() spin_lock_bh(psock->ingress_lock) WRITE_ONCE(...) spin_unlock_bh() > + } else { > sk_msg_free(psock->sk, msg); > kfree(msg); > } > @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg) > kfree(msg); > } > > +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len) > +{ > + WRITE_ONCE(psock->msg_len, psock->msg_len - len); > +} > + > +static inline u32 sk_msg_queue_len(const struct sock *sk) > +{ > + struct sk_psock *psock; > + u32 len = 0; > + > + rcu_read_lock(); > + psock = sk_psock(sk); > + if (psock) > + len = READ_ONCE(psock->msg_len); > + rcu_read_unlock(); > + return len; > +} > + > static inline void sk_psock_report_error(struct sk_psock *psock, int err) > { > struct sock *sk = psock->sk; > diff --git a/net/core/skmsg.c b/net/core/skmsg.c > index 6c31eefbd777..f46732a8ddc2 100644 > --- a/net/core/skmsg.c > +++ b/net/core/skmsg.c > @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, > struct iov_iter *iter = &msg->msg_iter; > int peek = flags & MSG_PEEK; > struct sk_msg *msg_rx; > - int i, copied = 0; > + int i, copied = 0, msg_copied = 0; > > msg_rx = sk_psock_peek_msg(psock); > while (copied != len) { > @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, > } > > copied += copy; > + if (!msg_rx->ingress_self) > + msg_copied += copy; > if (likely(!peek)) { > sge->offset += copy; > sge->length -= copy; > @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, > msg_rx = sk_psock_peek_msg(psock); > } > out: > + if (likely(!peek) && msg_copied) > + sk_msg_queue_consumed(psock, msg_copied); Second writer, tcp_bpf_recvmsg_parser() lock_sock(sk) sk_msg_recvmsg() sk_psock_peek_msg() spin_lock_bh(ingress_lock); <- lock held from first writer. msg = list... spin_unlock_bh() sk_psock_Dequeue_msg(psock) spin_lock_bh(ingress_lock) msg = .... <- should call queue_consumed here spin_unlock_bh() out: if (likely(!peek) && msg_copied) sk_msg_queue_consumed(psock, msg_copied); <- here no lock? It looks like you could move the queue_consumed up into the dequeue_msg, but then you have some issue on partial reads I think? Basically the IOCTL might return more bytes than are actually in the ingress queue. Also it will look strange if the ioctl is called twice once before a read and again after a read and the byte count doesn't change. Maybe needs ingress queue lock wrapped around this queue consuned and leave it where it is? Couple ideas anyways, but I don't think its correct as is. > return copied; > } > EXPORT_SYMBOL_GPL(sk_msg_recvmsg); > @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb > > if (unlikely(!msg)) > return -EAGAIN; > + msg->ingress_self = true; > skb_set_owner_r(skb, sk); > err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg); > if (err < 0) > @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock) > purge doesn't use the ingress_lock because its cancelled and syncd the backlog and proto handlers have been swapped back to original handlers so there is no longer any way to get at the ingress queue from the socket side either. > list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) { > list_del(&msg->list); > + if (!msg->ingress_self) > + sk_msg_queue_consumed(psock, msg->sg.size); > sk_msg_free(psock->sk, msg); > kfree(msg); > } > + WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0); > } > > static void __sk_psock_zap_ingress(struct sk_psock *psock) > -- > 2.38.1 >
John Fastabend <john.fastabend@gmail.com> wrote: > > Pengcheng Yang wrote: > > Currently msg is queued in ingress_msg of the target psock > > on ingress redirect, without increment rcv_nxt. The size > > that user can read includes the data in receive_queue and > > ingress_msg. So we introduce sk_msg_queue_len() helper to > > get the data length in ingress_msg. > > > > Note that the msg_len does not include the data length of > > msg from recevive_queue via SK_PASS, as they increment rcv_nxt > > when received. > > > > Signed-off-by: Pengcheng Yang <yangpc@wangsu.com> > > --- > > include/linux/skmsg.h | 26 ++++++++++++++++++++++++-- > > net/core/skmsg.c | 10 +++++++++- > > 2 files changed, 33 insertions(+), 3 deletions(-) > > > > This has two writers under different locks this looks insufficient > to ensure correctness of the counter. Likely the consume can be > moved into where the dequeue_msg happens? But, then its not always > accurate which might break some applications doing buffer sizing. > An example of this would be nginx. > > > diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h > > index c1637515a8a4..423a5c28c606 100644 > > --- a/include/linux/skmsg.h > > +++ b/include/linux/skmsg.h > > @@ -47,6 +47,7 @@ struct sk_msg { > > u32 apply_bytes; > > u32 cork_bytes; > > u32 flags; > > + bool ingress_self; > > struct sk_buff *skb; > > struct sock *sk_redir; > > struct sock *sk; > > @@ -82,6 +83,7 @@ struct sk_psock { > > u32 apply_bytes; > > u32 cork_bytes; > > u32 eval; > > + u32 msg_len; > > bool redir_ingress; /* undefined if sk_redir is null */ > > struct sk_msg *cork; > > struct sk_psock_progs progs; > > @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock, > > struct sk_msg *msg) > > { > > spin_lock_bh(&psock->ingress_lock); > > - if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) > > + if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) { > > list_add_tail(&msg->list, &psock->ingress_msg); > > - else { > > + if (!msg->ingress_self) > > + WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size); > > First writer here can be from > > sk_psock_backlog() > mutex_lock(psock->work_mutex) > ... > sk_psock_handle_skb() > sk_psock_skb_ingress() > sk_psock_skb_ingress_enqueue() > sk_psock_queue_msg() > spin_lock_bh(psock->ingress_lock) > WRITE_ONCE(...) > spin_unlock_bh() > > > + } else { > > sk_msg_free(psock->sk, msg); > > kfree(msg); > > } > > @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg) > > kfree(msg); > > } > > > > +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len) > > +{ > > + WRITE_ONCE(psock->msg_len, psock->msg_len - len); > > +} > > + > > +static inline u32 sk_msg_queue_len(const struct sock *sk) > > +{ > > + struct sk_psock *psock; > > + u32 len = 0; > > + > > + rcu_read_lock(); > > + psock = sk_psock(sk); > > + if (psock) > > + len = READ_ONCE(psock->msg_len); > > + rcu_read_unlock(); > > + return len; > > +} > > + > > static inline void sk_psock_report_error(struct sk_psock *psock, int err) > > { > > struct sock *sk = psock->sk; > > diff --git a/net/core/skmsg.c b/net/core/skmsg.c > > index 6c31eefbd777..f46732a8ddc2 100644 > > --- a/net/core/skmsg.c > > +++ b/net/core/skmsg.c > > @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, > > struct iov_iter *iter = &msg->msg_iter; > > int peek = flags & MSG_PEEK; > > struct sk_msg *msg_rx; > > - int i, copied = 0; > > + int i, copied = 0, msg_copied = 0; > > > > msg_rx = sk_psock_peek_msg(psock); > > while (copied != len) { > > @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, > > } > > > > copied += copy; > > + if (!msg_rx->ingress_self) > > + msg_copied += copy; > > if (likely(!peek)) { > > sge->offset += copy; > > sge->length -= copy; > > @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, > > msg_rx = sk_psock_peek_msg(psock); > > } > > out: > > + if (likely(!peek) && msg_copied) > > + sk_msg_queue_consumed(psock, msg_copied); > > Second writer, > > tcp_bpf_recvmsg_parser() > lock_sock(sk) > sk_msg_recvmsg() > sk_psock_peek_msg() > spin_lock_bh(ingress_lock); <- lock held from first writer. > msg = list... > spin_unlock_bh() > sk_psock_Dequeue_msg(psock) > spin_lock_bh(ingress_lock) > msg = .... <- should call queue_consumed here > spin_unlock_bh() > > out: > if (likely(!peek) && msg_copied) > sk_msg_queue_consumed(psock, msg_copied); <- here no lock? > > > It looks like you could move the queue_consumed up into the dequeue_msg, > but then you have some issue on partial reads I think? Basically the > IOCTL might return more bytes than are actually in the ingress queue. > Also it will look strange if the ioctl is called twice once before a read > and again after a read and the byte count doesn't change. > Thanks john for pointing this out. Yes, I tried to move queue_consumed into dequeue_msg without making major changes to sk_msg_recvmsg, but failed. > Maybe needs ingress queue lock wrapped around this queue consuned and > leave it where it is? Couple ideas anyways, but I don't think its > correct as is. And, is it acceptable to just put the ingress_lock around the queue_consuned in Sk_msg_recvmsg? Like the following: static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len) { + spin_lock_bh(&psock->ingress_lock); WRITE_ONCE(psock->msg_len, psock->msg_len - len); + spin_unlock_bh(&psock->ingress_lock); } static void __sk_psock_purge_ingress_msg(struct sk_psock *psock) { struct sk_msg *msg, *tmp; list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) { list_del(&msg->list); if (!msg->ingress_self) ~ WRITE_ONCE(psock->msg_len, psock->msg_len - msg->sg.size); sk_msg_free(psock->sk, msg); kfree(msg); } WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0); } > > > return copied; > > } > > EXPORT_SYMBOL_GPL(sk_msg_recvmsg); > > @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb > > > > if (unlikely(!msg)) > > return -EAGAIN; > > + msg->ingress_self = true; > > skb_set_owner_r(skb, sk); > > err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg); > > if (err < 0) > > @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock) > > > > purge doesn't use the ingress_lock because its cancelled and syncd the > backlog and proto handlers have been swapped back to original handlers > so there is no longer any way to get at the ingress queue from the socket > side either. > > > list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) { > > list_del(&msg->list); > > + if (!msg->ingress_self) > > + sk_msg_queue_consumed(psock, msg->sg.size); > > sk_msg_free(psock->sk, msg); > > kfree(msg); > > } > > + WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0); > > } > > > > static void __sk_psock_zap_ingress(struct sk_psock *psock) > > -- > > 2.38.1 > >
diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h index c1637515a8a4..423a5c28c606 100644 --- a/include/linux/skmsg.h +++ b/include/linux/skmsg.h @@ -47,6 +47,7 @@ struct sk_msg { u32 apply_bytes; u32 cork_bytes; u32 flags; + bool ingress_self; struct sk_buff *skb; struct sock *sk_redir; struct sock *sk; @@ -82,6 +83,7 @@ struct sk_psock { u32 apply_bytes; u32 cork_bytes; u32 eval; + u32 msg_len; bool redir_ingress; /* undefined if sk_redir is null */ struct sk_msg *cork; struct sk_psock_progs progs; @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock, struct sk_msg *msg) { spin_lock_bh(&psock->ingress_lock); - if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) + if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) { list_add_tail(&msg->list, &psock->ingress_msg); - else { + if (!msg->ingress_self) + WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size); + } else { sk_msg_free(psock->sk, msg); kfree(msg); } @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg) kfree(msg); } +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len) +{ + WRITE_ONCE(psock->msg_len, psock->msg_len - len); +} + +static inline u32 sk_msg_queue_len(const struct sock *sk) +{ + struct sk_psock *psock; + u32 len = 0; + + rcu_read_lock(); + psock = sk_psock(sk); + if (psock) + len = READ_ONCE(psock->msg_len); + rcu_read_unlock(); + return len; +} + static inline void sk_psock_report_error(struct sk_psock *psock, int err) { struct sock *sk = psock->sk; diff --git a/net/core/skmsg.c b/net/core/skmsg.c index 6c31eefbd777..f46732a8ddc2 100644 --- a/net/core/skmsg.c +++ b/net/core/skmsg.c @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, struct iov_iter *iter = &msg->msg_iter; int peek = flags & MSG_PEEK; struct sk_msg *msg_rx; - int i, copied = 0; + int i, copied = 0, msg_copied = 0; msg_rx = sk_psock_peek_msg(psock); while (copied != len) { @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, } copied += copy; + if (!msg_rx->ingress_self) + msg_copied += copy; if (likely(!peek)) { sge->offset += copy; sge->length -= copy; @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, msg_rx = sk_psock_peek_msg(psock); } out: + if (likely(!peek) && msg_copied) + sk_msg_queue_consumed(psock, msg_copied); return copied; } EXPORT_SYMBOL_GPL(sk_msg_recvmsg); @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb if (unlikely(!msg)) return -EAGAIN; + msg->ingress_self = true; skb_set_owner_r(skb, sk); err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg); if (err < 0) @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock) list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) { list_del(&msg->list); + if (!msg->ingress_self) + sk_msg_queue_consumed(psock, msg->sg.size); sk_msg_free(psock->sk, msg); kfree(msg); } + WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0); } static void __sk_psock_zap_ingress(struct sk_psock *psock)
Currently msg is queued in ingress_msg of the target psock on ingress redirect, without increment rcv_nxt. The size that user can read includes the data in receive_queue and ingress_msg. So we introduce sk_msg_queue_len() helper to get the data length in ingress_msg. Note that the msg_len does not include the data length of msg from recevive_queue via SK_PASS, as they increment rcv_nxt when received. Signed-off-by: Pengcheng Yang <yangpc@wangsu.com> --- include/linux/skmsg.h | 26 ++++++++++++++++++++++++-- net/core/skmsg.c | 10 +++++++++- 2 files changed, 33 insertions(+), 3 deletions(-)