@@ -10,6 +10,9 @@
#include "i40e_txrx_common.h"
#include "i40e_xsk.h"
+#define I40E_DESCS_PER_BATCH 64
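+/* Rounds a descriptor count down to a whole number of batches */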
+#define I40E_XSK_BATCH_MASK (~(I40E_DESCS_PER_BATCH - 1))
+
int i40e_alloc_rx_bi_zc(struct i40e_ring *rx_ring)
{
unsigned long sz = sizeof(*rx_ring->rx_bi_zc) * rx_ring->count;
@@ -139,26 +142,12 @@ int i40e_xsk_pool_setup(struct i40e_vsi *vsi, struct xsk_buff_pool *pool,
i40e_xsk_pool_disable(vsi, qid);
}
-/**
- * i40e_run_xdp_zc - Executes an XDP program on an xdp_buff
- * @rx_ring: Rx ring
- * @xdp: xdp_buff used as input to the XDP program
- *
- * Returns any of I40E_XDP_{PASS, CONSUMED, TX, REDIR, REDIR_XSK}
- **/
-static int i40e_run_xdp_zc(struct i40e_ring *rx_ring, struct xdp_buff *xdp)
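+/**
+ * i40e_handle_xdp_action - Handles the verdict of an already-run XDP program
+ * @rx_ring: Rx ring
+ * @xdp: xdp_buff the program was run on
+ * @xdp_prog: XDP program that produced @act
+ * @act: action returned by bpf_prog_run_xdp()
+ *
+ * Returns any of I40E_XDP_{PASS, CONSUMED, TX, REDIR, REDIR_XSK}
+ **/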
+static int i40e_handle_xdp_action(struct i40e_ring *rx_ring, struct xdp_buff *xdp,
+ struct bpf_prog *xdp_prog, u32 act)
{
int err, result = I40E_XDP_PASS;
struct i40e_ring *xdp_ring;
- struct bpf_prog *xdp_prog;
struct xdp_sock *xs;
- u32 act;
-
- /* NB! xdp_prog will always be !NULL, due to the fact that
- * this path is enabled by setting an XDP program.
- */
- xdp_prog = READ_ONCE(rx_ring->xdp_prog);
- act = bpf_prog_run_xdp(xdp_prog, xdp);
if (likely(act == XDP_REDIRECT_XSK)) {
xs = xsk_get_redirect_xsk(&rx_ring->netdev->_rx[xdp->rxq->queue_index]);
@@ -197,6 +186,21 @@ static int i40e_run_xdp_zc(struct i40e_ring *rx_ring, struct xdp_buff *xdp)
return result;
}
+/**
+ * i40e_run_xdp_zc - Executes an XDP program on an xdp_buff
+ * @rx_ring: Rx ring
+ * @xdp: xdp_buff used as input to the XDP program
+ * @xdp_prog: XDP program to run
+ *
+ * Returns any of I40E_XDP_{PASS, CONSUMED, TX, REDIR, REDIR_XSK}
+ **/
+static int i40e_run_xdp_zc(struct i40e_ring *rx_ring, struct xdp_buff *xdp,
+ struct bpf_prog *xdp_prog)
+{
+ u32 act = bpf_prog_run_xdp(xdp_prog, xdp);
+
+ return i40e_handle_xdp_action(rx_ring, xdp, xdp_prog, act);
+}
+
bool i40e_alloc_rx_buffers_zc(struct i40e_ring *rx_ring, u16 count)
{
u16 ntu = rx_ring->next_to_use;
@@ -218,6 +222,7 @@ bool i40e_alloc_rx_buffers_zc(struct i40e_ring *rx_ring, u16 count)
dma = xsk_buff_xdp_get_dma(*xdp);
rx_desc->read.pkt_addr = cpu_to_le64(dma);
rx_desc->read.hdr_addr = 0;
+ rx_desc->wb.qword1.status_error_len = 0;
rx_desc++;
xdp++;
@@ -324,6 +329,7 @@ static void i40e_handle_xdp_result_zc(struct i40e_ring *rx_ring,
}
static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
+ struct bpf_prog *xdp_prog,
unsigned int *stat_rx_packets,
unsigned int *stat_rx_bytes,
unsigned int *xmit,
@@ -370,7 +376,7 @@ static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
xsk_buff_set_size(bi, size);
xsk_buff_dma_sync_for_cpu(bi, rx_ring->xsk_pool);
- xdp_res = i40e_run_xdp_zc(rx_ring, bi);
+ xdp_res = i40e_run_xdp_zc(rx_ring, bi, xdp_prog);
i40e_handle_xdp_result_zc(rx_ring, bi, rx_desc, &rx_packets,
&rx_bytes, size, xdp_res);
total_rx_packets += rx_packets;
@@ -385,6 +391,172 @@ static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
*xmit = xdp_xmit;
}
+/**
+ * i40e_rx_ring_lookahead - check for new descriptors in the Rx ring
+ * @rx_ring: Rx ring
+ * @budget: NAPI budget
+ *
+ * Repeatedly halves the budget until a fully written-back, batch-aligned
+ * run of descriptors is found.
+ *
+ * Returns the number of available descriptors in contiguous memory,
+ * i.e. without a ring wrap, rounded down to a multiple of
+ * I40E_DESCS_PER_BATCH, or 0 if no complete batch is available.
+ **/
+static inline unsigned int i40e_rx_ring_lookahead(struct i40e_ring *rx_ring,
+						  unsigned int budget)
+{
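+	/* used = ring slots the driver owns and has yet to refill;
+	 * potential = upper bound on descriptors HW may have completed.
+	 */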
+ u32 used = (rx_ring->next_to_clean - rx_ring->next_to_use - 1) & (rx_ring->count - 1);
+ union i40e_rx_desc *rx_desc0 = (union i40e_rx_desc *)rx_ring->desc, *rx_desc;
+ u32 next_to_clean = rx_ring->next_to_clean;
+ u32 potential = rx_ring->count - used;
+ u16 count_mask = rx_ring->count - 1;
+ unsigned int size;
+ u64 qword;
+
+ budget &= I40E_XSK_BATCH_MASK;
+
+ while (budget) {
+ if (budget > potential)
+ goto next;
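+		/* Probe the last descriptor of the candidate budget. The
+		 * descriptor length is zeroed at allocation, so a non-zero
+		 * packet length means HW has written it back.
+		 */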
+ rx_desc = rx_desc0 + ((next_to_clean + budget - 1) & count_mask);
+ qword = le64_to_cpu(rx_desc->wb.qword1.status_error_len);
+ dma_rmb();
+
+ size = (qword & I40E_RXD_QW1_LENGTH_PBUF_MASK) >>
+ I40E_RXD_QW1_LENGTH_PBUF_SHIFT;
+ if (size && ((next_to_clean + budget) <= count_mask))
+ return budget;
+
+next:
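+		/* No complete batch at this distance: halve the budget,
+		 * keep it batch-aligned and try again.
+		 */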
+ budget >>= 1;
+ budget &= I40E_XSK_BATCH_MASK;
+ }
+
+ return 0;
+}
+
+/**
+ * i40e_run_xdp_zc_batch - Executes an XDP program on an array of xdp_buffs
+ * @rx_ring: Rx ring
+ * @bufs: array of xdp_buffs used as input to the XDP program
+ * @xdp_prog: XDP program to run on each buffer
+ * @res: array of per-buffer results, filled in only when an error occurs or
+ *	 the slow path is taken
+ *
+ * Returns zero if all xdp_buffs took the fast path (XDP_REDIRECT_XSK).
+ * Otherwise returns -1 and sets an individual result for each buf in *res.
+ * Individual results are one of I40E_XDP_{PASS, CONSUMED, TX, REDIR, REDIR_XSK}.
+ **/
+static int i40e_run_xdp_zc_batch(struct i40e_ring *rx_ring, struct xdp_buff **bufs,
+ struct bpf_prog *xdp_prog, int *res)
+{
+ u32 last_act = XDP_REDIRECT_XSK;
+ int runs = 0, ret = 0, err, i;
+
+	while ((runs < I40E_DESCS_PER_BATCH) && (last_act == XDP_REDIRECT_XSK))
+		last_act = bpf_prog_run_xdp(xdp_prog, bufs[runs++]);
+
+ if (likely(runs == I40E_DESCS_PER_BATCH)) {
+ struct xdp_sock *xs =
+			xsk_get_redirect_xsk(&rx_ring->netdev->_rx[bufs[0]->rxq->queue_index]);
+
+ err = xsk_rcv_batch(xs, bufs, I40E_DESCS_PER_BATCH);
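+		/* If the batched receive fails, fall back to handling the
+		 * whole batch as if the program had returned XDP_PASS.
+		 */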
+ if (unlikely(err)) {
+ ret = -1;
+ for (i = 0; i < I40E_DESCS_PER_BATCH; i++)
+				res[i] = I40E_XDP_PASS;
+ }
+ } else {
+ /* Handle the result of each program run individually */
+ u32 act;
+
+ ret = -1;
+		for (i = 0; i < I40E_DESCS_PER_BATCH; i++) {
+			struct xdp_buff *xdp = bufs[i];
+
+			/* The first runs - 1 program runs returned
+			 * XDP_REDIRECT_XSK. The result of the last run was
+			 * last_act. Any remaining bufs have not yet had the
+			 * program executed, so execute it now.
+			 */
+			if (i < runs - 1)
+				act = XDP_REDIRECT_XSK;
+			else if (i == runs - 1)
+				act = last_act;
+			else
+				act = bpf_prog_run_xdp(xdp_prog, xdp);
+
+			res[i] = i40e_handle_xdp_action(rx_ring, xdp, xdp_prog, act);
+ }
+ }
+
+ return ret;
+}
+
+static inline void i40e_clean_rx_desc_zc_batch(struct i40e_ring *rx_ring,
+ struct bpf_prog *xdp_prog,
+ unsigned int *total_rx_packets,
+ unsigned int *total_rx_bytes,
+ unsigned int *xdp_xmit)
+{
+ u16 next_to_clean = rx_ring->next_to_clean;
+ unsigned int xdp_res[I40E_DESCS_PER_BATCH];
+ unsigned int size[I40E_DESCS_PER_BATCH];
+ unsigned int rx_packets, rx_bytes = 0;
+ union i40e_rx_desc *rx_desc;
+ struct xdp_buff **bufs;
+ int j, ret;
+ u64 qword;
+
+ rx_desc = I40E_RX_DESC(rx_ring, next_to_clean);
+
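+	/* Prefetch the first descriptor of the next batch */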
+ prefetch(rx_desc + I40E_DESCS_PER_BATCH);
+
+ for (j = 0; j < I40E_DESCS_PER_BATCH; j++) {
+ qword = le64_to_cpu((rx_desc + j)->wb.qword1.status_error_len);
+ size[j] = (qword & I40E_RXD_QW1_LENGTH_PBUF_MASK) >>
+ I40E_RXD_QW1_LENGTH_PBUF_SHIFT;
+ }
+
+ /* This memory barrier is needed to keep us from reading
+ * any other fields out of the rx_descs until we have
+ * verified the descriptors have been written back.
+ */
+ dma_rmb();
+
+ bufs = i40e_rx_bi(rx_ring, next_to_clean);
+
+ for (j = 0; j < I40E_DESCS_PER_BATCH; j++)
+		xsk_buff_set_size(bufs[j], size[j]);
+
+ xsk_buff_dma_sync_for_cpu_batch(bufs, rx_ring->xsk_pool, I40E_DESCS_PER_BATCH);
+
+ ret = i40e_run_xdp_zc_batch(rx_ring, bufs, xdp_prog, xdp_res);
+
+ if (unlikely(ret)) {
+ unsigned int err_rx_packets = 0, err_rx_bytes = 0;
+
+ rx_packets = 0;
+ rx_bytes = 0;
+
+ for (j = 0; j < I40E_DESCS_PER_BATCH; j++) {
+			i40e_handle_xdp_result_zc(rx_ring, bufs[j], rx_desc + j,
+ &err_rx_packets, &err_rx_bytes, size[j],
+ xdp_res[j]);
+ *xdp_xmit |= (xdp_res[j] & (I40E_XDP_TX | I40E_XDP_REDIR |
+ I40E_XDP_REDIR_XSK));
+ rx_packets += err_rx_packets;
+ rx_bytes += err_rx_bytes;
+ }
+ } else {
+ rx_packets = I40E_DESCS_PER_BATCH;
+ for (j = 0; j < I40E_DESCS_PER_BATCH; j++)
+ rx_bytes += size[j];
+ *xdp_xmit |= I40E_XDP_REDIR_XSK;
+ }
+
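+	/* The lookahead guarantees that a batch never reaches the end of
+	 * the ring, so next_to_clean cannot wrap here.
+	 */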
+ rx_ring->next_to_clean += I40E_DESCS_PER_BATCH;
+ *total_rx_packets += rx_packets;
+ *total_rx_bytes += rx_bytes;
+}
+
/**
* i40e_clean_rx_irq_zc - Consumes Rx packets from the hardware ring
* @rx_ring: Rx ring
@@ -394,17 +566,26 @@ static inline void i40e_clean_rx_desc_zc(struct i40e_ring *rx_ring,
**/
int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget)
{
+	int batch_budget = i40e_rx_ring_lookahead(rx_ring, (unsigned int)budget);
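+	/* NB! xdp_prog will always be !NULL, due to the fact that
+	 * this path is enabled by setting an XDP program.
+	 */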
+ struct bpf_prog *xdp_prog = READ_ONCE(rx_ring->xdp_prog);
unsigned int total_rx_bytes = 0, total_rx_packets = 0;
u16 count_mask = rx_ring->count - 1;
unsigned int xdp_xmit = 0;
bool failure = false;
u16 cleaned_count;
+ int i;
+
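+	/* Consume full batches first; the scalar path below handles any
+	 * remaining budget as well as ring wraps.
+	 */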
+ for (i = 0; i < batch_budget; i += I40E_DESCS_PER_BATCH)
+ i40e_clean_rx_desc_zc_batch(rx_ring, xdp_prog,
+ &total_rx_packets,
+ &total_rx_bytes,
+ &xdp_xmit);
- i40e_clean_rx_desc_zc(rx_ring,
+ i40e_clean_rx_desc_zc(rx_ring, xdp_prog,
&total_rx_packets,
&total_rx_bytes,
&xdp_xmit,
- budget);
+ (unsigned int)budget - total_rx_packets);
cleaned_count = (rx_ring->next_to_clean - rx_ring->next_to_use - 1) & count_mask;