diff mbox series

[net-next,03/10] net/smc: allow confirm/delete rkey response deliver multiplex

Message ID f04de6958ebe9539227ae459837993e8c3e6ede6.1660152975.git.alibuda@linux.alibaba.com (mailing list archive)
State Not Applicable
Headers show
Series net/smc: optimize the parallelism of SMC-R connections | expand

Commit Message

D. Wythe Aug. 10, 2022, 5:47 p.m. UTC
From: "D. Wythe" <alibuda@linux.alibaba.com>

We know that all flows except confirm_rkey and delete_rkey are exclusive;
confirm/delete rkey flows can run concurrently (local and remote).

Although the protocol allows this, all flows are actually mutually exclusive
in the implementation, because waiting for LLC messages is serialized.

This increases the time needed to establish or destroy SMC-R
connections, because connections have to be queued in smc_llc_wait.

We use rtokens or rkey to correlate a confirm/delete rkey message
with its response.

Before sending a message, we put a context with the rtokens or rkey into
the wait queue. When a response message is received, we wake up the context
whose rtokens or rkey match those of the response message.

Signed-off-by: D. Wythe <alibuda@linux.alibaba.com>
---
 net/smc/smc_llc.c | 174 +++++++++++++++++++++++++++++++++++++++++-------------
 net/smc/smc_wr.c  |  10 ----
 net/smc/smc_wr.h  |  10 ++++
 3 files changed, 143 insertions(+), 51 deletions(-)

Comments

