From patchwork Tue Feb 7 07:36:13 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "D. Wythe" X-Patchwork-Id: 13131197 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id 8333FC64EC6 for ; Tue, 7 Feb 2023 07:36:32 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S230168AbjBGHga (ORCPT ); Tue, 7 Feb 2023 02:36:30 -0500 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:57382 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S229588AbjBGHg2 (ORCPT ); Tue, 7 Feb 2023 02:36:28 -0500 Received: from out30-112.freemail.mail.aliyun.com (out30-112.freemail.mail.aliyun.com [115.124.30.112]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id F0575D522; Mon, 6 Feb 2023 23:36:24 -0800 (PST) X-Alimail-AntiSpam: AC=PASS;BC=-1|-1;BR=01201311R131e4;CH=green;DM=||false|;DS=||;FP=0|-1|-1|-1|0|-1|-1|-1;HT=ay29a033018046059;MF=alibuda@linux.alibaba.com;NM=1;PH=DS;RN=8;SR=0;TI=SMTPD_---0Vb6MLNj_1675755382; Received: from j66a10360.sqa.eu95.tbsite.net(mailfrom:alibuda@linux.alibaba.com fp:SMTPD_---0Vb6MLNj_1675755382) by smtp.aliyun-inc.com; Tue, 07 Feb 2023 15:36:22 +0800 From: "D. 
Wythe" To: kgraul@linux.ibm.com, wenjia@linux.ibm.com, jaka@linux.ibm.com Cc: kuba@kernel.org, davem@davemloft.net, netdev@vger.kernel.org, linux-s390@vger.kernel.org, linux-rdma@vger.kernel.org Subject: [net-next 1/2] net/smc: allow confirm/delete rkey response deliver multiplex Date: Tue, 7 Feb 2023 15:36:13 +0800 Message-Id: <1675755374-107598-2-git-send-email-alibuda@linux.alibaba.com> X-Mailer: git-send-email 1.8.3.1 In-Reply-To: <1675755374-107598-1-git-send-email-alibuda@linux.alibaba.com> References: <1675755374-107598-1-git-send-email-alibuda@linux.alibaba.com> Precedence: bulk List-ID: X-Mailing-List: linux-rdma@vger.kernel.org From: "D. Wythe" We know that all flows except confirm_rkey and delete_rkey are exclusive; confirm/delete rkey flows can run concurrently (local and remote). Although the protocol allows this, all flows are actually mutually exclusive in the implementation, because waiting for LLC messages is serialized. This increases the time needed to establish or destroy SMC-R connections, since connections have to be queued in smc_llc_wait. We use rtokens or rkey to correlate a confirm/delete rkey message with its response. Before sending a message, we put a context with the rtokens or rkey into the wait queue. When a response message is received, we wake up the context whose rtokens or rkey match those of the response message. Signed-off-by: D. 
Wythe --- net/smc/smc_llc.c | 174 +++++++++++++++++++++++++++++++++++++++++------------- net/smc/smc_wr.c | 10 ---- net/smc/smc_wr.h | 10 ++++ 3 files changed, 143 insertions(+), 51 deletions(-) diff --git a/net/smc/smc_llc.c b/net/smc/smc_llc.c index a0840b8..d565909 100644 --- a/net/smc/smc_llc.c +++ b/net/smc/smc_llc.c @@ -200,6 +200,7 @@ struct smc_llc_msg_delete_rkey_v2 { /* type 0x29 */ struct smc_llc_qentry { struct list_head list; struct smc_link *link; + void *private; union smc_llc_msg msg; }; @@ -479,19 +480,17 @@ int smc_llc_send_confirm_link(struct smc_link *link, return rc; } -/* send LLC confirm rkey request */ -static int smc_llc_send_confirm_rkey(struct smc_link *send_link, - struct smc_buf_desc *rmb_desc) +/* build LLC confirm rkey request */ +static int smc_llc_build_confirm_rkey_request(struct smc_link *send_link, + struct smc_buf_desc *rmb_desc, + struct smc_wr_tx_pend_priv **priv) { struct smc_llc_msg_confirm_rkey *rkeyllc; - struct smc_wr_tx_pend_priv *pend; struct smc_wr_buf *wr_buf; struct smc_link *link; int i, rc, rtok_ix; - if (!smc_wr_tx_link_hold(send_link)) - return -ENOLINK; - rc = smc_llc_add_pending_send(send_link, &wr_buf, &pend); + rc = smc_llc_add_pending_send(send_link, &wr_buf, priv); if (rc) goto put_out; rkeyllc = (struct smc_llc_msg_confirm_rkey *)wr_buf; @@ -521,25 +520,20 @@ static int smc_llc_send_confirm_rkey(struct smc_link *send_link, cpu_to_be64((uintptr_t)rmb_desc->cpu_addr) : cpu_to_be64((u64)sg_dma_address (rmb_desc->sgt[send_link->link_idx].sgl)); - /* send llc message */ - rc = smc_wr_tx_send(send_link, pend); put_out: - smc_wr_tx_link_put(send_link); return rc; } -/* send LLC delete rkey request */ -static int smc_llc_send_delete_rkey(struct smc_link *link, - struct smc_buf_desc *rmb_desc) +/* build LLC delete rkey request */ +static int smc_llc_build_delete_rkey_request(struct smc_link *link, + struct smc_buf_desc *rmb_desc, + struct smc_wr_tx_pend_priv **priv) { struct smc_llc_msg_delete_rkey *rkeyllc; - 
struct smc_wr_tx_pend_priv *pend; struct smc_wr_buf *wr_buf; int rc; - if (!smc_wr_tx_link_hold(link)) - return -ENOLINK; - rc = smc_llc_add_pending_send(link, &wr_buf, &pend); + rc = smc_llc_add_pending_send(link, &wr_buf, priv); if (rc) goto put_out; rkeyllc = (struct smc_llc_msg_delete_rkey *)wr_buf; @@ -548,10 +542,7 @@ static int smc_llc_send_delete_rkey(struct smc_link *link, smc_llc_init_msg_hdr(&rkeyllc->hd, link->lgr, sizeof(*rkeyllc)); rkeyllc->num_rkeys = 1; rkeyllc->rkey[0] = htonl(rmb_desc->mr[link->link_idx]->rkey); - /* send llc message */ - rc = smc_wr_tx_send(link, pend); put_out: - smc_wr_tx_link_put(link); return rc; } @@ -2017,7 +2008,8 @@ static void smc_llc_rx_response(struct smc_link *link, case SMC_LLC_DELETE_RKEY: if (flowtype != SMC_LLC_FLOW_RKEY || flow->qentry) break; /* drop out-of-flow response */ - goto assign; + __wake_up(&link->lgr->llc_msg_waiter, TASK_NORMAL, 1, qentry); + goto free; case SMC_LLC_CONFIRM_RKEY_CONT: /* not used because max links is 3 */ break; @@ -2026,6 +2018,7 @@ static void smc_llc_rx_response(struct smc_link *link, qentry->msg.raw.hdr.common.type); break; } +free: kfree(qentry); return; assign: @@ -2184,25 +2177,98 @@ void smc_llc_link_clear(struct smc_link *link, bool log) cancel_delayed_work_sync(&link->llc_testlink_wrk); } +static int smc_llc_rkey_response_wake_function(struct wait_queue_entry *wq_entry, + unsigned int mode, int sync, void *key) +{ + struct smc_llc_qentry *except, *incoming; + u8 except_llc_type; + + /* not a rkey response */ + if (!key) + return 0; + + except = wq_entry->private; + incoming = key; + + except_llc_type = except->msg.raw.hdr.common.llc_type; + + /* except LLC MSG TYPE mismatch */ + if (except_llc_type != incoming->msg.raw.hdr.common.llc_type) + return 0; + + switch (except_llc_type) { + case SMC_LLC_CONFIRM_RKEY: + if (memcmp(except->msg.confirm_rkey.rtoken, incoming->msg.confirm_rkey.rtoken, + sizeof(struct smc_rmb_rtoken) * + except->msg.confirm_rkey.rtoken[0].num_rkeys)) + 
return 0; + break; + case SMC_LLC_DELETE_RKEY: + if (memcmp(except->msg.delete_rkey.rkey, incoming->msg.delete_rkey.rkey, + sizeof(__be32) * except->msg.delete_rkey.num_rkeys)) + return 0; + break; + default: + pr_warn("smc: invalid except llc msg %d.\n", except_llc_type); + return 0; + } + + /* match, save hdr */ + memcpy(&except->msg.raw.hdr, &incoming->msg.raw.hdr, sizeof(except->msg.raw.hdr)); + + wq_entry->private = except->private; + return woken_wake_function(wq_entry, mode, sync, NULL); +} + /* register a new rtoken at the remote peer (for all links) */ int smc_llc_do_confirm_rkey(struct smc_link *send_link, struct smc_buf_desc *rmb_desc) { + DEFINE_WAIT_FUNC(wait, smc_llc_rkey_response_wake_function); struct smc_link_group *lgr = send_link->lgr; - struct smc_llc_qentry *qentry = NULL; - int rc = 0; + long timeout = SMC_LLC_WAIT_TIME; + struct smc_wr_tx_pend_priv *priv; + struct smc_llc_qentry qentry; + struct smc_wr_tx_pend *pend; + int rc = 0, flags; - rc = smc_llc_send_confirm_rkey(send_link, rmb_desc); + if (!smc_wr_tx_link_hold(send_link)) + return -ENOLINK; + + rc = smc_llc_build_confirm_rkey_request(send_link, rmb_desc, &priv); if (rc) goto out; - /* receive CONFIRM RKEY response from server over RoCE fabric */ - qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME, - SMC_LLC_CONFIRM_RKEY); - if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG)) + + pend = container_of(priv, struct smc_wr_tx_pend, priv); + /* make a copy of send msg */ + memcpy(&qentry.msg.confirm_rkey, send_link->wr_tx_bufs[pend->idx].raw, + sizeof(qentry.msg.confirm_rkey)); + + qentry.private = wait.private; + wait.private = &qentry; + + add_wait_queue(&lgr->llc_msg_waiter, &wait); + + /* send llc message */ + rc = smc_wr_tx_send(send_link, priv); + smc_wr_tx_link_put(send_link); + if (rc) { + remove_wait_queue(&lgr->llc_msg_waiter, &wait); + goto out; + } + + while (!signal_pending(current) && timeout) { + timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout); 
+ if (qentry.msg.raw.hdr.flags & SMC_LLC_FLAG_RESP) + break; + } + + remove_wait_queue(&lgr->llc_msg_waiter, &wait); + flags = qentry.msg.raw.hdr.flags; + + if (!(flags & SMC_LLC_FLAG_RESP) || flags & SMC_LLC_FLAG_RKEY_NEG) rc = -EFAULT; out: - if (qentry) - smc_llc_flow_qentry_del(&lgr->llc_flow_lcl); return rc; } @@ -2210,26 +2276,52 @@ int smc_llc_do_confirm_rkey(struct smc_link *send_link, int smc_llc_do_delete_rkey(struct smc_link_group *lgr, struct smc_buf_desc *rmb_desc) { - struct smc_llc_qentry *qentry = NULL; + DEFINE_WAIT_FUNC(wait, smc_llc_rkey_response_wake_function); + long timeout = SMC_LLC_WAIT_TIME; + struct smc_wr_tx_pend_priv *priv; + struct smc_llc_qentry qentry; + struct smc_wr_tx_pend *pend; struct smc_link *send_link; - int rc = 0; + int rc = 0, flags; send_link = smc_llc_usable_link(lgr); - if (!send_link) + if (!send_link || !smc_wr_tx_link_hold(send_link)) return -ENOLINK; - /* protected by llc_flow control */ - rc = smc_llc_send_delete_rkey(send_link, rmb_desc); + rc = smc_llc_build_delete_rkey_request(send_link, rmb_desc, &priv); if (rc) goto out; - /* receive DELETE RKEY response from server over RoCE fabric */ - qentry = smc_llc_wait(lgr, send_link, SMC_LLC_WAIT_TIME, - SMC_LLC_DELETE_RKEY); - if (!qentry || (qentry->msg.raw.hdr.flags & SMC_LLC_FLAG_RKEY_NEG)) + + pend = container_of(priv, struct smc_wr_tx_pend, priv); + /* make a copy of send msg */ + memcpy(&qentry.msg.delete_link, send_link->wr_tx_bufs[pend->idx].raw, + sizeof(qentry.msg.delete_link)); + + qentry.private = wait.private; + wait.private = &qentry; + + add_wait_queue(&lgr->llc_msg_waiter, &wait); + + /* send llc message */ + rc = smc_wr_tx_send(send_link, priv); + smc_wr_tx_link_put(send_link); + if (rc) { + remove_wait_queue(&lgr->llc_msg_waiter, &wait); + goto out; + } + + while (!signal_pending(current) && timeout) { + timeout = wait_woken(&wait, TASK_INTERRUPTIBLE, timeout); + if (qentry.msg.raw.hdr.flags & SMC_LLC_FLAG_RESP) + break; + } + + 
remove_wait_queue(&lgr->llc_msg_waiter, &wait); + flags = qentry.msg.raw.hdr.flags; + + if (!(flags & SMC_LLC_FLAG_RESP) || flags & SMC_LLC_FLAG_RKEY_NEG) rc = -EFAULT; out: - if (qentry) - smc_llc_flow_qentry_del(&lgr->llc_flow_lcl); return rc; } diff --git a/net/smc/smc_wr.c b/net/smc/smc_wr.c index b0678a4..797dffa 100644 --- a/net/smc/smc_wr.c +++ b/net/smc/smc_wr.c @@ -37,16 +37,6 @@ static DEFINE_HASHTABLE(smc_wr_rx_hash, SMC_WR_RX_HASH_BITS); static DEFINE_SPINLOCK(smc_wr_rx_hash_lock); -struct smc_wr_tx_pend { /* control data for a pending send request */ - u64 wr_id; /* work request id sent */ - smc_wr_tx_handler handler; - enum ib_wc_status wc_status; /* CQE status */ - struct smc_link *link; - u32 idx; - struct smc_wr_tx_pend_priv priv; - u8 compl_requested; -}; - /******************************** send queue *********************************/ /*------------------------------- completion --------------------------------*/ diff --git a/net/smc/smc_wr.h b/net/smc/smc_wr.h index 45e9b89..a4ea215 100644 --- a/net/smc/smc_wr.h +++ b/net/smc/smc_wr.h @@ -46,6 +46,16 @@ struct smc_wr_rx_handler { u8 type; }; +struct smc_wr_tx_pend { /* control data for a pending send request */ + u64 wr_id; /* work request id sent */ + smc_wr_tx_handler handler; + enum ib_wc_status wc_status; /* CQE status */ + struct smc_link *link; + u32 idx; + struct smc_wr_tx_pend_priv priv; + u8 compl_requested; +}; + /* Only used by RDMA write WRs. * All other WRs (CDC/LLC) use smc_wr_tx_send handling WR_ID implicitly */ From patchwork Tue Feb 7 07:36:14 2023 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "D. 
Wythe" X-Patchwork-Id: 13131196 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id 8CD50C64EC5 for ; Tue, 7 Feb 2023 07:36:31 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S229942AbjBGHg3 (ORCPT ); Tue, 7 Feb 2023 02:36:29 -0500 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:57384 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S229607AbjBGHg2 (ORCPT ); Tue, 7 Feb 2023 02:36:28 -0500 Received: from out30-98.freemail.mail.aliyun.com (out30-98.freemail.mail.aliyun.com [115.124.30.98]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id 998E219F2D; Mon, 6 Feb 2023 23:36:25 -0800 (PST) X-Alimail-AntiSpam: AC=PASS;BC=-1|-1;BR=01201311R591e4;CH=green;DM=||false|;DS=||;FP=0|-1|-1|-1|0|-1|-1|-1;HT=ay29a033018046050;MF=alibuda@linux.alibaba.com;NM=1;PH=DS;RN=8;SR=0;TI=SMTPD_---0Vb6MLNr_1675755382; Received: from j66a10360.sqa.eu95.tbsite.net(mailfrom:alibuda@linux.alibaba.com fp:SMTPD_---0Vb6MLNr_1675755382) by smtp.aliyun-inc.com; Tue, 07 Feb 2023 15:36:23 +0800 From: "D. Wythe" To: kgraul@linux.ibm.com, wenjia@linux.ibm.com, jaka@linux.ibm.com Cc: kuba@kernel.org, davem@davemloft.net, netdev@vger.kernel.org, linux-s390@vger.kernel.org, linux-rdma@vger.kernel.org Subject: [net-next 2/2] net/smc: make SMC_LLC_FLOW_RKEY run concurrently Date: Tue, 7 Feb 2023 15:36:14 +0800 Message-Id: <1675755374-107598-3-git-send-email-alibuda@linux.alibaba.com> X-Mailer: git-send-email 1.8.3.1 In-Reply-To: <1675755374-107598-1-git-send-email-alibuda@linux.alibaba.com> References: <1675755374-107598-1-git-send-email-alibuda@linux.alibaba.com> Precedence: bulk List-ID: X-Mailing-List: linux-rdma@vger.kernel.org From: "D. 
Wythe" Once confirm/delete rkey responses can be delivered in a multiplexed fashion, we can allow a SMC_LLC_FLOW_RKEY flow to be started (remote) or initiated (local) in parallel with other SMC_LLC_FLOW_RKEY flows. This patch counts the flows executing in parallel, and the current flow type is removed only when the count reaches zero. Signed-off-by: D. Wythe --- net/smc/smc_core.h | 1 + net/smc/smc_llc.c | 89 ++++++++++++++++++++++++++++++++++++++++++------------ net/smc/smc_llc.h | 6 ++++ 3 files changed, 77 insertions(+), 19 deletions(-) diff --git a/net/smc/smc_core.h b/net/smc/smc_core.h index 08b457c..1fc17d3 100644 --- a/net/smc/smc_core.h +++ b/net/smc/smc_core.h @@ -242,6 +242,7 @@ enum smc_llc_flowtype { struct smc_llc_flow { enum smc_llc_flowtype type; struct smc_llc_qentry *qentry; + refcount_t parallel_refcnt; }; struct smc_link_group { diff --git a/net/smc/smc_llc.c b/net/smc/smc_llc.c index d565909..47146ff 100644 --- a/net/smc/smc_llc.c +++ b/net/smc/smc_llc.c @@ -231,15 +231,23 @@ static inline void smc_llc_flow_qentry_set(struct smc_llc_flow *flow, flow->qentry = qentry; } -static void smc_llc_flow_parallel(struct smc_link_group *lgr, u8 flow_type, +static bool smc_llc_flow_parallel(struct smc_link_group *lgr, struct smc_llc_flow *flow, struct smc_llc_qentry *qentry) { u8 msg_type = qentry->msg.raw.hdr.common.llc_type; + u8 flow_type = flow->type; + + /* SMC_LLC_FLOW_RKEY can be parallel */ + if (flow_type == SMC_LLC_FLOW_RKEY && + (msg_type == SMC_LLC_CONFIRM_RKEY || msg_type == SMC_LLC_DELETE_RKEY)) { + refcount_inc(&flow->parallel_refcnt); + return true; + } if ((msg_type == SMC_LLC_ADD_LINK || msg_type == SMC_LLC_DELETE_LINK) && flow_type != msg_type && !lgr->delayed_event) { lgr->delayed_event = qentry; - return; + return false; } /* drop parallel or already-in-progress llc requests */ if (flow_type != msg_type) @@ -250,6 +258,7 @@ static void smc_llc_flow_parallel(struct smc_link_group *lgr, u8 flow_type, qentry->msg.raw.hdr.common.type, flow_type, lgr->role); kfree(qentry); + 
return false; } /* try to start a new llc flow, initiated by an incoming llc msg */ @@ -257,13 +266,14 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow, struct smc_llc_qentry *qentry) { struct smc_link_group *lgr = qentry->link->lgr; + bool allow_start = true; spin_lock_bh(&lgr->llc_flow_lock); if (flow->type) { /* a flow is already active */ - smc_llc_flow_parallel(lgr, flow->type, qentry); + allow_start = smc_llc_flow_parallel(lgr, flow, qentry); spin_unlock_bh(&lgr->llc_flow_lock); - return false; + return allow_start; } switch (qentry->msg.raw.hdr.common.llc_type) { case SMC_LLC_ADD_LINK: @@ -280,8 +290,9 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow, flow->type = SMC_LLC_FLOW_NONE; } smc_llc_flow_qentry_set(flow, qentry); + refcount_set(&flow->parallel_refcnt, 1); spin_unlock_bh(&lgr->llc_flow_lock); - return true; + return allow_start; } /* start a new local llc flow, wait till current flow finished */ @@ -289,6 +300,7 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr, enum smc_llc_flowtype type) { enum smc_llc_flowtype allowed_remote = SMC_LLC_FLOW_NONE; + bool accept = false; int rc; /* all flows except confirm_rkey and delete_rkey are exclusive, @@ -300,10 +312,39 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr, if (list_empty(&lgr->list)) return -ENODEV; spin_lock_bh(&lgr->llc_flow_lock); - if (lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE && - (lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE || - lgr->llc_flow_rmt.type == allowed_remote)) { - lgr->llc_flow_lcl.type = type; + + /* Flow is initialized only if the following conditions are met: + * incoming flow local flow remote flow + * exclusive NONE NONE + * SMC_LLC_FLOW_RKEY SMC_LLC_FLOW_RKEY SMC_LLC_FLOW_RKEY + * SMC_LLC_FLOW_RKEY NONE SMC_LLC_FLOW_RKEY + * SMC_LLC_FLOW_RKEY SMC_LLC_FLOW_RKEY NONE + */ + switch (type) { + case SMC_LLC_FLOW_RKEY: + if (!SMC_IS_PARALLEL_FLOW(lgr->llc_flow_lcl.type)) + break; + if (!SMC_IS_PARALLEL_FLOW(lgr->llc_flow_rmt.type)) + break; + 
/* accepted */ + accept = true; + break; + default: + if (!SMC_IS_NONE_FLOW(lgr->llc_flow_lcl.type)) + break; + if (!SMC_IS_NONE_FLOW(lgr->llc_flow_rmt.type)) + break; + /* accepted */ + accept = true; + break; + } + if (accept) { + if (SMC_IS_NONE_FLOW(lgr->llc_flow_lcl.type)) { + lgr->llc_flow_lcl.type = type; + refcount_set(&lgr->llc_flow_lcl.parallel_refcnt, 1); + } else { + refcount_inc(&lgr->llc_flow_lcl.parallel_refcnt); + } spin_unlock_bh(&lgr->llc_flow_lock); return 0; } @@ -322,6 +363,16 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr, void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow) { spin_lock_bh(&lgr->llc_flow_lock); + if (!refcount_dec_and_test(&flow->parallel_refcnt)) { + spin_unlock_bh(&lgr->llc_flow_lock); + return; + } + /* free the first parallel flow, At present, + * only confirm rkey and delete rkey flow will use it. + */ + if (flow->qentry) + smc_llc_flow_qentry_del(flow); + memset(flow, 0, sizeof(*flow)); flow->type = SMC_LLC_FLOW_NONE; spin_unlock_bh(&lgr->llc_flow_lock); @@ -1723,16 +1774,14 @@ static void smc_llc_delete_link_work(struct work_struct *work) } /* process a confirm_rkey request from peer, remote flow */ -static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr) +static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr, struct smc_llc_qentry *qentry) { struct smc_llc_msg_confirm_rkey *llc; - struct smc_llc_qentry *qentry; struct smc_link *link; int num_entries; int rk_idx; int i; - qentry = lgr->llc_flow_rmt.qentry; llc = &qentry->msg.confirm_rkey; link = qentry->link; @@ -1759,19 +1808,19 @@ static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr) llc->hd.flags |= SMC_LLC_FLAG_RESP; smc_llc_init_msg_hdr(&llc->hd, link->lgr, sizeof(*llc)); smc_llc_send_message(link, &qentry->msg); - smc_llc_flow_qentry_del(&lgr->llc_flow_rmt); + /* parallel subflow, keep the first flow until ref cnt goes to zero */ + if (qentry != lgr->llc_flow_rmt.qentry) + kfree(qentry); } /* process a 
delete_rkey request from peer, remote flow */ -static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr) +static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr, struct smc_llc_qentry *qentry) { struct smc_llc_msg_delete_rkey *llc; - struct smc_llc_qentry *qentry; struct smc_link *link; u8 err_mask = 0; int i, max; - qentry = lgr->llc_flow_rmt.qentry; llc = &qentry->msg.delete_rkey; link = qentry->link; @@ -1809,7 +1858,9 @@ static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr) finish: llc->hd.flags |= SMC_LLC_FLAG_RESP; smc_llc_send_message(link, &qentry->msg); - smc_llc_flow_qentry_del(&lgr->llc_flow_rmt); + /* parallel subflow, keep the first flow until ref cnt goes to zero */ + if (qentry != lgr->llc_flow_rmt.qentry) + kfree(qentry); } static void smc_llc_protocol_violation(struct smc_link_group *lgr, u8 type) @@ -1910,7 +1961,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry) /* new request from remote, assign to remote flow */ if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) { /* process here, does not wait for more llc msgs */ - smc_llc_rmt_conf_rkey(lgr); + smc_llc_rmt_conf_rkey(lgr, qentry); smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt); } return; @@ -1923,7 +1974,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry) /* new request from remote, assign to remote flow */ if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) { /* process here, does not wait for more llc msgs */ - smc_llc_rmt_delete_rkey(lgr); + smc_llc_rmt_delete_rkey(lgr, qentry); smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt); } return; diff --git a/net/smc/smc_llc.h b/net/smc/smc_llc.h index 7e7a316..cb217793 100644 --- a/net/smc/smc_llc.h +++ b/net/smc/smc_llc.h @@ -49,6 +49,12 @@ enum smc_llc_msg_type { #define smc_link_downing(state) \ (cmpxchg(state, SMC_LNK_ACTIVE, SMC_LNK_INACTIVE) == SMC_LNK_ACTIVE) +#define SMC_IS_NONE_FLOW(type) \ + ((type) == SMC_LLC_FLOW_NONE) + +#define SMC_IS_PARALLEL_FLOW(type) \ + (((type) == 
SMC_LLC_FLOW_RKEY) || SMC_IS_NONE_FLOW(type)) + /* LLC DELETE LINK Request Reason Codes */ #define SMC_LLC_DEL_LOST_PATH 0x00010000 #define SMC_LLC_DEL_OP_INIT_TERM 0x00020000