@@ -242,6 +242,7 @@ enum smc_llc_flowtype {
struct smc_llc_flow {
enum smc_llc_flowtype type;
struct smc_llc_qentry *qentry;
+ refcount_t parallel_refcnt; /* number of (parallel) users; flow is reset when it hits zero */
};
struct smc_link_group {
@@ -231,15 +231,23 @@ static inline void smc_llc_flow_qentry_set(struct smc_llc_flow *flow,
flow->qentry = qentry;
}
-static void smc_llc_flow_parallel(struct smc_link_group *lgr, u8 flow_type,
+static bool smc_llc_flow_parallel(struct smc_link_group *lgr, struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
{
u8 msg_type = qentry->msg.raw.hdr.common.llc_type;
+ u8 flow_type = flow->type;
+
+ /* confirm/delete rkey requests may join an active SMC_LLC_FLOW_RKEY flow */
+ if (flow_type == SMC_LLC_FLOW_RKEY &&
+ (msg_type == SMC_LLC_CONFIRM_RKEY || msg_type == SMC_LLC_DELETE_RKEY)) {
+ refcount_inc(&flow->parallel_refcnt);
+ return true;
+ }
if ((msg_type == SMC_LLC_ADD_LINK || msg_type == SMC_LLC_DELETE_LINK) &&
flow_type != msg_type && !lgr->delayed_event) {
lgr->delayed_event = qentry;
- return;
+ return false;
}
/* drop parallel or already-in-progress llc requests */
if (flow_type != msg_type)
@@ -250,6 +258,7 @@ static void smc_llc_flow_parallel(struct smc_link_group *lgr, u8 flow_type,
qentry->msg.raw.hdr.common.type,
flow_type, lgr->role);
kfree(qentry);
+ return false;
}
/* try to start a new llc flow, initiated by an incoming llc msg */
@@ -257,13 +266,14 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
{
struct smc_link_group *lgr = qentry->link->lgr;
+ bool allow_start = true;
spin_lock_bh(&lgr->llc_flow_lock);
if (flow->type) {
/* a flow is already active */
- smc_llc_flow_parallel(lgr, flow->type, qentry);
+ allow_start = smc_llc_flow_parallel(lgr, flow, qentry);
spin_unlock_bh(&lgr->llc_flow_lock);
- return false;
+ return allow_start;
}
switch (qentry->msg.raw.hdr.common.llc_type) {
case SMC_LLC_ADD_LINK:
@@ -280,8 +290,9 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow,
flow->type = SMC_LLC_FLOW_NONE;
}
smc_llc_flow_qentry_set(flow, qentry);
+ refcount_set(&flow->parallel_refcnt, 1);
spin_unlock_bh(&lgr->llc_flow_lock);
- return true;
+ return allow_start;
}
/* start a new local llc flow, wait till current flow finished */
@@ -289,6 +300,7 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
enum smc_llc_flowtype type)
{
enum smc_llc_flowtype allowed_remote = SMC_LLC_FLOW_NONE;
+ bool accept = false;
int rc;
/* all flows except confirm_rkey and delete_rkey are exclusive,
@@ -300,10 +312,39 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
if (list_empty(&lgr->list))
return -ENODEV;
spin_lock_bh(&lgr->llc_flow_lock);
- if (lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
- (lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
- lgr->llc_flow_rmt.type == allowed_remote)) {
- lgr->llc_flow_lcl.type = type;
+
+ /* A flow is started only if one of the following combinations holds:
+ * requested flow      local flow                  remote flow
+ * exclusive           NONE                        NONE
+ * SMC_LLC_FLOW_RKEY   NONE or SMC_LLC_FLOW_RKEY   NONE or SMC_LLC_FLOW_RKEY
+ * i.e. an SMC_LLC_FLOW_RKEY request may run in parallel with local
+ * and remote SMC_LLC_FLOW_RKEY flows; all other flows are exclusive.
+ */
+ switch (type) {
+ case SMC_LLC_FLOW_RKEY:
+ if (!SMC_IS_PARALLEL_FLOW(lgr->llc_flow_lcl.type))
+ break;
+ if (!SMC_IS_PARALLEL_FLOW(lgr->llc_flow_rmt.type))
+ break;
+ /* accepted */
+ accept = true;
+ break;
+ default:
+ if (!SMC_IS_NONE_FLOW(lgr->llc_flow_lcl.type))
+ break;
+ if (!SMC_IS_NONE_FLOW(lgr->llc_flow_rmt.type))
+ break;
+ /* accepted */
+ accept = true;
+ break;
+ }
+ if (accept) {
+ if (SMC_IS_NONE_FLOW(lgr->llc_flow_lcl.type)) {
+ lgr->llc_flow_lcl.type = type;
+ refcount_set(&lgr->llc_flow_lcl.parallel_refcnt, 1);
+ } else {
+ refcount_inc(&lgr->llc_flow_lcl.parallel_refcnt);
+ }
spin_unlock_bh(&lgr->llc_flow_lock);
return 0;
}
@@ -322,6 +363,16 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow)
{
spin_lock_bh(&lgr->llc_flow_lock);
+ if (!refcount_dec_and_test(&flow->parallel_refcnt)) {
+ spin_unlock_bh(&lgr->llc_flow_lock);
+ return;
+ }
+ /* Last user is gone: free the qentry kept by the first parallel flow.
+ * At present, only confirm rkey and delete rkey flows use this.
+ */
+ if (flow->qentry)
+ smc_llc_flow_qentry_del(flow);
+
memset(flow, 0, sizeof(*flow));
flow->type = SMC_LLC_FLOW_NONE;
spin_unlock_bh(&lgr->llc_flow_lock);
@@ -1723,16 +1774,14 @@ static void smc_llc_delete_link_work(struct work_struct *work)
}
/* process a confirm_rkey request from peer, remote flow */
-static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr)
+static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr, struct smc_llc_qentry *qentry)
{
struct smc_llc_msg_confirm_rkey *llc;
- struct smc_llc_qentry *qentry;
struct smc_link *link;
int num_entries;
int rk_idx;
int i;
- qentry = lgr->llc_flow_rmt.qentry;
llc = &qentry->msg.confirm_rkey;
link = qentry->link;
@@ -1759,19 +1808,19 @@ static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr)
llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_init_msg_hdr(&llc->hd, link->lgr, sizeof(*llc));
smc_llc_send_message(link, &qentry->msg);
- smc_llc_flow_qentry_del(&lgr->llc_flow_rmt);
+ /* parallel subflow: free its own qentry; the first flow's qentry is kept until the refcount drops to zero */
+ if (qentry != lgr->llc_flow_rmt.qentry)
+ kfree(qentry);
}
/* process a delete_rkey request from peer, remote flow */
-static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr)
+static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr, struct smc_llc_qentry *qentry)
{
struct smc_llc_msg_delete_rkey *llc;
- struct smc_llc_qentry *qentry;
struct smc_link *link;
u8 err_mask = 0;
int i, max;
- qentry = lgr->llc_flow_rmt.qentry;
llc = &qentry->msg.delete_rkey;
link = qentry->link;
@@ -1809,7 +1858,9 @@ static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr)
finish:
llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, &qentry->msg);
- smc_llc_flow_qentry_del(&lgr->llc_flow_rmt);
+ /* parallel subflow: free its own qentry; the first flow's qentry is kept until the refcount drops to zero */
+ if (qentry != lgr->llc_flow_rmt.qentry)
+ kfree(qentry);
}
static void smc_llc_protocol_violation(struct smc_link_group *lgr, u8 type)
@@ -1910,7 +1961,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
/* new request from remote, assign to remote flow */
if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) {
/* process here, does not wait for more llc msgs */
- smc_llc_rmt_conf_rkey(lgr);
+ smc_llc_rmt_conf_rkey(lgr, qentry);
smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt);
}
return;
@@ -1923,7 +1974,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
/* new request from remote, assign to remote flow */
if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) {
/* process here, does not wait for more llc msgs */
- smc_llc_rmt_delete_rkey(lgr);
+ smc_llc_rmt_delete_rkey(lgr, qentry);
smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt);
}
return;
@@ -49,6 +49,12 @@ enum smc_llc_msg_type {
#define smc_link_downing(state) \
(cmpxchg(state, SMC_LNK_ACTIVE, SMC_LNK_INACTIVE) == SMC_LNK_ACTIVE)
+#define SMC_IS_NONE_FLOW(type) \
+ ((type) == SMC_LLC_FLOW_NONE) /* flow is idle, as left by smc_llc_flow_stop() */
+
+#define SMC_IS_PARALLEL_FLOW(type) \
+ (((type) == SMC_LLC_FLOW_RKEY) || SMC_IS_NONE_FLOW(type)) /* idle or rkey flow: does not block a new rkey flow; evaluates type twice */
+
/* LLC DELETE LINK Request Reason Codes */
#define SMC_LLC_DEL_LOST_PATH 0x00010000
#define SMC_LLC_DEL_OP_INIT_TERM 0x00020000