Tony Lu Aug. 16, 2022, 8:17 a.m. UTC | #1
On Thu, Aug 11, 2022 at 01:47:34AM +0800, D. Wythe wrote:
> From: "D. Wythe" <alibuda@linux.alibaba.com>
> 
> We know that all flows except confirm_rkey and delete_rkey are exclusive,
> confirm/delete rkey flows can run concurrently (local and remote).
> 
> Although the protocol allows, all flows are actually mutually exclusive
> in implementation, dues to waiting for LLC messages is in serial.
> 
> This aggravates the time for establishing or destroying a SMC-R
> connections, connections have to be queued in smc_llc_wait.
> 
> We use rtokens or rkey to correlate a confirm/delete rkey message
> with its response.
> 
> Before sending a message, we put context with rtokens or rkey into
> wait queue. When a response message received, we wakeup the context
> which with the same rtokens or rkey against the response message.
> 
> Signed-off-by: D. Wythe <alibuda@linux.alibaba.com>
> ---
>  net/smc/smc_llc.c | 174 +++++++++++++++++++++++++++++++++++++++++-------------
>  net/smc/smc_wr.c  |  10 ----
>  net/smc/smc_wr.h  |  10 ++++
>  3 files changed, 143 insertions(+), 51 deletions(-)
> 
> diff --git a/net/smc/smc_llc.c b/net/smc/smc_llc.c
> index 8134c15..b026df2 100644
> --- a/net/smc/smc_llc.c
> +++ b/net/smc/smc_llc.c
> @@ -200,6 +200,7 @@ struct smc_llc_msg_delete_rkey_v2 {	/* type 0x29 */
>  struct smc_llc_qentry {
>  	struct list_head list;
>  	struct smc_link *link;
> +	void *private;
>  	union smc_llc_msg msg;
>  };
>  
> @@ -479,19 +480,17 @@ int smc_llc_send_confirm_link(struct smc_link *link,
>  	return rc;
>  }
>  
> -/* send LLC confirm rkey request */
> -static int smc_llc_send_confirm_rkey(struct smc_link *send_link,
> -				     struct smc_buf_desc *rmb_desc)
> +/* build LLC confirm rkey request */
> +static int smc_llc_build_confirm_rkey_request(struct smc_link *send_link,
> +					      struct smc_buf_desc *rmb_desc,
> +					      struct smc_wr_tx_pend_priv **priv)
>  {
>  	struct smc_llc_msg_confirm_rkey *rkeyllc;
> -	struct smc_wr_tx_pend_priv *pend;
>  	struct smc_wr_buf *wr_buf;
>  	struct smc_link *link;
>  	int i, rc, rtok_ix;
>  
> -	if (!smc_wr_tx_link_hold(send_link))
> -		return -ENOLINK;
> -	rc = smc_llc_add_pending_send(send_link, &wr_buf, &pend);
> +	rc = smc_llc_add_pending_send(send_link, &wr_buf, priv);
>  	if (rc)
>  		goto put_out;
>  	rkeyllc = (struct smc_llc_msg_confirm_rkey *)wr_buf;
> @@ -521,25 +520,20 @@ static int smc_llc_send_confirm_rkey(struct smc_link *send_link,
>  		cpu_to_be64((uintptr_t)rmb_desc->cpu_addr) :
>  		cpu_to_be64((u64)sg_dma_address
>  			    (rmb_desc->sgt[send_link->link_idx].sgl));
> -	/* send llc message */
> -	rc = smc_wr_tx_send(send_link, pend);
>  put_out:
> -	smc_wr_tx_link_put(send_link);
>  	return rc;
>  }
>  
> -/* send LLC delete rkey request */
> -static int smc_llc_send_delete_rkey(struct smc_link *link,
> -				    struct smc_buf_desc *rmb_desc)
> +/* build LLC delete rkey request */
> +static int smc_llc_build_delete_rkey_request(struct smc_link *link,
> +					     struct smc_buf_desc *rmb_desc,
> +					     struct smc_wr_tx_pend_priv **priv)
>  {
>  	struct smc_llc_msg_delete_rkey *rkeyllc;
> -	struct smc_wr_tx_pend_priv *pend;
>  	struct smc_wr_buf *wr_buf;
>  	int rc;
>  
> -	if (!smc_wr_tx_link_hold(link))
> -		return -ENOLINK;
> -	rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
> +	rc = smc_llc_add_pending_send(link, &wr_buf, priv);
>  	if (rc)
>  		goto put_out;
>  	rkeyllc = (struct smc_llc_msg_delete_rkey *)wr_buf;
> @@ -548,10 +542,7 @@ static int smc_llc_send_delete_rkey(struct smc_link *link,
>  	smc_llc_init_msg_hdr(&rkeyllc->hd, link->lgr, sizeof(*rkeyllc));
>  	rkeyllc->num_rkeys = 1;
>  	rkeyllc->rkey[0] = htonl(rmb_desc->mr[link->link_idx]->rkey);
> -	/* send llc message */
> -	rc = smc_wr_tx_send(link, pend);
>  put_out:
> -	smc_wr_tx_link_put(link);
>  	return rc;
>  }
>  
> @@ -2023,7 +2014,8 @@ static void smc_llc_rx_response(struct smc_link *link,
>  	case SMC_LLC_DELETE_RKEY:
>  		if (flowtype != SMC_LLC_FLOW_RKEY || flow->qentry)
>  			break;	/* drop out-of-flow response */
> -		goto assign;
> +		__wake_up(&link->lgr->llc_msg_waiter, TASK_NORMAL, 1, qentry);
> +		goto free;
>  	case SMC_LLC_CONFIRM_RKEY_CONT:
>  		/* not used because max links is 3 */
>  		break;
> @@ -2032,6 +2024,7 @@ static void smc_llc_rx_response(struct smc_link *link,
>  					   qentry->msg.raw.hdr.common.type);
>  		break;
>  	}
> +free:
>  	kfree(qentry);
>  	return;
>  assign:
> @@ -2191,25 +2184,98 @@ void smc_llc_link_clear(struct smc_link *link, bool log)
>  	cancel_delayed_work_sync(&link->llc_testlink_wrk);
>  }
>  
> +static int smc_llc_rkey_response_wake_function(struct wait_queue_entry *wq_entry,
> +					       unsigned int mode, int sync, void *key)
> +{
> +	struct smc_llc_qentry *except, *incoming;
> +	u8 except_llc_type;
> +
> +	/* not a rkey response */
> +	if (!key)
> +		return 0;
> +
> +	except = wq_entry->private;
> +	incoming = key;
> +
> +	except_llc_type = except->msg.raw.hdr.common.llc_type;
> +
> +	/* except LLC MSG TYPE mismatch */
> +	if (except_llc_type != incoming->msg.raw.hdr.common.llc_type)
> +		return 0;
> +
> +	switch (except_llc_type) {
> +	case SMC_LLC_CONFIRM_RKEY:
> +		if (memcmp(except->msg.confirm_rkey.rtoken, incoming->msg.confirm_rkey.rtoken,
> +			   sizeof(struct smc_rmb_rtoken) *
> +			   except->msg.confirm_rkey.rtoken[0].num_rkeys))
> +			return 0;
> +		break;
> +	case SMC_LLC_DELETE_RKEY:
> +		if (memcmp(except->msg.delete_rkey.rkey, incoming->msg.delete_rkey.rkey,
> +			   sizeof(__be32) * except->msg.delete_rkey.num_rkeys))
> +			return 0;
> +		break;
> +	default:
> +		panic("invalid except llc msg %d", except_llc_type);

Replace panic with pr_warn?

> +		return 0;
> +	}
> +
> +	/* match, save hdr */
> +	memcpy(&except->msg.raw.hdr, &incoming->msg.raw.hdr, sizeof(except->msg.raw.hdr));
> +
> +	wq_entry->private = except->private;
> +	return woken_wake_function(wq_entry, mode, sync, NULL);
> +}
> +
>  /* register a new rtoken at the remote peer (for all links) */
>  int smc_llc_do_confirm_rkey(struct smc_link *send_link,
>  			    struct smc_buf_desc *rmb_desc)
>  {
> +	long timeout = SMC_LLC_WAIT_TIME;

Reverse Christmas trees.

>  	struct smc_link_group *lgr = send_link->lgr;
> -	struct smc_llc_qentry *qentry = NULL;
> -	int rc = 0;
> +	struct smc_llc_qentry qentry;
> +	struct smc_wr_tx_pend *pend;
> +	struct smc_wr_tx_pend_priv *priv;
> +	DEFINE_WAIT_FUNC(wait, smc_llc_rkey_response_wake_function);
> +	int rc = 0, flags;

Ditto.

>  
> -	rc = smc_llc_send_confirm_rkey(send_link, rmb_desc);
> +	if (!smc_wr_tx_link_hold(send_link))
> +		return -ENOLINK;
> +
> +	rc = smc_llc_build_confirm_rkey_request(send_link, rmb_desc, &priv);
>  	if (rc)
>  		goto out;
> -	/* receive CONFIRM RKEY response from server over RoCE fabric */
> -	qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME,
> -			      SMC_LLC_CONFIRM_RKEY);
> -	if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG))
> +
> +	pend = container_of(priv, struct smc_wr_tx_pend, priv);
> +	/* make a copy of send msg */
> +	memcpy(&qentry.msg.confirm_rkey, send_link->wr_tx_bufs[pend->idx].raw,
> +	       sizeof(qentry.msg.confirm_rkey));
> +
> +	qentry.private = wait.private;
> +	wait.private = &qentry;
> +
> +	add_wait_queue(&lgr->llc_msg_waiter, &wait);
> +
> +	/* send llc message */
> +	rc = smc_wr_tx_send(send_link, priv);
> +	smc_wr_tx_link_put(send_link);
> +	if (rc) {
> +		remove_wait_queue(&lgr->llc_msg_waiter, &wait);
> +		goto out;
> +	}
> +
> +	while (!signal_pending(current) && timeout) {
> +		timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
> +		if (qentry.msg.raw.hdr.flags & SMC_LLC_FLAG_RESP)
> +			break;
> +	}
> +
> +	remove_wait_queue(&lgr->llc_msg_waiter, &wait);
> +	flags = qentry.msg.raw.hdr.flags;
> +
> +	if (!(flags & SMC_LLC_FLAG_RESP) || flags & SMC_LLC_FLAG_RKEY_NEG)
>  		rc = -EFAULT;
>  out:
> -	if (qentry)
> -		smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
>  	return rc;
>  }
>  
> @@ -2217,26 +2283,52 @@ int smc_llc_do_confirm_rkey(struct smc_link *send_link,
>  int smc_llc_do_delete_rkey(struct smc_link_group *lgr,
>  			   struct smc_buf_desc *rmb_desc)
>  {
> -	struct smc_llc_qentry *qentry = NULL;
> +	long timeout = SMC_LLC_WAIT_TIME;
> +	struct smc_llc_qentry qentry;
> +	struct smc_wr_tx_pend *pend;
>  	struct smc_link *send_link;
> -	int rc = 0;
> +	struct smc_wr_tx_pend_priv *priv;

Reverse Christmas trees.

> +	DEFINE_WAIT_FUNC(wait, smc_llc_rkey_response_wake_function);
> +	int rc = 0, flags;
>  
>  	send_link = smc_llc_usable_link(lgr);
> -	if (!send_link)
> +	if (!send_link || !smc_wr_tx_link_hold(send_link))
>  		return -ENOLINK;
>  
> -	/* protected by llc_flow control */
> -	rc = smc_llc_send_delete_rkey(send_link, rmb_desc);
> +	rc = smc_llc_build_delete_rkey_request(send_link, rmb_desc, &priv);
>  	if (rc)
>  		goto out;
> -	/* receive DELETE RKEY response from server over RoCE fabric */
> -	qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME,
> -			      SMC_LLC_DELETE_RKEY);
> -	if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG))
> +
> +	pend = container_of(priv, struct smc_wr_tx_pend, priv);
> +	/* make a copy of send msg */
> +	memcpy(&qentry.msg.delete_link, send_link->wr_tx_bufs[pend->idx].raw,
> +	       sizeof(qentry.msg.delete_link));
> +
> +	qentry.private = wait.private;
> +	wait.private = &qentry;
> +
> +	add_wait_queue(&lgr->llc_msg_waiter, &wait);
> +
> +	/* send llc message */
> +	rc = smc_wr_tx_send(send_link, priv);
> +	smc_wr_tx_link_put(send_link);
> +	if (rc) {
> +		remove_wait_queue(&lgr->llc_msg_waiter, &wait);
> +		goto out;
> +	}
> +
> +	while (!signal_pending(current) && timeout) {
> +		timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
> +		if (qentry.msg.raw.hdr.flags & SMC_LLC_FLAG_RESP)
> +			break;
> +	}
> +
> +	remove_wait_queue(&lgr->llc_msg_waiter, &wait);
> +	flags = qentry.msg.raw.hdr.flags;
> +
> +	if (!(flags & SMC_LLC_FLAG_RESP) || flags & SMC_LLC_FLAG_RKEY_NEG)
>  		rc = -EFAULT;
>  out:
> -	if (qentry)
> -		smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
>  	return rc;
>  }
>  
> diff --git a/net/smc/smc_wr.c b/net/smc/smc_wr.c
> index 26f8f24..52af94f 100644
> --- a/net/smc/smc_wr.c
> +++ b/net/smc/smc_wr.c
> @@ -37,16 +37,6 @@
>  static DEFINE_HASHTABLE(smc_wr_rx_hash, SMC_WR_RX_HASH_BITS);
>  static DEFINE_SPINLOCK(smc_wr_rx_hash_lock);
>  
> -struct smc_wr_tx_pend {	/* control data for a pending send request */
> -	u64			wr_id;		/* work request id sent */
> -	smc_wr_tx_handler	handler;
> -	enum ib_wc_status	wc_status;	/* CQE status */
> -	struct smc_link		*link;
> -	u32			idx;
> -	struct smc_wr_tx_pend_priv priv;
> -	u8			compl_requested;
> -};
> -
>  /******************************** send queue *********************************/
>  
>  /*------------------------------- completion --------------------------------*/
> diff --git a/net/smc/smc_wr.h b/net/smc/smc_wr.h
> index a54e90a..9946ed5 100644
> --- a/net/smc/smc_wr.h
> +++ b/net/smc/smc_wr.h
> @@ -46,6 +46,16 @@ struct smc_wr_rx_handler {
>  	u8			type;
>  };
>  
> +struct smc_wr_tx_pend {	/* control data for a pending send request */
> +	u64			wr_id;		/* work request id sent */
> +	smc_wr_tx_handler	handler;
> +	enum ib_wc_status	wc_status;	/* CQE status */
> +	struct smc_link		*link;
> +	u32			idx;
> +	struct smc_wr_tx_pend_priv priv;
> +	u8			compl_requested;
> +};
> +
>  /* Only used by RDMA write WRs.
>   * All other WRs (CDC/LLC) use smc_wr_tx_send handling WR_ID implicitly
>   */
> -- 
> 1.8.3.1
diff mbox series

