diff mbox series

[for-next,v6,07/10] RDMA/rxe: Implement flush execution in responder side

Message ID 20221116081951.32750-8-lizhijian@fujitsu.com (mailing list archive)
State Superseded
Headers show
Series RDMA/rxe: Add RDMA FLUSH operation | expand

Commit Message

Zhijian Li (Fujitsu) Nov. 16, 2022, 8:19 a.m. UTC
Only the requested placement types that also registered in the destination
memory region are acceptable.
Otherwise, responder will also reply NAK "Remote Access Error" if it
found a placement type violation.

We will persist data via arch_wb_cache_pmem(), which could be
architecture specific.

This commit also add 2 helpers to update qp.resp from the incoming packet.

Reviewed-by: Zhu Yanjun <zyjzyj2000@gmail.com>
Signed-off-by: Li Zhijian <lizhijian@fujitsu.com>
---
v6: call iova_to_vaddr to transform iova
v5: add QP attr check for flush access
    rename flush_nvdimm_iova -> rxe_flush_pmem_iova()
v4: add send_read_response_ack and flush resource
---
 drivers/infiniband/sw/rxe/rxe_loc.h   |   1 +
 drivers/infiniband/sw/rxe/rxe_mr.c    |  36 ++++++
 drivers/infiniband/sw/rxe/rxe_resp.c  | 176 +++++++++++++++++++++++---
 drivers/infiniband/sw/rxe/rxe_verbs.h |   6 +
 4 files changed, 199 insertions(+), 20 deletions(-)
diff mbox series

Patch

diff --git a/drivers/infiniband/sw/rxe/rxe_loc.h b/drivers/infiniband/sw/rxe/rxe_loc.h
index c2a5c8814a48..944d564a11cd 100644
--- a/drivers/infiniband/sw/rxe/rxe_loc.h
+++ b/drivers/infiniband/sw/rxe/rxe_loc.h
@@ -68,6 +68,7 @@  void rxe_mr_init_dma(int access, struct rxe_mr *mr);
 int rxe_mr_init_user(struct rxe_dev *rxe, u64 start, u64 length, u64 iova,
 		     int access, struct rxe_mr *mr);
 int rxe_mr_init_fast(int max_pages, struct rxe_mr *mr);
