@@ -286,6 +286,7 @@ enum smc_llc_flowtype {
struct smc_llc_flow {
enum smc_llc_flowtype type;
struct smc_llc_qentry *qentry;
+ refcount_t parallel_refcnt;
};
struct smc_link_group {
@@ -231,10 +231,18 @@ static inline void smc_llc_flow_qentry_set(struct smc_llc_flow *flow,
flow->qentry = qentry;
}
-static void smc_llc_flow_parallel(struct smc_link_group *lgr, u8 flow_type,
+static void smc_llc_flow_parallel(struct smc_link_group *lgr, struct smc_llc_flow *flow,
struct smc_llc_qentry *qentry)
{
u8 msg_type = qentry->msg.raw.hdr.common.llc_type;
+ u8 flow_type = flow->type;
+
+ /* SMC_LLC_FLOW_RKEY can be parallel */
+ if (flow_type == SMC_LLC_FLOW_RKEY &&
+ (msg_type == SMC_LLC_CONFIRM_RKEY || msg_type == SMC_LLC_DELETE_RKEY)) {
+ refcount_inc(&flow->parallel_refcnt);
+ return;
+ }
if ((msg_type == SMC_LLC_ADD_LINK || msg_type == SMC_LLC_DELETE_LINK) &&
flow_type != msg_type && !lgr->delayed_event) {
@@ -261,7 +269,7 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow,
spin_lock_bh(&lgr->llc_flow_lock);
if (flow->type) {
/* a flow is already active */
- smc_llc_flow_parallel(lgr, flow->type, qentry);
+ smc_llc_flow_parallel(lgr, flow, qentry);
spin_unlock_bh(&lgr->llc_flow_lock);
return false;
}
@@ -280,6 +288,7 @@ static bool smc_llc_flow_start(struct smc_llc_flow *flow,
flow->type = SMC_LLC_FLOW_NONE;
}
smc_llc_flow_qentry_set(flow, qentry);
+ refcount_set(&flow->parallel_refcnt, 1);
spin_unlock_bh(&lgr->llc_flow_lock);
return true;
}
@@ -289,6 +298,7 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
enum smc_llc_flowtype type)
{
enum smc_llc_flowtype allowed_remote = SMC_LLC_FLOW_NONE;
+ bool accept = false;
int rc;
/* all flows except confirm_rkey and delete_rkey are exclusive,
@@ -300,10 +310,39 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
if (list_empty(&lgr->list))
return -ENODEV;
spin_lock_bh(&lgr->llc_flow_lock);
- if (lgr->llc_flow_lcl.type == SMC_LLC_FLOW_NONE &&
- (lgr->llc_flow_rmt.type == SMC_LLC_FLOW_NONE ||
- lgr->llc_flow_rmt.type == allowed_remote)) {
- lgr->llc_flow_lcl.type = type;
+
+ /* Flow is initialized only if the following conditions are met:
+ * incoming flow local flow remote flow
+ * exclusive NONE NONE
+ * SMC_LLC_FLOW_RKEY SMC_LLC_FLOW_RKEY SMC_LLC_FLOW_RKEY
+ * SMC_LLC_FLOW_RKEY NONE SMC_LLC_FLOW_RKEY
+ * SMC_LLC_FLOW_RKEY SMC_LLC_FLOW_RKEY NONE
+ */
+ switch (type) {
+ case SMC_LLC_FLOW_RKEY:
+ if (!SMC_IS_PARALLEL_FLOW(lgr->llc_flow_lcl.type))
+ break;
+ if (!SMC_IS_PARALLEL_FLOW(lgr->llc_flow_rmt.type))
+ break;
+ /* accepted */
+ accept = true;
+ break;
+ default:
+ if (!SMC_IS_NONE_FLOW(lgr->llc_flow_lcl.type))
+ break;
+ if (!SMC_IS_NONE_FLOW(lgr->llc_flow_rmt.type))
+ break;
+ /* accepted */
+ accept = true;
+ break;
+ }
+ if (accept) {
+ if (SMC_IS_NONE_FLOW(lgr->llc_flow_lcl.type)) {
+ lgr->llc_flow_lcl.type = type;
+ refcount_set(&lgr->llc_flow_lcl.parallel_refcnt, 1);
+ } else {
+ refcount_inc(&lgr->llc_flow_lcl.parallel_refcnt);
+ }
spin_unlock_bh(&lgr->llc_flow_lock);
return 0;
}
@@ -322,6 +361,10 @@ int smc_llc_flow_initiate(struct smc_link_group *lgr,
void smc_llc_flow_stop(struct smc_link_group *lgr, struct smc_llc_flow *flow)
{
spin_lock_bh(&lgr->llc_flow_lock);
+ if (!refcount_dec_and_test(&flow->parallel_refcnt)) {
+ spin_unlock_bh(&lgr->llc_flow_lock);
+ return;
+ }
memset(flow, 0, sizeof(*flow));
flow->type = SMC_LLC_FLOW_NONE;
spin_unlock_bh(&lgr->llc_flow_lock);
@@ -1729,16 +1772,14 @@ static void smc_llc_delete_link_work(struct work_struct *work)
}
/* process a confirm_rkey request from peer, remote flow */
-static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr)
+static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr, struct smc_llc_qentry *qentry)
{
struct smc_llc_msg_confirm_rkey *llc;
- struct smc_llc_qentry *qentry;
struct smc_link *link;
int num_entries;
int rk_idx;
int i;
- qentry = lgr->llc_flow_rmt.qentry;
llc = &qentry->msg.confirm_rkey;
link = qentry->link;
@@ -1765,19 +1806,16 @@ static void smc_llc_rmt_conf_rkey(struct smc_link_group *lgr)
llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_init_msg_hdr(&llc->hd, link->lgr, sizeof(*llc));
smc_llc_send_message(link, &qentry->msg);
- smc_llc_flow_qentry_del(&lgr->llc_flow_rmt);
}
/* process a delete_rkey request from peer, remote flow */
-static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr)
+static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr, struct smc_llc_qentry *qentry)
{
struct smc_llc_msg_delete_rkey *llc;
- struct smc_llc_qentry *qentry;
struct smc_link *link;
u8 err_mask = 0;
int i, max;
- qentry = lgr->llc_flow_rmt.qentry;
llc = &qentry->msg.delete_rkey;
link = qentry->link;
@@ -1815,7 +1853,6 @@ static void smc_llc_rmt_delete_rkey(struct smc_link_group *lgr)
finish:
llc->hd.flags |= SMC_LLC_FLAG_RESP;
smc_llc_send_message(link, &qentry->msg);
- smc_llc_flow_qentry_del(&lgr->llc_flow_rmt);
}
static void smc_llc_protocol_violation(struct smc_link_group *lgr, u8 type)
@@ -1916,7 +1953,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
/* new request from remote, assign to remote flow */
if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) {
/* process here, does not wait for more llc msgs */
- smc_llc_rmt_conf_rkey(lgr);
+ smc_llc_rmt_conf_rkey(lgr, qentry);
smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt);
}
return;
@@ -1929,7 +1966,7 @@ static void smc_llc_event_handler(struct smc_llc_qentry *qentry)
/* new request from remote, assign to remote flow */
if (smc_llc_flow_start(&lgr->llc_flow_rmt, qentry)) {
/* process here, does not wait for more llc msgs */
- smc_llc_rmt_delete_rkey(lgr);
+ smc_llc_rmt_delete_rkey(lgr, qentry);
smc_llc_flow_stop(lgr, &lgr->llc_flow_rmt);
}
return;
@@ -48,6 +48,12 @@ enum smc_llc_msg_type {
#define smc_link_downing(state) \
(cmpxchg(state, SMC_LNK_ACTIVE, SMC_LNK_INACTIVE) == SMC_LNK_ACTIVE)
+#define SMC_IS_NONE_FLOW(type) \
+ ((type) == SMC_LLC_FLOW_NONE)
+
+#define SMC_IS_PARALLEL_FLOW(type) \
+ (((type) == SMC_LLC_FLOW_RKEY) || SMC_IS_NONE_FLOW(type))
+
/* LLC DELETE LINK Request Reason Codes */
#define SMC_LLC_DEL_LOST_PATH 0x00010000
#define SMC_LLC_DEL_OP_INIT_TERM 0x00020000