@@ -87,6 +87,7 @@ struct io_uring_sqe {
union {
__s32 splice_fd_in;
__u32 file_index;
+ __u32 zcrx_ifq_idx;
__u32 optlen;
struct {
__u16 addr_len;
@@ -278,6 +279,7 @@ enum io_uring_op {
IORING_OP_FTRUNCATE,
IORING_OP_BIND,
IORING_OP_LISTEN,
+ IORING_OP_RECV_ZC,
/* this goes last, obviously */
IORING_OP_LAST,
@@ -184,6 +184,16 @@ static inline bool io_get_cqe(struct io_ring_ctx *ctx, struct io_uring_cqe **ret
return io_get_cqe_overflow(ctx, ret, false);
}
+static inline bool io_defer_get_uncommited_cqe(struct io_ring_ctx *ctx,
+ struct io_uring_cqe **cqe_ret)
+{
+ io_lockdep_assert_cq_locked(ctx);
+
+ ctx->cq_extra++;
+ ctx->submit_state.cq_flush = true;
+ return io_get_cqe(ctx, cqe_ret);
+}
+
static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
@@ -16,6 +16,7 @@
#include "net.h"
#include "notif.h"
#include "rsrc.h"
+#include "zcrx.h"
#if defined(CONFIG_NET)
struct io_shutdown {
@@ -88,6 +89,13 @@ struct io_sr_msg {
*/
#define MULTISHOT_MAX_RETRY 32
+struct io_recvzc {
+ struct file *file;
+ unsigned msg_flags;
+ u16 flags;
+ struct io_zcrx_ifq *ifq;
+};
+
int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_shutdown *shutdown = io_kiocb_to_cmd(req, struct io_shutdown);
@@ -1209,6 +1217,70 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
return ret;
}
+int io_recvzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+ struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc);
+ unsigned ifq_idx;
+
+ if (unlikely(sqe->file_index || sqe->addr2 || sqe->addr ||
+ sqe->len || sqe->addr3))
+ return -EINVAL;
+
+ ifq_idx = READ_ONCE(sqe->zcrx_ifq_idx);
+ if (ifq_idx != 0)
+ return -EINVAL;
+ zc->ifq = req->ctx->ifq;
+ if (!zc->ifq)
+ return -EINVAL;
+
+ zc->flags = READ_ONCE(sqe->ioprio);
+ zc->msg_flags = READ_ONCE(sqe->msg_flags);
+ if (zc->msg_flags)
+ return -EINVAL;
+ if (zc->flags & ~(IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT))
+ return -EINVAL;
+ /* multishot required */
+ if (!(zc->flags & IORING_RECV_MULTISHOT))
+ return -EINVAL;
+ /* All data completions are posted as aux CQEs. */
+ req->flags |= REQ_F_APOLL_MULTISHOT;
+
+ return 0;
+}
+
+int io_recvzc(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc);
+ struct socket *sock;
+ int ret;
+
+ if (!(req->flags & REQ_F_POLLED) &&
+ (zc->flags & IORING_RECVSEND_POLL_FIRST))
+ return -EAGAIN;
+
+ sock = sock_from_file(req->file);
+ if (unlikely(!sock))
+ return -ENOTSOCK;
+
+ ret = io_zcrx_recv(req, zc->ifq, sock, zc->msg_flags | MSG_DONTWAIT,
+ issue_flags);
+ if (unlikely(ret <= 0) && ret != -EAGAIN) {
+ if (ret == -ERESTARTSYS)
+ ret = -EINTR;
+
+ req_set_fail(req);
+ io_req_set_res(req, ret, 0);
+
+ if (issue_flags & IO_URING_F_MULTISHOT)
+ return IOU_STOP_MULTISHOT;
+ return IOU_OK;
+ }
+
+ if (issue_flags & IO_URING_F_MULTISHOT)
+ return IOU_ISSUE_SKIP_COMPLETE;
+ return -EAGAIN;
+}
+
void io_send_zc_cleanup(struct io_kiocb *req)
{
struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -36,6 +36,7 @@
#include "waitid.h"
#include "futex.h"
#include "truncate.h"
+#include "zcrx.h"
static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
{
@@ -513,6 +514,18 @@ const struct io_issue_def io_issue_defs[] = {
.async_size = sizeof(struct io_async_msghdr),
#else
.prep = io_eopnotsupp_prep,
+#endif
+ },
+ [IORING_OP_RECV_ZC] = {
+ .needs_file = 1,
+ .unbound_nonreg_file = 1,
+ .pollin = 1,
+ .ioprio = 1,
+#if defined(CONFIG_NET)
+ .prep = io_recvzc_prep,
+ .issue = io_recvzc,
+#else
+ .prep = io_eopnotsupp_prep,
#endif
},
};
@@ -744,6 +757,9 @@ const struct io_cold_def io_cold_defs[] = {
[IORING_OP_LISTEN] = {
.name = "LISTEN",
},
+ [IORING_OP_RECV_ZC] = {
+ .name = "RECV_ZC",
+ },
};
const char *io_uring_get_opcode(u8 opcode)
@@ -13,6 +13,8 @@
#include <net/netlink.h>
#include <trace/events/page_pool.h>
+#include <net/tcp.h>
+#include <net/rps.h>
#include <uapi/linux/io_uring.h>
@@ -100,7 +102,12 @@ static void io_zcrx_sync_for_device(const struct page_pool *pool,
#define IO_RQ_MAX_ENTRIES 32768
-__maybe_unused
+struct io_zcrx_args {
+ struct io_kiocb *req;
+ struct io_zcrx_ifq *ifq;
+ struct socket *sock;
+};
+
static const struct memory_provider_ops io_uring_pp_zc_ops;
static inline struct io_zcrx_area *io_zcrx_iov_to_area(const struct net_iov *niov)
@@ -127,6 +134,11 @@ static bool io_zcrx_put_niov_uref(struct net_iov *niov)
return true;
}
+static void io_zcrx_get_niov_uref(struct net_iov *niov)
+{
+ atomic_inc(io_get_user_counter(niov));
+}
+
static int io_allocate_rbuf_ring(struct io_zcrx_ifq *ifq,
struct io_uring_zcrx_ifq_reg *reg,
struct io_uring_region_desc *rd)
@@ -594,3 +606,179 @@ static const struct memory_provider_ops io_uring_pp_zc_ops = {
.destroy = io_pp_zc_destroy,
.nl_fill = io_pp_nl_fill,
};
+
+static bool io_zcrx_queue_cqe(struct io_kiocb *req, struct net_iov *niov,
+ struct io_zcrx_ifq *ifq, int off, int len)
+{
+ struct io_uring_zcrx_cqe *rcqe;
+ struct io_zcrx_area *area;
+ struct io_uring_cqe *cqe;
+ u64 offset;
+
+ if (!io_defer_get_uncommited_cqe(req->ctx, &cqe))
+ return false;
+
+ cqe->user_data = req->cqe.user_data;
+ cqe->res = len;
+ cqe->flags = IORING_CQE_F_MORE;
+
+ area = io_zcrx_iov_to_area(niov);
+ offset = off + (net_iov_idx(niov) << PAGE_SHIFT);
+ rcqe = (struct io_uring_zcrx_cqe *)(cqe + 1);
+ rcqe->off = offset + ((u64)area->area_id << IORING_ZCRX_AREA_SHIFT);
+ rcqe->__pad = 0;
+ return true;
+}
+
+static int io_zcrx_recv_frag(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
+ const skb_frag_t *frag, int off, int len)
+{
+ struct net_iov *niov;
+
+ if (unlikely(!skb_frag_is_net_iov(frag)))
+ return -EOPNOTSUPP;
+
+ niov = netmem_to_net_iov(frag->netmem);
+ if (niov->pp->mp_ops != &io_uring_pp_zc_ops ||
+ niov->pp->mp_priv != ifq)
+ return -EFAULT;
+
+ if (!io_zcrx_queue_cqe(req, niov, ifq, off + skb_frag_off(frag), len))
+ return -ENOSPC;
+
+ /*
+ * Prevent it from being recycled while user is accessing it.
+ * It has to be done before grabbing a user reference.
+ */
+ page_pool_ref_netmem(net_iov_to_netmem(niov));
+ io_zcrx_get_niov_uref(niov);
+ return len;
+}
+
+static int
+io_zcrx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb,
+ unsigned int offset, size_t len)
+{
+ struct io_zcrx_args *args = desc->arg.data;
+ struct io_zcrx_ifq *ifq = args->ifq;
+ struct io_kiocb *req = args->req;
+ struct sk_buff *frag_iter;
+ unsigned start, start_off;
+ int i, copy, end, off;
+ int ret = 0;
+
+ start = skb_headlen(skb);
+ start_off = offset;
+
+ if (offset < start)
+ return -EOPNOTSUPP;
+
+ for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
+ const skb_frag_t *frag;
+
+ if (WARN_ON(start > offset + len))
+ return -EFAULT;
+
+ frag = &skb_shinfo(skb)->frags[i];
+ end = start + skb_frag_size(frag);
+
+ if (offset < end) {
+ copy = end - offset;
+ if (copy > len)
+ copy = len;
+
+ off = offset - start;
+ ret = io_zcrx_recv_frag(req, ifq, frag, off, copy);
+ if (ret < 0)
+ goto out;
+
+ offset += ret;
+ len -= ret;
+ if (len == 0 || ret != copy)
+ goto out;
+ }
+ start = end;
+ }
+
+ skb_walk_frags(skb, frag_iter) {
+ if (WARN_ON(start > offset + len))
+ return -EFAULT;
+
+ end = start + frag_iter->len;
+ if (offset < end) {
+ copy = end - offset;
+ if (copy > len)
+ copy = len;
+
+ off = offset - start;
+ ret = io_zcrx_recv_skb(desc, frag_iter, off, copy);
+ if (ret < 0)
+ goto out;
+
+ offset += ret;
+ len -= ret;
+ if (len == 0 || ret != copy)
+ goto out;
+ }
+ start = end;
+ }
+
+out:
+ if (offset == start_off)
+ return ret;
+ return offset - start_off;
+}
+
+static int io_zcrx_tcp_recvmsg(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
+ struct sock *sk, int flags,
+ unsigned issue_flags)
+{
+ struct io_zcrx_args args = {
+ .req = req,
+ .ifq = ifq,
+ .sock = sk->sk_socket,
+ };
+ read_descriptor_t rd_desc = {
+ .count = 1,
+ .arg.data = &args,
+ };
+ int ret;
+
+ lock_sock(sk);
+ ret = tcp_read_sock(sk, &rd_desc, io_zcrx_recv_skb);
+ if (ret <= 0) {
+ if (ret < 0 || sock_flag(sk, SOCK_DONE))
+ goto out;
+ if (sk->sk_err)
+ ret = sock_error(sk);
+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
+ goto out;
+ else if (sk->sk_state == TCP_CLOSE)
+ ret = -ENOTCONN;
+ else
+ ret = -EAGAIN;
+ } else if (sock_flag(sk, SOCK_DONE)) {
+ /* Make it to retry until it finally gets 0. */
+ if (issue_flags & IO_URING_F_MULTISHOT)
+ ret = IOU_REQUEUE;
+ else
+ ret = -EAGAIN;
+ }
+out:
+ release_sock(sk);
+ return ret;
+}
+
+int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
+ struct socket *sock, unsigned int flags,
+ unsigned issue_flags)
+{
+ struct sock *sk = sock->sk;
+ const struct proto *prot = READ_ONCE(sk->sk_prot);
+
+ if (prot->recvmsg != tcp_recvmsg)
+ return -EPROTONOSUPPORT;
+
+ sock_rps_record_flow(sk);
+ return io_zcrx_tcp_recvmsg(req, ifq, sk, flags, issue_flags);
+}
@@ -3,6 +3,7 @@
#define IOU_ZC_RX_H
#include <linux/io_uring_types.h>
+#include <linux/socket.h>
#include <net/page_pool/types.h>
#include <net/net_trackers.h>
@@ -41,6 +42,9 @@ int io_register_zcrx_ifq(struct io_ring_ctx *ctx,
struct io_uring_zcrx_ifq_reg __user *arg);
void io_unregister_zcrx_ifqs(struct io_ring_ctx *ctx);
void io_shutdown_zcrx_ifqs(struct io_ring_ctx *ctx);
+int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
+ struct socket *sock, unsigned int flags,
+ unsigned issue_flags);
#else
static inline int io_register_zcrx_ifq(struct io_ring_ctx *ctx,
struct io_uring_zcrx_ifq_reg __user *arg)
@@ -53,6 +57,15 @@ static inline void io_unregister_zcrx_ifqs(struct io_ring_ctx *ctx)
static inline void io_shutdown_zcrx_ifqs(struct io_ring_ctx *ctx)
{
}
+static inline int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
+ struct socket *sock, unsigned int flags,
+ unsigned issue_flags)
+{
+ return -EOPNOTSUPP;
+}
#endif
+int io_recvzc(struct io_kiocb *req, unsigned int issue_flags);
+int io_recvzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+
#endif