+int rxe_flush_pmem_iova(struct rxe_mr *mr, u64 iova, int length);
 int rxe_mr_copy(struct rxe_mr *mr, u64 iova, void *addr, int length,
 		enum rxe_mr_copy_dir dir);
 int copy_data(struct rxe_pd *pd, int access, struct rxe_dma_info *dma,
diff --git a/drivers/infiniband/sw/rxe/rxe_mr.c b/drivers/infiniband/sw/rxe/rxe_mr.c
index fd423c015be0..592965ee89fa 100644
--- a/drivers/infiniband/sw/rxe/rxe_mr.c
+++ b/drivers/infiniband/sw/rxe/rxe_mr.c
@@ -4,6 +4,8 @@ 
  * Copyright (c) 2015 System Fabric Works, Inc. All rights reserved.
  */
 
+#include <linux/libnvdimm.h>
+
 #include "rxe.h"
 #include "rxe_loc.h"
 
@@ -196,6 +198,7 @@  int rxe_mr_init_user(struct rxe_dev *rxe, u64 start, u64 length, u64 iova,
 	mr->offset = ib_umem_offset(umem);
 	mr->state = RXE_MR_STATE_VALID;
 	mr->ibmr.type = IB_MR_TYPE_USER;
+	mr->ibmr.page_size = PAGE_SIZE;
 
 	return 0;
 
@@ -303,6 +306,39 @@  void *iova_to_vaddr(struct rxe_mr *mr, u64 iova, int length)
 	return addr;
 }
 
+int rxe_flush_pmem_iova(struct rxe_mr *mr, u64 iova, int length)
+{
+	size_t offset;
+
+	if (length == 0)
+		return 0;
+
+	if (mr->ibmr.type == IB_MR_TYPE_DMA)
+		return -EFAULT;
+
+	offset = (iova - mr->ibmr.iova + mr->offset) & mr->page_mask;
+	while (length > 0) {
+		u8 *va;
+		int bytes;
+
+		bytes = mr->ibmr.page_size - offset;
+		if (bytes > length)
+			bytes = length;
+
+		va = iova_to_vaddr(mr, iova, length);
+		if (!va)
+			return -EFAULT;
+
+		arch_wb_cache_pmem(va, bytes);
+
+		length -= bytes;
+		iova += bytes;
+		offset = 0;
+	}
+
+	return 0;
+}
+
 /* copy data from a range (vaddr, vaddr+length-1) to or from
  * a mr object starting at iova.
  */
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 8caa9941e70e..43cf3fb04674 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -22,6 +22,7 @@  enum resp_states {
 	RESPST_EXECUTE,
 	RESPST_READ_REPLY,
 	RESPST_ATOMIC_REPLY,
+	RESPST_PROCESS_FLUSH,
 	RESPST_COMPLETE,
 	RESPST_ACKNOWLEDGE,
 	RESPST_CLEANUP,
@@ -57,6 +58,7 @@  static char *resp_state_name[] = {
 	[RESPST_EXECUTE]			= "EXECUTE",
 	[RESPST_READ_REPLY]			= "READ_REPLY",
 	[RESPST_ATOMIC_REPLY]			= "ATOMIC_REPLY",
+	[RESPST_PROCESS_FLUSH]			= "PROCESS_FLUSH",
 	[RESPST_COMPLETE]			= "COMPLETE",
 	[RESPST_ACKNOWLEDGE]			= "ACKNOWLEDGE",
 	[RESPST_CLEANUP]			= "CLEANUP",
@@ -256,19 +258,38 @@  static enum resp_states check_op_seq(struct rxe_qp *qp,
 	}
 }
 
+static bool check_qp_attr_access(struct rxe_qp *qp,
+				 struct rxe_pkt_info *pkt)
+{
+	if (((pkt->mask & RXE_READ_MASK) &&
+	     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_READ)) ||
+	    ((pkt->mask & RXE_WRITE_MASK) &&
+	     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_WRITE)) ||
+	    ((pkt->mask & RXE_ATOMIC_MASK) &&
+	     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_ATOMIC))) {
+		return false;
+	}
+
+	if (pkt->mask & RXE_FLUSH_MASK) {
+		u32 flush_type = feth_plt(pkt);
+
+		if ((flush_type & IB_FLUSH_GLOBAL &&
+		     !(qp->attr.qp_access_flags & IB_ACCESS_FLUSH_GLOBAL)) ||
+		    (flush_type & IB_FLUSH_PERSISTENT &&
+		     !(qp->attr.qp_access_flags & IB_ACCESS_FLUSH_PERSISTENT)))
+			return false;
+	}
+
+	return true;
+}
+
 static enum resp_states check_op_valid(struct rxe_qp *qp,
 				       struct rxe_pkt_info *pkt)
 {
 	switch (qp_type(qp)) {
 	case IB_QPT_RC:
-		if (((pkt->mask & RXE_READ_MASK) &&
-		     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_READ)) ||
-		    ((pkt->mask & RXE_WRITE_MASK) &&
-		     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_WRITE)) ||
-		    ((pkt->mask & RXE_ATOMIC_MASK) &&
-		     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_ATOMIC))) {
+		if (!check_qp_attr_access(qp, pkt))
 			return RESPST_ERR_UNSUPPORTED_OPCODE;
-		}
 
 		break;
 
@@ -425,6 +446,23 @@  static enum resp_states check_length(struct rxe_qp *qp,
 	return RESPST_CHK_RKEY;
 }
 