Patch

diff --git a/net/smc/smc_llc.c b/net/smc/smc_llc.c
index 8134c15..b026df2 100644
--- a/net/smc/smc_llc.c
+++ b/net/smc/smc_llc.c
@@ -200,6 +200,7 @@  struct smc_llc_msg_delete_rkey_v2 {	/* type 0x29 */
 struct smc_llc_qentry {
 	struct list_head list;
 	struct smc_link *link;
+	void *private;
 	union smc_llc_msg msg;
 };
 
@@ -479,19 +480,17 @@  int smc_llc_send_confirm_link(struct smc_link *link,
 	return rc;
 }
 
-/* send LLC confirm rkey request */
-static int smc_llc_send_confirm_rkey(struct smc_link *send_link,
-				     struct smc_buf_desc *rmb_desc)
+/* build LLC confirm rkey request */
+static int smc_llc_build_confirm_rkey_request(struct smc_link *send_link,
+					      struct smc_buf_desc *rmb_desc,
+					      struct smc_wr_tx_pend_priv **priv)
 {
 	struct smc_llc_msg_confirm_rkey *rkeyllc;
-	struct smc_wr_tx_pend_priv *pend;
 	struct smc_wr_buf *wr_buf;
 	struct smc_link *link;
 	int i, rc, rtok_ix;
 
-	if (!smc_wr_tx_link_hold(send_link))
-		return -ENOLINK;
-	rc = smc_llc_add_pending_send(send_link, &wr_buf, &pend);
+	rc = smc_llc_add_pending_send(send_link, &wr_buf, priv);
 	if (rc)
 		goto put_out;
 	rkeyllc = (struct smc_llc_msg_confirm_rkey *)wr_buf;
@@ -521,25 +520,20 @@  static int smc_llc_send_confirm_rkey(struct smc_link *send_link,
 		cpu_to_be64((uintptr_t)rmb_desc->cpu_addr) :
 		cpu_to_be64((u64)sg_dma_address
 			    (rmb_desc->sgt[send_link->link_idx].sgl));
-	/* send llc message */
-	rc = smc_wr_tx_send(send_link, pend);
 put_out:
-	smc_wr_tx_link_put(send_link);
 	return rc;
 }
 
-/* send LLC delete rkey request */
-static int smc_llc_send_delete_rkey(struct smc_link *link,
-				    struct smc_buf_desc *rmb_desc)
+/* build LLC delete rkey request */
+static int smc_llc_build_delete_rkey_request(struct smc_link *link,
+					     struct smc_buf_desc *rmb_desc,
+					     struct smc_wr_tx_pend_priv **priv)
 {
 	struct smc_llc_msg_delete_rkey *rkeyllc;
-	struct smc_wr_tx_pend_priv *pend;
 	struct smc_wr_buf *wr_buf;
 	int rc;
 
-	if (!smc_wr_tx_link_hold(link))
-		return -ENOLINK;
-	rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
+	rc = smc_llc_add_pending_send(link, &wr_buf, priv);
 	if (rc)
 		goto put_out;
 	rkeyllc = (struct smc_llc_msg_delete_rkey *)wr_buf;
@@ -548,10 +542,7 @@  static int smc_llc_send_delete_rkey(struct smc_link *link,
 	smc_llc_init_msg_hdr(&rkeyllc->hd, link->lgr, sizeof(*rkeyllc));
 	rkeyllc->num_rkeys = 1;
 	rkeyllc->rkey[0] = htonl(rmb_desc->mr[link->link_idx]->rkey);
-	/* send llc message */
-	rc = smc_wr_tx_send(link, pend);
 put_out:
-	smc_wr_tx_link_put(link);
 	return rc;
 }
 
@@ -2023,7 +2014,8 @@  static void smc_llc_rx_response(struct smc_link *link,
 	case SMC_LLC_DELETE_RKEY:
 		if (flowtype != SMC_LLC_FLOW_RKEY || flow->qentry)
 			break;	/* drop out-of-flow response */
-		goto assign;
+		__wake_up(&link->lgr->llc_msg_waiter, TASK_NORMAL, 1, qentry);
+		goto free;
 	case SMC_LLC_CONFIRM_RKEY_CONT:
 		/* not used because max links is 3 */
 		break;
@@ -2032,6 +2024,7 @@  static void smc_llc_rx_response(struct smc_link *link,
 					   qentry->msg.raw.hdr.common.type);
 		break;
 	}
+free:
 	kfree(qentry);
 	return;
 assign:
@@ -2191,25 +2184,98 @@  void smc_llc_link_clear(struct smc_link *link, bool log)
 	cancel_delayed_work_sync(&link->llc_testlink_wrk);
 }
 
+static int smc_llc_rkey_response_wake_function(struct wait_queue_entry *wq_entry,
+					       unsigned int mode, int sync, void *key)
+{
+	struct smc_llc_qentry *except, *incoming;
+	u8 except_llc_type;
+
+	/* not a rkey response */
+	if (!key)
+		return 0;
+
+	except = wq_entry->private;
+	incoming = key;
+
+	except_llc_type = except->msg.raw.hdr.common.llc_type;
+
+	/* except LLC MSG TYPE mismatch */
+	if (except_llc_type != incoming->msg.raw.hdr.common.llc_type)
+		return 0;
+
+	switch (except_llc_type) {
+	case SMC_LLC_CONFIRM_RKEY:
+		if (memcmp(except->msg.confirm_rkey.rtoken, incoming->msg.confirm_rkey.rtoken,
+			   sizeof(struct smc_rmb_rtoken) *
+			   except->msg.confirm_rkey.rtoken[0].num_rkeys))
+			return 0;
+		break;
+	case SMC_LLC_DELETE_RKEY:
+		if (memcmp(except->msg.delete_rkey.rkey, incoming->msg.delete_rkey.rkey,
+			   sizeof(__be32) * except->msg.delete_rkey.num_rkeys))
+			return 0;
+		break;
+	default:
+		panic("invalid except llc msg %d", except_llc_type);
+		return 0;
+	}
+
+	/* match, save hdr */
+	memcpy(&except->msg.raw.hdr, &incoming->msg.raw.hdr, sizeof(except->msg.raw.hdr));
+
+	wq_entry->private = except->private;
+	return woken_wake_function(wq_entry, mode, sync, NULL);
+}
+
 /* register a new rtoken at the remote peer (for all links) */
 int smc_llc_do_confirm_rkey(struct smc_link *send_link,
 			    struct smc_buf_desc *rmb_desc)
 {
+	long timeout = SMC_LLC_WAIT_TIME;
 	struct smc_link_group *lgr = send_link->lgr;
-	struct smc_llc_qentry *qentry = NULL;
-	int rc = 0;
+	struct smc_llc_qentry qentry;
+	struct smc_wr_tx_pend *pend;
+	struct smc_wr_tx_pend_priv *priv;
+	DEFINE_WAIT_FUNC(wait, smc_llc_rkey_response_wake_function);
+	int rc = 0, flags;
 
-	rc = smc_llc_send_confirm_rkey(send_link, rmb_desc);
+	if (!smc_wr_tx_link_hold(send_link))
+		return -ENOLINK;
+
+	rc = smc_llc_build_confirm_rkey_request(send_link, rmb_desc, &priv);
 	if (rc)
 		goto out;
-	/* receive CONFIRM RKEY response from server over RoCE fabric */
-	qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME,
-			      SMC_LLC_CONFIRM_RKEY);
-	if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG))
+
+	pend = container_of(priv, struct smc_wr_tx_pend, priv);
+	/* make a copy of send msg */
+	memcpy(&qentry.msg.confirm_rkey, send_link->wr_tx_bufs[pend->idx].raw,
+	       sizeof(qentry.msg.confirm_rkey));
+
+	qentry.private = wait.private;
+	wait.private = &qentry;
+
+	add_wait_queue(&lgr->llc_msg_waiter, &wait);
+
+	/* send llc message */
+	rc = smc_wr_tx_send(send_link, priv);
+	smc_wr_tx_link_put(send_link);
+	if (rc) {
+		remove_wait_queue(&lgr->llc_msg_waiter, &wait);
+		goto out;
+	}
+
+	while (!signal_pending(current) && timeout) {
+		timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
+		if (qentry.msg.raw.hdr.flags & SMC_LLC_FLAG_RESP)
+			break;
+	}
+
+	remove_wait_queue(&lgr->llc_msg_waiter, &wait);
+	flags = qentry.msg.raw.hdr.flags;
+
+	if (!(flags & SMC_LLC_FLAG_RESP) || flags & SMC_LLC_FLAG_RKEY_NEG)
 		rc = -EFAULT;
 out:
