@@ -657,6 +657,7 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
}
if (!msg->msg_peertxcredit) {
+ spin_lock(&lp->lpni_lock);
LASSERT((lp->lpni_txcredits < 0) ==
!list_empty(&lp->lpni_txq));
@@ -670,8 +671,10 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
if (lp->lpni_txcredits < 0) {
msg->msg_tx_delayed = 1;
list_add_tail(&msg->msg_list, &lp->lpni_txq);
+ spin_unlock(&lp->lpni_lock);
return LNET_CREDIT_WAIT;
}
+ spin_unlock(&lp->lpni_lock);
}
if (!msg->msg_txcredit) {
@@ -744,6 +747,7 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
LASSERT(!do_recv || msg->msg_rx_delayed);
if (!msg->msg_peerrtrcredit) {
+ spin_lock(&lp->lpni_lock);
LASSERT((lp->lpni_rtrcredits < 0) ==
!list_empty(&lp->lpni_rtrq));
@@ -757,8 +761,10 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
LASSERT(msg->msg_rx_ready_delay);
msg->msg_rx_delayed = 1;
list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
+ spin_unlock(&lp->lpni_lock);
return LNET_CREDIT_WAIT;
}
+ spin_unlock(&lp->lpni_lock);
}
rbp = lnet_msg2bufpool(msg);
@@ -822,6 +828,7 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
LASSERT(msg2->msg_txni == ni);
LASSERT(msg2->msg_tx_delayed);
+ LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);
(void)lnet_post_send_locked(msg2, 1);
}
@@ -831,6 +838,7 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
/* give back peer txcredits */
msg->msg_peertxcredit = 0;
+ spin_lock(&txpeer->lpni_lock);
LASSERT((txpeer->lpni_txcredits < 0) ==
!list_empty(&txpeer->lpni_txq));
@@ -842,11 +850,22 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
msg2 = list_entry(txpeer->lpni_txq.next,
struct lnet_msg, msg_list);
list_del(&msg2->msg_list);
+ spin_unlock(&txpeer->lpni_lock);
LASSERT(msg2->msg_txpeer == txpeer);
LASSERT(msg2->msg_tx_delayed);
+ if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+ lnet_net_unlock(msg->msg_tx_cpt);
+ lnet_net_lock(msg2->msg_tx_cpt);
+ }
(void)lnet_post_send_locked(msg2, 1);
+ if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+ lnet_net_unlock(msg2->msg_tx_cpt);
+ lnet_net_lock(msg->msg_tx_cpt);
+ }
+ } else {
+ spin_unlock(&txpeer->lpni_lock);
}
}
@@ -887,17 +906,12 @@ lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp)
void
lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
{
- struct list_head drop;
struct lnet_msg *msg;
- INIT_LIST_HEAD(&drop);
-
- list_splice_init(list, &drop);
-
lnet_net_unlock(cpt);
- while(!list_empty(&drop)) {
- msg = list_first_entry(&drop, struct lnet_msg, msg_list);
+ while (!list_empty(list)) {
+ msg = list_first_entry(list, struct lnet_msg, msg_list);
lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
0, 0, 0, msg->msg_hdr.payload_length);
list_del_init(&msg->msg_list);
@@ -968,6 +982,7 @@ lnet_return_rx_credits_locked(struct lnet_msg *msg)
/* give back peer router credits */
msg->msg_peerrtrcredit = 0;
+ spin_lock(&rxpeer->lpni_lock);
LASSERT((rxpeer->lpni_rtrcredits < 0) ==
!list_empty(&rxpeer->lpni_rtrq));
@@ -977,14 +992,19 @@ lnet_return_rx_credits_locked(struct lnet_msg *msg)
* peer.
*/
if (!the_lnet.ln_routing) {
- lnet_drop_routed_msgs_locked(&rxpeer->lpni_rtrq,
- msg->msg_rx_cpt);
+ LIST_HEAD(drop);
+
+ list_splice_init(&rxpeer->lpni_rtrq, &drop);
+ spin_unlock(&rxpeer->lpni_lock);
+ lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
} else if (rxpeer->lpni_rtrcredits <= 0) {
msg2 = list_entry(rxpeer->lpni_rtrq.next,
struct lnet_msg, msg_list);
list_del(&msg2->msg_list);
-
+ spin_unlock(&rxpeer->lpni_lock);
(void)lnet_post_routed_recv_locked(msg2, 1);
+ } else {
+ spin_unlock(&rxpeer->lpni_lock);
}
}
if (rxni) {
@@ -56,12 +56,16 @@ lnet_peer_net_added(struct lnet_net *net)
lpni_on_remote_peer_ni_list) {
if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
lpni->lpni_net = net;
+
+ spin_lock(&lpni->lpni_lock);
lpni->lpni_txcredits =
- lpni->lpni_mintxcredits =
lpni->lpni_net->net_tunables.lct_peer_tx_credits;
+ lpni->lpni_mintxcredits =
+ lpni->lpni_txcredits;
lpni->lpni_rtrcredits =
- lpni->lpni_minrtrcredits =
lnet_peer_buffer_credits(lpni->lpni_net);
+ lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
+ spin_unlock(&lpni->lpni_lock);
lnet_peer_remove_from_remote_list(lpni);
}
@@ -1358,7 +1358,8 @@ lnet_rtrpool_free_bufs(struct lnet_rtrbufpool *rbp, int cpt)
INIT_LIST_HEAD(&tmp);
lnet_net_lock(cpt);
- lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
+ list_splice_init(&rbp->rbp_msgs, &tmp);
+ lnet_drop_routed_msgs_locked(&tmp, cpt);
list_splice_init(&rbp->rbp_bufs, &tmp);
rbp->rbp_req_nbuffers = 0;
rbp->rbp_nbuffers = 0;