+static void qp_resp_from_reth(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
+{
+	qp->resp.va = reth_va(pkt);
+	qp->resp.offset = 0;
+	qp->resp.rkey = reth_rkey(pkt);
+	qp->resp.resid = reth_len(pkt);
+	qp->resp.length = reth_len(pkt);
+}
+
+static void qp_resp_from_atmeth(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
+{
+	qp->resp.va = atmeth_va(pkt);
+	qp->resp.offset = 0;
+	qp->resp.rkey = atmeth_rkey(pkt);
+	qp->resp.resid = sizeof(u64);
+}
+
 static enum resp_states check_rkey(struct rxe_qp *qp,
 				   struct rxe_pkt_info *pkt)
 {
@@ -436,23 +474,26 @@  static enum resp_states check_rkey(struct rxe_qp *qp,
 	u32 pktlen;
 	int mtu = qp->mtu;
 	enum resp_states state;
-	int access;
+	int access = 0;
 
 	if (pkt->mask & RXE_READ_OR_WRITE_MASK) {
-		if (pkt->mask & RXE_RETH_MASK) {
-			qp->resp.va = reth_va(pkt);
-			qp->resp.offset = 0;
-			qp->resp.rkey = reth_rkey(pkt);
-			qp->resp.resid = reth_len(pkt);
-			qp->resp.length = reth_len(pkt);
-		}
+		if (pkt->mask & RXE_RETH_MASK)
+			qp_resp_from_reth(qp, pkt);
+
 		access = (pkt->mask & RXE_READ_MASK) ? IB_ACCESS_REMOTE_READ
 						     : IB_ACCESS_REMOTE_WRITE;
+	} else if (pkt->mask & RXE_FLUSH_MASK) {
+		u32 flush_type = feth_plt(pkt);
+
+		if (pkt->mask & RXE_RETH_MASK)
+			qp_resp_from_reth(qp, pkt);
+
+		if (flush_type & IB_FLUSH_GLOBAL)
+			access |= IB_ACCESS_FLUSH_GLOBAL;
+		if (flush_type & IB_FLUSH_PERSISTENT)
+			access |= IB_ACCESS_FLUSH_PERSISTENT;
 	} else if (pkt->mask & RXE_ATOMIC_MASK) {
-		qp->resp.va = atmeth_va(pkt);
-		qp->resp.offset = 0;
-		qp->resp.rkey = atmeth_rkey(pkt);
-		qp->resp.resid = sizeof(u64);
+		qp_resp_from_atmeth(qp, pkt);
 		access = IB_ACCESS_REMOTE_ATOMIC;
 	} else {
 		return RESPST_EXECUTE;
@@ -501,12 +542,21 @@  static enum resp_states check_rkey(struct rxe_qp *qp,
 		}
 	}
 
+	if (pkt->mask & RXE_FLUSH_MASK) {
+		/* FLUSH MR may not set va or resid
+		 * no need to check range since we will flush whole mr
+		 */
+		if (feth_sel(pkt) == IB_FLUSH_MR)
+			goto skip_check_range;
+	}
+
 	if (mr_check_range(mr, va + qp->resp.offset, resid)) {
 		state = RESPST_ERR_RKEY_VIOLATION;
 		goto err;
 	}
 
-	if (pkt->mask & RXE_WRITE_MASK)	 {
+skip_check_range:
+	if (pkt->mask & RXE_WRITE_MASK) {
 		if (resid > mtu) {
 			if (pktlen != mtu || bth_pad(pkt)) {
 				state = RESPST_ERR_LENGTH;
@@ -610,11 +660,61 @@  static struct resp_res *rxe_prepare_res(struct rxe_qp *qp,
 		res->last_psn = pkt->psn;
 		res->cur_psn = pkt->psn;
 		break;
+	case RXE_FLUSH_MASK:
+		res->flush.va = qp->resp.va + qp->resp.offset;
+		res->flush.length = qp->resp.length;
+		res->flush.type = feth_plt(pkt);
+		res->flush.level = feth_sel(pkt);
 	}
 
 	return res;
 }
 
+static enum resp_states process_flush(struct rxe_qp *qp,
+				       struct rxe_pkt_info *pkt)
+{
+	u64 length, start;
+	struct rxe_mr *mr = qp->resp.mr;
+	struct resp_res *res = qp->resp.res;
+
+	/* oA19-14, oA19-15 */
+	if (res && res->replay)
+		return RESPST_ACKNOWLEDGE;
+	else if (!res) {
+		res = rxe_prepare_res(qp, pkt, RXE_FLUSH_MASK);
+		qp->resp.res = res;
+	}
+
+	if (res->flush.level == IB_FLUSH_RANGE) {
+		start = res->flush.va;
+		length = res->flush.length;
+	} else { /* level == IB_FLUSH_MR */
+		start = mr->ibmr.iova;
+		length = mr->ibmr.length;
+	}
+
+	if (res->flush.type & IB_FLUSH_PERSISTENT) {
+		if (rxe_flush_pmem_iova(mr, start, length))
+			return RESPST_ERR_RKEY_VIOLATION;
+		/* Make data persistent. */
+		wmb();
+	} else if (res->flush.type & IB_FLUSH_GLOBAL) {
+		/* Make data global visibility. */
+		wmb();
+	}
+
+	qp->resp.msn++;
+
+	/* next expected psn, read handles this separately */
+	qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
+	qp->resp.ack_psn = qp->resp.psn;
+
+	qp->resp.opcode = pkt->opcode;
+	qp->resp.status = IB_WC_SUCCESS;
+
+	return RESPST_ACKNOWLEDGE;
+}
+
 /* Guarantee atomicity of atomic operations at the machine level. */
 static DEFINE_SPINLOCK(atomic_ops_lock);
 
@@ -916,6 +1016,8 @@  static enum resp_states execute(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
 		return RESPST_READ_REPLY;
 	} else if (pkt->mask & RXE_ATOMIC_MASK) {
 		return RESPST_ATOMIC_REPLY;
+	} else if (pkt->mask & RXE_FLUSH_MASK) {
+		return RESPST_PROCESS_FLUSH;
 	} else {
 		/* Unreachable */
 		WARN_ON_ONCE(1);
@@ -1089,6 +1191,19 @@  static int send_atomic_ack(struct rxe_qp *qp, u8 syndrome, u32 psn)
 	return ret;
 }
 
+static int send_read_response_ack(struct rxe_qp *qp, u8 syndrome, u32 psn)
+{
+	int ret = send_common_ack(qp, syndrome, psn,
+			IB_OPCODE_RC_RDMA_READ_RESPONSE_ONLY,
+			"RDMA READ response of length zero ACK");
+
+	/* have to clear this since it is used to trigger
+	 * long read replies
+	 */
+	qp->resp.res = NULL;
+	return ret;
+}
+
 static enum resp_states acknowledge(struct rxe_qp *qp,
 				    struct rxe_pkt_info *pkt)
 {
@@ -1099,6 +1214,8 @@  static enum resp_states acknowledge(struct rxe_qp *qp,
 		send_ack(qp, qp->resp.aeth_syndrome, pkt->psn);
 	else if (pkt->mask & RXE_ATOMIC_MASK)
 		send_atomic_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
+	else if (pkt->mask & RXE_FLUSH_MASK)
+		send_read_response_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
 	else if (bth_ack(pkt))
 		send_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
 
@@ -1155,6 +1272,22 @@  static enum resp_states duplicate_request(struct rxe_qp *qp,
 		/* SEND. Ack again and cleanup. C9-105. */
 		send_ack(qp, AETH_ACK_UNLIMITED, prev_psn);
 		return RESPST_CLEANUP;
+	} else if (pkt->mask & RXE_FLUSH_MASK) {
+		struct resp_res *res;
+
+		/* Find the operation in our list of responder resources. */
+		res = find_resource(qp, pkt->psn);
+		if (res) {
+			res->replay = 1;
+			res->cur_psn = pkt->psn;
+			qp->resp.res = res;
+			rc = RESPST_PROCESS_FLUSH;
+			goto out;
+		}
+
+		/* Resource not found. Class D error. Drop the request. */
+		rc = RESPST_CLEANUP;
+		goto out;
 	} else if (pkt->mask & RXE_READ_MASK) {
 		struct resp_res *res;
 
@@ -1348,6 +1481,9 @@  int rxe_responder(void *arg)
 		case RESPST_ATOMIC_REPLY:
 			state = atomic_reply(qp, pkt);
 			break;
+		case RESPST_PROCESS_FLUSH:
+			state = process_flush(qp, pkt);
+			break;
 		case RESPST_ACKNOWLEDGE:
 			state = acknowledge(qp, pkt);
 			break;
diff --git a/drivers/infiniband/sw/rxe/rxe_verbs.h b/drivers/infiniband/sw/rxe/rxe_verbs.h
index 22a299b0a9f0..19ddfa890480 100644
--- a/drivers/infiniband/sw/rxe/rxe_verbs.h
+++ b/drivers/infiniband/sw/rxe/rxe_verbs.h
@@ -165,6 +165,12 @@  struct resp_res {
 			u64		va;
 			u32		resid;
 		} read;
+		struct {
+			u32		length;
+			u64		va;
+			u8		type;
+			u8		level;
+		} flush;
 	};
 };