-	if (qentry)
-		smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
 	return rc;
 }
 
@@ -2217,26 +2283,52 @@  int smc_llc_do_confirm_rkey(struct smc_link *send_link,
 int smc_llc_do_delete_rkey(struct smc_link_group *lgr,
 			   struct smc_buf_desc *rmb_desc)
 {
-	struct smc_llc_qentry *qentry = NULL;
+	long timeout = SMC_LLC_WAIT_TIME;
+	struct smc_llc_qentry qentry;
+	struct smc_wr_tx_pend *pend;
 	struct smc_link *send_link;
-	int rc = 0;
+	struct smc_wr_tx_pend_priv *priv;
+	DEFINE_WAIT_FUNC(wait, smc_llc_rkey_response_wake_function);
+	int rc = 0, flags;
 
 	send_link = smc_llc_usable_link(lgr);
-	if (!send_link)
+	if (!send_link || !smc_wr_tx_link_hold(send_link))
 		return -ENOLINK;
 
-	/* protected by llc_flow control */
-	rc = smc_llc_send_delete_rkey(send_link, rmb_desc);
+	rc = smc_llc_build_delete_rkey_request(send_link, rmb_desc, &priv);
 	if (rc)
 		goto out;
-	/* receive DELETE RKEY response from server over RoCE fabric */
-	qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME,
-			      SMC_LLC_DELETE_RKEY);
-	if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG))
+
+	pend = container_of(priv, struct smc_wr_tx_pend, priv);
+	/* make a copy of send msg */
+	memcpy(&qentry.msg.delete_link, send_link->wr_tx_bufs[pend->idx].raw,
+	       sizeof(qentry.msg.delete_link));
+
+	qentry.private = wait.private;
+	wait.private = &qentry;
+
+	add_wait_queue(&lgr->llc_msg_waiter, &wait);
+
+	/* send llc message */
+	rc = smc_wr_tx_send(send_link, priv);
+	smc_wr_tx_link_put(send_link);
+	if (rc) {
+		remove_wait_queue(&lgr->llc_msg_waiter, &wait);
+		goto out;
+	}
+
+	while (!signal_pending(current) && timeout) {
+		timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout);
+		if (qentry.msg.raw.hdr.flags & SMC_LLC_FLAG_RESP)
+			break;
+	}
+
+	remove_wait_queue(&lgr->llc_msg_waiter, &wait);
+	flags = qentry.msg.raw.hdr.flags;
+
+	if (!(flags & SMC_LLC_FLAG_RESP) || flags & SMC_LLC_FLAG_RKEY_NEG)
 		rc = -EFAULT;
 out:
