
[RFC,v9,18/26] RDMA/rxe: Convert mca read locking to RCU

Message ID 20220127213755.31697-19-rpearsonhpe@gmail.com (mailing list archive)
State RFC
Series [RFC,v9,01/26] RDMA/rxe: Move rxe_mcast_add/delete to rxe_mcast.c

Commit Message

Bob Pearson Jan. 27, 2022, 9:37 p.m. UTC
Replace spinlocks with RCU read locks for read side operations
on mca in rxe_recv.c and rxe_mcast.c. Use RCU list extensions for
write side operations and keep the spinlock to serialize concurrent
writers.

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
 drivers/infiniband/sw/rxe/rxe_mcast.c | 57 ++++++++++++++++-----------
 drivers/infiniband/sw/rxe/rxe_recv.c  |  6 +--
 drivers/infiniband/sw/rxe/rxe_verbs.h |  1 +
 3 files changed, 39 insertions(+), 25 deletions(-)
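
For orientation, a condensed sketch of the locking split this patch moves to (illustrative only, pieced together from the diff below; mca, mcg and rxe are the driver's existing structures, and the rcu member on struct rxe_mca is added by this patch): readers walk mcg->qp_list inside an RCU read section, while writers keep taking rxe->mcg_lock and switch to the _rcu list helpers plus kfree_rcu() so in-flight readers never see a freed mca.

        /* reader (e.g. rxe_rcv_mcast_pkt): no spinlock, just an RCU read section */
        rcu_read_lock();
        list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
                /* ... use mca->qp ... */
        }
        rcu_read_unlock();

        /* writer (attach): still serialized by rxe->mcg_lock */
        spin_lock_bh(&rxe->mcg_lock);
        list_add_tail_rcu(&mca->qp_list, &mcg->qp_list);
        spin_unlock_bh(&rxe->mcg_lock);

        /* writer (detach): unlink under the lock, free after a grace period */
        spin_lock_bh(&rxe->mcg_lock);
        list_del_rcu(&mca->qp_list);
        spin_unlock_bh(&rxe->mcg_lock);
        kfree_rcu(mca, rcu);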

Comments

Jason Gunthorpe Jan. 28, 2022, 6:39 p.m. UTC | #1
On Thu, Jan 27, 2022 at 03:37:47PM -0600, Bob Pearson wrote:
>  /**
> - * __rxe_init_mca - initialize a new mca holding lock
> + * __rxe_init_mca_rcu - initialize a new mca holding lock
>   * @qp: qp object
>   * @mcg: mcg object
>   * @mca: empty space for new mca
> @@ -280,7 +281,7 @@ void rxe_cleanup_mcg(struct kref *kref)
>   *
>   * Returns: 0 on success else an error
>   */
> -static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
> +static int __rxe_init_mca_rcu(struct rxe_qp *qp, struct rxe_mcg *mcg,
>  			  struct rxe_mca *mca)

There is nothing "rcu" about this function..

> @@ -324,14 +325,14 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
>  	int err;
>  
>  	/* check to see if the qp is already a member of the group */
> -	spin_lock_bh(&rxe->mcg_lock);
> -	list_for_each_entry(mca, &mcg->qp_list, qp_list) {
> +	rcu_read_lock();
> +	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
>  		if (mca->qp == qp) {
> -			spin_unlock_bh(&rxe->mcg_lock);
> +			rcu_read_unlock();
>  			return 0;
>  		}
>  	}
> -	spin_unlock_bh(&rxe->mcg_lock);
> +	rcu_read_unlock();

Ok..

> @@ -340,16 +341,19 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
>  
>  	spin_lock_bh(&rxe->mcg_lock);
>  	/* re-check to see if someone else just attached qp */
> -	list_for_each_entry(mca, &mcg->qp_list, qp_list) {
> +	rcu_read_lock();

Do not hold the RCU read lock if you are holding the write side
spinlock. All mutations of the list must hold mcg_lock.
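
A sketch of what this asks for, assuming the re-check in rxe_attach_mcg simply drops the rcu_read_lock()/rcu_read_unlock() pair: rxe->mcg_lock already excludes every writer, so a plain traversal is enough here.

        spin_lock_bh(&rxe->mcg_lock);
        /* re-check under the write lock; no RCU read section needed */
        list_for_each_entry(mca, &mcg->qp_list, qp_list) {
                if (mca->qp == qp) {
                        kfree(new_mca);
                        err = 0;
                        goto done;
                }
        }

        mca = new_mca;
        err = __rxe_init_mca(qp, mcg, mca);     /* does the list_add_tail_rcu() */
        if (err)
                kfree(mca);
done:
        spin_unlock_bh(&rxe->mcg_lock);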

> +	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
>  		if (mca->qp == qp) {
> +			rcu_read_unlock();
>  			kfree(new_mca);
>  			err = 0;
>  			goto done;
>  		}
>  	}
> +	rcu_read_unlock();
>  
>  	mca = new_mca;
> -	err = __rxe_init_mca(qp, mcg, mca);
> +	err = __rxe_init_mca_rcu(qp, mcg, mca);
>  	if (err)
>  		kfree(mca);

Which looks fine, since the list_add is still inside the spinlock

>  done:
> @@ -359,21 +363,23 @@ static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
>  }
>  
>  /**
> - * __rxe_cleanup_mca - cleanup mca object holding lock
> + * __rxe_cleanup_mca_rcu - cleanup mca object holding lock
>   * @mca: mca object
>   * @mcg: mcg object
>   *
>   * Context: caller must hold a reference to mcg and rxe->mcg_lock
>   */
> -static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
> +static void __rxe_cleanup_mca_rcu(struct rxe_mca *mca, struct rxe_mcg *mcg)

Also not rcu, list_del must hold the write side spinlock.
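
For reference, a sketch of the same helper with only the rename dropped (illustrative; the body otherwise matches the patch): list_del_rcu() runs under the caller's mcg_lock and kfree_rcu() defers the actual free past any readers still traversing the list.

        /* Context: caller must hold a reference to mcg and rxe->mcg_lock */
        static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
        {
                list_del_rcu(&mca->qp_list);    /* current readers may still see the entry */

                atomic_dec(&mcg->qp_num);
                atomic_dec(&mcg->rxe->mcg_attach);
                atomic_dec(&mca->qp->mcg_num);

                rxe_drop_ref(mca->qp);

                kfree_rcu(mca, rcu);            /* freed only after a grace period */
        }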

>  {
> -	list_del(&mca->qp_list);
> +	list_del_rcu(&mca->qp_list);
>  
>  	atomic_dec(&mcg->qp_num);
>  	atomic_dec(&mcg->rxe->mcg_attach);
>  	atomic_dec(&mca->qp->mcg_num);
>  
>  	rxe_drop_ref(mca->qp);
> +
> +	kfree_rcu(mca, rcu);

OK

>  }
>  
>  /**
> @@ -386,22 +392,29 @@ static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
>  static int rxe_detach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
>  {
>  	struct rxe_dev *rxe = mcg->rxe;
> -	struct rxe_mca *mca, *tmp;
> +	struct rxe_mca *mca;
> +	int ret;
>  
>  	spin_lock_bh(&rxe->mcg_lock);
> -	list_for_each_entry_safe(mca, tmp, &mcg->qp_list, qp_list) {
> +	rcu_read_lock();
> +	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {

As before, don't hold the rcu when holding the write side lock

>  		if (mca->qp == qp) {
> -			__rxe_cleanup_mca(mca, mcg);
> -			if (atomic_read(&mcg->qp_num) <= 0)
> -				kref_put(&mcg->ref_cnt, __rxe_cleanup_mcg);
> -			spin_unlock_bh(&rxe->mcg_lock);
> -			kfree(mca);
> -			return 0;
> +			rcu_read_unlock();
> +			goto found;
>  		}
>  	}
> +	rcu_read_unlock();
> +	ret = -EINVAL;
> +	goto done;
> +found:
> +	__rxe_cleanup_mca_rcu(mca, mcg);
> +	if (atomic_read(&mcg->qp_num) <= 0)
> +		kref_put(&mcg->ref_cnt, __rxe_cleanup_mcg);

This is confusing: why both an atomic counter and a kref? Isn't
qp_num == 0 the same as list_empty(qp_list)?
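
As an illustration of the alternative this question points at (a hedged sketch, not from the patch): since every list mutation happens under mcg_lock, an empty-list check could stand in for the counter when deciding to drop the extra kref taken at mcg creation.

        list_for_each_entry(mca, &mcg->qp_list, qp_list) {
                if (mca->qp == qp) {
                        __rxe_cleanup_mca(mca, mcg);
                        /* last member removed: drop the ref taken at mcg creation */
                        if (list_empty(&mcg->qp_list))
                                kref_put(&mcg->ref_cnt, __rxe_cleanup_mcg);
                        ret = 0;
                        goto done;
                }
        }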

> diff --git a/drivers/infiniband/sw/rxe/rxe_recv.c b/drivers/infiniband/sw/rxe/rxe_recv.c
> index 357a6cea1484..7f2ea61a52c1 100644
> +++ b/drivers/infiniband/sw/rxe/rxe_recv.c
> @@ -267,13 +267,13 @@ static void rxe_rcv_mcast_pkt(struct sk_buff *skb)
>  	qp_array = kmalloc_array(nmax, sizeof(qp), GFP_KERNEL);
>  
>  	n = 0;
> -	spin_lock_bh(&rxe->mcg_lock);
> -	list_for_each_entry(mca, &mcg->qp_list, qp_list) {
> +	rcu_read_lock();
> +	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
>  		qp_array[n++] = mca->qp;
>  		if (n == nmax)
>  			break;
>  	}
> -	spin_unlock_bh(&rxe->mcg_lock);
> +	rcu_read_unlock();
>  	kref_put(&mcg->ref_cnt, rxe_cleanup_mcg);

I have no idea how this works. What keeps 'qp' valid and prevents it
from being freed once we leave the locking? Remember the mca can be
concurrently freed, so qp is just garbage under RCU at this point.

Jason
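
For context, one common way this kind of lifetime problem is handled (a hedged sketch, not something this patch does): pin each qp with a reference while still inside the RCU read section and drop it after delivery. This assumes the detach path cannot have already dropped its own qp reference before readers finish, e.g. by deferring the rxe_drop_ref(mca->qp) to the RCU callback; otherwise a get-unless-zero style acquire would be needed.

        rcu_read_lock();
        list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
                rxe_add_ref(mca->qp);           /* pin qp before leaving the read section */
                qp_array[n++] = mca->qp;
                if (n == nmax)
                        break;
        }
        rcu_read_unlock();

        /* ... deliver cloned skbs to qp_array[0..n-1] ... */

        for (i = 0; i < n; i++)
                rxe_drop_ref(qp_array[i]);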

Patch

diff --git a/drivers/infiniband/sw/rxe/rxe_mcast.c b/drivers/infiniband/sw/rxe/rxe_mcast.c
index 865e6e85084f..c193bd4975f7 100644
--- a/drivers/infiniband/sw/rxe/rxe_mcast.c
+++ b/drivers/infiniband/sw/rxe/rxe_mcast.c
@@ -27,7 +27,8 @@ 
  * the mcg is created and an extra kref_put when the qp count decreases
  * to zero.
  *
- * The qp list and the red-black tree are protected by a single
+ * The qp list is protected for read operations by RCU and the qp list and
+ * the red-black tree are protected for write operations by a single
  * rxe->mcg_lock per device.
  */
 
@@ -270,7 +271,7 @@  void rxe_cleanup_mcg(struct kref *kref)
 }
 
 /**
- * __rxe_init_mca - initialize a new mca holding lock
+ * __rxe_init_mca_rcu - initialize a new mca holding lock
  * @qp: qp object
  * @mcg: mcg object
  * @mca: empty space for new mca
@@ -280,7 +281,7 @@  void rxe_cleanup_mcg(struct kref *kref)
  *
  * Returns: 0 on success else an error
  */
-static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
+static int __rxe_init_mca_rcu(struct rxe_qp *qp, struct rxe_mcg *mcg,
 			  struct rxe_mca *mca)
 {
 	struct rxe_dev *rxe = to_rdev(qp->ibqp.device);
@@ -304,7 +305,7 @@  static int __rxe_init_mca(struct rxe_qp *qp, struct rxe_mcg *mcg,
 	rxe_add_ref(qp);
 	mca->qp = qp;
 
-	list_add_tail(&mca->qp_list, &mcg->qp_list);
+	list_add_tail_rcu(&mca->qp_list, &mcg->qp_list);
 
 	return 0;
 }
@@ -324,14 +325,14 @@  static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
 	int err;
 
 	/* check to see if the qp is already a member of the group */
-	spin_lock_bh(&rxe->mcg_lock);
-	list_for_each_entry(mca, &mcg->qp_list, qp_list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
 		if (mca->qp == qp) {
-			spin_unlock_bh(&rxe->mcg_lock);
+			rcu_read_unlock();
 			return 0;
 		}
 	}
-	spin_unlock_bh(&rxe->mcg_lock);
+	rcu_read_unlock();
 
 	/* speculative alloc new mca without using GFP_ATOMIC */
 	new_mca = kzalloc(sizeof(*mca), GFP_KERNEL);
@@ -340,16 +341,19 @@  static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
 
 	spin_lock_bh(&rxe->mcg_lock);
 	/* re-check to see if someone else just attached qp */
-	list_for_each_entry(mca, &mcg->qp_list, qp_list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
 		if (mca->qp == qp) {
+			rcu_read_unlock();
 			kfree(new_mca);
 			err = 0;
 			goto done;
 		}
 	}
+	rcu_read_unlock();
 
 	mca = new_mca;
-	err = __rxe_init_mca(qp, mcg, mca);
+	err = __rxe_init_mca_rcu(qp, mcg, mca);
 	if (err)
 		kfree(mca);
 done:
@@ -359,21 +363,23 @@  static int rxe_attach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
 }
 
 /**
- * __rxe_cleanup_mca - cleanup mca object holding lock
+ * __rxe_cleanup_mca_rcu - cleanup mca object holding lock
  * @mca: mca object
  * @mcg: mcg object
  *
  * Context: caller must hold a reference to mcg and rxe->mcg_lock
  */
-static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
+static void __rxe_cleanup_mca_rcu(struct rxe_mca *mca, struct rxe_mcg *mcg)
 {
-	list_del(&mca->qp_list);
+	list_del_rcu(&mca->qp_list);
 
 	atomic_dec(&mcg->qp_num);
 	atomic_dec(&mcg->rxe->mcg_attach);
 	atomic_dec(&mca->qp->mcg_num);
 
 	rxe_drop_ref(mca->qp);
+
+	kfree_rcu(mca, rcu);
 }
 
 /**
@@ -386,22 +392,29 @@  static void __rxe_cleanup_mca(struct rxe_mca *mca, struct rxe_mcg *mcg)
 static int rxe_detach_mcg(struct rxe_mcg *mcg, struct rxe_qp *qp)
 {
 	struct rxe_dev *rxe = mcg->rxe;
-	struct rxe_mca *mca, *tmp;
+	struct rxe_mca *mca;
+	int ret;
 
 	spin_lock_bh(&rxe->mcg_lock);
-	list_for_each_entry_safe(mca, tmp, &mcg->qp_list, qp_list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
 		if (mca->qp == qp) {
-			__rxe_cleanup_mca(mca, mcg);
-			if (atomic_read(&mcg->qp_num) <= 0)
-				kref_put(&mcg->ref_cnt, __rxe_cleanup_mcg);
-			spin_unlock_bh(&rxe->mcg_lock);
-			kfree(mca);
-			return 0;
+			rcu_read_unlock();
+			goto found;
 		}
 	}
+	rcu_read_unlock();
+	ret = -EINVAL;
+	goto done;
+found:
+	__rxe_cleanup_mca_rcu(mca, mcg);
+	if (atomic_read(&mcg->qp_num) <= 0)
+		kref_put(&mcg->ref_cnt, __rxe_cleanup_mcg);
+	ret = 0;
+done:
 	spin_unlock_bh(&rxe->mcg_lock);
 
-	return -EINVAL;
+	return ret;
 }
 
 /**
diff --git a/drivers/infiniband/sw/rxe/rxe_recv.c b/drivers/infiniband/sw/rxe/rxe_recv.c
index 357a6cea1484..7f2ea61a52c1 100644
--- a/drivers/infiniband/sw/rxe/rxe_recv.c
+++ b/drivers/infiniband/sw/rxe/rxe_recv.c
@@ -267,13 +267,13 @@  static void rxe_rcv_mcast_pkt(struct sk_buff *skb)
 	qp_array = kmalloc_array(nmax, sizeof(qp), GFP_KERNEL);
 
 	n = 0;
-	spin_lock_bh(&rxe->mcg_lock);
-	list_for_each_entry(mca, &mcg->qp_list, qp_list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(mca, &mcg->qp_list, qp_list) {
 		qp_array[n++] = mca->qp;
 		if (n == nmax)
 			break;
 	}
-	spin_unlock_bh(&rxe->mcg_lock);
+	rcu_read_unlock();
 	kref_put(&mcg->ref_cnt, rxe_cleanup_mcg);
 
 	nmax = n;
diff --git a/drivers/infiniband/sw/rxe/rxe_verbs.h b/drivers/infiniband/sw/rxe/rxe_verbs.h
index 76350d43ce2a..12bff190fc1f 100644
--- a/drivers/infiniband/sw/rxe/rxe_verbs.h
+++ b/drivers/infiniband/sw/rxe/rxe_verbs.h
@@ -365,6 +365,7 @@  struct rxe_mcg {
 struct rxe_mca {
 	struct list_head	qp_list;
 	struct rxe_qp		*qp;
+	struct rcu_head		rcu;
 };
 
 struct rxe_port {