@@ -172,6 +172,41 @@ static int tcp_msg_wait_data(struct sock *sk, struct sk_psock *psock,
return ret;
}
+static int tcp_bpf_recvmsg_parser(struct sock *sk,
+ struct msghdr *msg,
+ size_t len,
+ int nonblock,
+ int flags,
+ int *addr_len)
+{
+ struct sk_psock *psock;
+ int copied;
+
+ if (unlikely(flags & MSG_ERRQUEUE))
+ return inet_recv_error(sk, msg, len, addr_len);
+
+ psock = sk_psock_get(sk);
+ if (unlikely(!psock))
+ return tcp_recvmsg(sk, msg, len, nonblock, flags, addr_len);
+
+ lock_sock(sk);
+msg_bytes_ready:
+ copied = sk_msg_recvmsg(sk, psock, msg, len, flags);
+ if (!copied) {
+ long timeo;
+ int data;
+
+ timeo = sock_rcvtimeo(sk, nonblock);
+ data = tcp_msg_wait_data(sk, psock, timeo);
+ if (data && !sk_psock_queue_empty(psock))
+ goto msg_bytes_ready;
+ copied = -EAGAIN;
+ }
+ release_sock(sk);
+ sk_psock_put(sk, psock);
+ return copied;
+}
+
static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
@@ -464,6 +499,8 @@ enum {
enum {
TCP_BPF_BASE,
TCP_BPF_TX,
+ TCP_BPF_RX,
+ TCP_BPF_TXRX,
TCP_BPF_NUM_CFGS,
};
@@ -482,6 +519,12 @@ static void tcp_bpf_rebuild_protos(struct proto prot[TCP_BPF_NUM_CFGS],
prot[TCP_BPF_TX] = prot[TCP_BPF_BASE];
prot[TCP_BPF_TX].sendmsg = tcp_bpf_sendmsg;
prot[TCP_BPF_TX].sendpage = tcp_bpf_sendpage;
+
+ prot[TCP_BPF_RX] = prot[TCP_BPF_BASE];
+ prot[TCP_BPF_RX].recvmsg = tcp_bpf_recvmsg_parser;
+
+ prot[TCP_BPF_TXRX] = prot[TCP_BPF_TX];
+ prot[TCP_BPF_TXRX].recvmsg = tcp_bpf_recvmsg_parser;
}
static void tcp_bpf_check_v6_needs_rebuild(struct proto *ops)
@@ -519,6 +562,10 @@ int tcp_bpf_update_proto(struct sock *sk, struct sk_psock *psock, bool restore)
int family = sk->sk_family == AF_INET6 ? TCP_BPF_IPV6 : TCP_BPF_IPV4;
int config = psock->progs.msg_parser ? TCP_BPF_TX : TCP_BPF_BASE;
+ if (psock->progs.stream_verdict || psock->progs.skb_verdict) {
+ config = (config == TCP_BPF_TX) ? TCP_BPF_TXRX : TCP_BPF_RX;
+ }
+
if (restore) {
if (inet_csk_has_ulp(sk)) {
/* TLS does not have an unhash proto in SW cases,