-	if (qentry)
-		smc_llc_flow_qentry_del(&lgr->llc_flow_lcl);
 	return rc;
 }
 
diff --git a/net/smc/smc_wr.c b/net/smc/smc_wr.c
index 26f8f24..52af94f 100644
--- a/net/smc/smc_wr.c
+++ b/net/smc/smc_wr.c
@@ -37,16 +37,6 @@ 
 static DEFINE_HASHTABLE(smc_wr_rx_hash, SMC_WR_RX_HASH_BITS);
 static DEFINE_SPINLOCK(smc_wr_rx_hash_lock);
 
-struct smc_wr_tx_pend {	/* control data for a pending send request */
-	u64			wr_id;		/* work request id sent */
-	smc_wr_tx_handler	handler;
-	enum ib_wc_status	wc_status;	/* CQE status */
-	struct smc_link		*link;
-	u32			idx;
-	struct smc_wr_tx_pend_priv priv;
-	u8			compl_requested;
-};
-
 /******************************** send queue *********************************/
 
 /*------------------------------- completion --------------------------------*/
diff --git a/net/smc/smc_wr.h b/net/smc/smc_wr.h
index a54e90a..9946ed5 100644
--- a/net/smc/smc_wr.h
+++ b/net/smc/smc_wr.h
@@ -46,6 +46,16 @@  struct smc_wr_rx_handler {
 	u8			type;
 };
 
+struct smc_wr_tx_pend {	/* control data for a pending send request */
+	u64			wr_id;		/* work request id sent */
+	smc_wr_tx_handler	handler;
+	enum ib_wc_status	wc_status;	/* CQE status */
+	struct smc_link		*link;
+	u32			idx;
+	struct smc_wr_tx_pend_priv priv;
+	u8			compl_requested;
+};
+
 /* Only used by RDMA write WRs.
  * All other WRs (CDC/LLC) use smc_wr_tx_send handling WR_ID implicitly
  */