diff mbox series

[for-next,v3,3/7] RDMA/rxe: Cleanup code for responder Atomic operations

Message ID d95bb1de314ec3cf5a93e0c5730900d67521e08d.1671772917.git.matsuda-daisuke@fujitsu.com (mailing list archive)
State New, archived
Headers show
Series On-Demand Paging on SoftRoCE | expand

Commit Message

Daisuke Matsuda (Fujitsu) Dec. 23, 2022, 6:51 a.m. UTC
Currently, rxe_responder() directly calls the function to execute Atomic
operations. This need to be modified to insert some conditional branches
for the ODP feature. Additionally, rxe_resp.h is newly added to be used by
rxe_odp.c in near future.

Signed-off-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
---
 drivers/infiniband/sw/rxe/rxe_resp.c | 100 +++++++++++++++++----------
 drivers/infiniband/sw/rxe/rxe_resp.h |   9 +++
 2 files changed, 71 insertions(+), 38 deletions(-)
 create mode 100644 drivers/infiniband/sw/rxe/rxe_resp.h

Comments

Jason Gunthorpe Jan. 16, 2023, 6:21 p.m. UTC | #1
On Fri, Dec 23, 2022 at 03:51:54PM +0900, Daisuke Matsuda wrote:
> @@ -733,60 +734,83 @@ static enum resp_states process_flush(struct rxe_qp *qp,
>  /* Guarantee atomicity of atomic operations at the machine level. */
>  static DEFINE_SPINLOCK(atomic_ops_lock);
>  
> -static enum resp_states atomic_reply(struct rxe_qp *qp,
> -					 struct rxe_pkt_info *pkt)
> +enum resp_states rxe_process_atomic(struct rxe_qp *qp,
> +				    struct rxe_pkt_info *pkt, u64 *vaddr)
>  {
> -	u64 *vaddr;
>  	enum resp_states ret;
> -	struct rxe_mr *mr = qp->resp.mr;
>  	struct resp_res *res = qp->resp.res;
>  	u64 value;
>  
> -	if (!res) {
> -		res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
> -		qp->resp.res = res;
> +	/* check vaddr is 8 bytes aligned. */
> +	if (!vaddr || (uintptr_t)vaddr & 7) {
> +		ret = RESPST_ERR_MISALIGNED_ATOMIC;
> +		goto out;
>  	}
>  
> -	if (!res->replay) {
> -		if (mr->state != RXE_MR_STATE_VALID) {
> -			ret = RESPST_ERR_RKEY_VIOLATION;
> -			goto out;
> -		}
> +	spin_lock(&atomic_ops_lock);
> +	res->atomic.orig_val = value = *vaddr;
>  
> -		vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
> -					sizeof(u64));

I think you need to properly fix the lifetime problem with iova_to_vaddr
function, not hack around it like this.

iova_to_vaddr should be able to return an IOVA for ODP just fine - the
reason it can't is the same bug it has with normal MRs, the mapping
can just change under the feet and there is no protective locking.

If you are going to follow the same ODP design as mlx5 then
fundamentally all ODP does to the MR is add a not-present bit and
allow the MR pages to churn rapidly.

Make the MR safe to changes in the page references against races and
ODP will work just fine.

This will be easier on top of Bob's xarray patch, please check what he
has there and test it.

Thanks,
Jason
diff mbox series

Patch

diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 991550baef8c..e18bca076337 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -9,6 +9,7 @@ 
 #include "rxe.h"
 #include "rxe_loc.h"
 #include "rxe_queue.h"
+#include "rxe_resp.h"
 
 enum resp_states {
 	RESPST_NONE,
@@ -733,60 +734,83 @@  static enum resp_states process_flush(struct rxe_qp *qp,
 /* Guarantee atomicity of atomic operations at the machine level. */
 static DEFINE_SPINLOCK(atomic_ops_lock);
 
-static enum resp_states atomic_reply(struct rxe_qp *qp,
-					 struct rxe_pkt_info *pkt)
+enum resp_states rxe_process_atomic(struct rxe_qp *qp,
+				    struct rxe_pkt_info *pkt, u64 *vaddr)
 {
-	u64 *vaddr;
 	enum resp_states ret;
-	struct rxe_mr *mr = qp->resp.mr;
 	struct resp_res *res = qp->resp.res;
 	u64 value;
 
-	if (!res) {
-		res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
-		qp->resp.res = res;
+	/* check vaddr is 8 bytes aligned. */
+	if (!vaddr || (uintptr_t)vaddr & 7) {
+		ret = RESPST_ERR_MISALIGNED_ATOMIC;
+		goto out;
 	}
 
-	if (!res->replay) {
-		if (mr->state != RXE_MR_STATE_VALID) {
-			ret = RESPST_ERR_RKEY_VIOLATION;
-			goto out;
-		}
+	spin_lock(&atomic_ops_lock);
+	res->atomic.orig_val = value = *vaddr;
 
-		vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
-					sizeof(u64));
+	if (pkt->opcode == IB_OPCODE_RC_COMPARE_SWAP) {
+		if (value == atmeth_comp(pkt))
+			value = atmeth_swap_add(pkt);
+	} else {
+		value += atmeth_swap_add(pkt);
+	}
 
-		/* check vaddr is 8 bytes aligned. */
-		if (!vaddr || (uintptr_t)vaddr & 7) {
-			ret = RESPST_ERR_MISALIGNED_ATOMIC;
-			goto out;
-		}
+	*vaddr = value;
+	spin_unlock(&atomic_ops_lock);
 
-		spin_lock_bh(&atomic_ops_lock);
-		res->atomic.orig_val = value = *vaddr;
+	qp->resp.msn++;
 
-		if (pkt->opcode == IB_OPCODE_RC_COMPARE_SWAP) {
-			if (value == atmeth_comp(pkt))
-				value = atmeth_swap_add(pkt);
-		} else {
-			value += atmeth_swap_add(pkt);
-		}
+	/* next expected psn, read handles this separately */
+	qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
+	qp->resp.ack_psn = qp->resp.psn;
 
-		*vaddr = value;
-		spin_unlock_bh(&atomic_ops_lock);
+	qp->resp.opcode = pkt->opcode;
+	qp->resp.status = IB_WC_SUCCESS;
 
-		qp->resp.msn++;
+	ret = RESPST_ACKNOWLEDGE;
+out:
+	return ret;
+}
+
+static enum resp_states rxe_atomic_ops(struct rxe_qp *qp,
+					struct rxe_pkt_info *pkt,
+					struct rxe_mr *mr)
+{
+	u64 *vaddr;
+	int ret;
+
+	vaddr = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset,
+			      sizeof(u64));
+
+	if (pkt->mask & RXE_ATOMIC_MASK)
+		ret = rxe_process_atomic(qp, pkt, vaddr);
+	else
+		ret = RESPST_ERR_UNSUPPORTED_OPCODE;
+
+	return ret;
+}
 
-		/* next expected psn, read handles this separately */
-		qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
-		qp->resp.ack_psn = qp->resp.psn;
+static enum resp_states rxe_atomic_reply(struct rxe_qp *qp,
+					 struct rxe_pkt_info *pkt)
+{
+	struct rxe_mr *mr = qp->resp.mr;
+	struct resp_res *res = qp->resp.res;
+	int ret;
 
-		qp->resp.opcode = pkt->opcode;
-		qp->resp.status = IB_WC_SUCCESS;
+	if (!res) {
+		res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_MASK);
+		qp->resp.res = res;
 	}
 
-	ret = RESPST_ACKNOWLEDGE;
-out:
+	if (!res->replay) {
+		if (mr->state != RXE_MR_STATE_VALID)
+			return RESPST_ERR_RKEY_VIOLATION;
+		ret = rxe_atomic_ops(qp, pkt, mr);
+	} else
+		ret = RESPST_ACKNOWLEDGE;
+
 	return ret;
 }
 
@@ -1556,7 +1580,7 @@  int rxe_responder(void *arg)
 			state = read_reply(qp, pkt);
 			break;
 		case RESPST_ATOMIC_REPLY:
-			state = atomic_reply(qp, pkt);
+			state = rxe_atomic_reply(qp, pkt);
 			break;
 		case RESPST_ATOMIC_WRITE_REPLY:
 			state = atomic_write_reply(qp, pkt);
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.h b/drivers/infiniband/sw/rxe/rxe_resp.h
new file mode 100644
index 000000000000..94a4869fdab6
--- /dev/null
+++ b/drivers/infiniband/sw/rxe/rxe_resp.h
@@ -0,0 +1,9 @@ 
+/* SPDX-License-Identifier: GPL-2.0 OR Linux-OpenIB */
+
+#ifndef RXE_RESP_H
+#define RXE_RESP_H
+
+enum resp_states rxe_process_atomic(struct rxe_qp *qp,
+				    struct rxe_pkt_info *pkt, u64 *vaddr);
+
+#endif /* RXE_RESP_H */