Message ID | 153895437824.16383.8664465506271547759.stgit@noble (mailing list archive) |
---|---|
State | New, archived |
Series | Port Dynamic Discovery to drivers/staging |
> From: Olaf Weber <olaf@sgi.com>
>
> Implement Peer Discovery.
>
> A peer is queued for discovery by lnet_peer_queue_for_discovery().
> This sets LNET_PEER_DISCOVERING, to indicate that discovery is in
> progress.
>
> The discovery thread lnet_peer_discovery() checks the peer and
> updates its state as appropriate.
>
> If LNET_PEER_DATA_PRESENT is set, then a valid Push message or
> Ping reply has been received. The peer is updated in accordance
> with the data, and LNET_PEER_NIDS_UPTODATE is set.
>
> If LNET_PEER_PING_FAILED is set, then an attempt to send a Ping
> message failed, and peer state is updated accordingly. The discovery
> thread can do some cleanup like unlinking an MD that cannot be done
> from the message event handler.
>
> If LNET_PEER_PUSH_FAILED is set, then an attempt to send a Push
> message failed, and peer state is updated accordingly. The discovery
> thread can do some cleanup like unlinking an MD that cannot be done
> from the message event handler.
>
> If LNET_PEER_PING_REQUIRED is set, we must Ping the peer in order to
> correctly update our knowledge of it. This is set, for example, if
> we receive a Push message for a peer, but cannot handle it because
> the Push target was too small. In such a case we know that the
> state of the peer is incorrect, but need to do extra work to obtain
> the required information.
>
> If discovery is not enabled, then the discovery process stops here
> and the peer is marked with LNET_PEER_UNDISCOVERED. This tells the
> discovery process that it doesn't need to revisit the peer while
> discovery remains disabled.
>
> If LNET_PEER_NIDS_UPTODATE is not set, then we have reason to think
> the lnet_peer is not up to date, and will Ping it.
>
> The peer needs a Push if it is multi-rail and the ping buffer
> sequence number for this node is newer than the sequence number it
> has acknowledged receiving by sending an Ack of a Push.
>
> If none of the above is true, then discovery has completed its work
> on the peer.
>
> Discovery signals that it is done with a peer by clearing the
> LNET_PEER_DISCOVERING flag, and setting LNET_PEER_DISCOVERED or
> LNET_PEER_UNDISCOVERED as appropriate. It then dequeues the peer
> and clears the LNET_PEER_QUEUED flag.
>
> When the local node is discovered via the loopback network, the
> peer structure that is created will have an lnet_peer_ni for the
> local loopback interface. Subsequent traffic from this node to
> itself will use the loopback net.
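The state machine described above maps directly onto the dispatch the patch adds to lnet_peer_discovery() in the peer.c hunks below; note that the LNET_PEER_PING_REQUIRED and LNET_PEER_UNDISCOVERED flags named in the message appear in the merged code as LNET_PEER_FORCE_PING and LNET_PEER_REDISCOVER. A condensed, annotated restatement of that dispatch (not a new code path; all names are from the patch):

	/*
	 * Condensed from lnet_peer_discovery() below: each queued peer
	 * is dispatched on its state flags under lp_lock, and earlier
	 * cases take priority over later ones.
	 */
	spin_lock(&lp->lp_lock);
	if (lp->lp_state & LNET_PEER_DATA_PRESENT)	/* Push/Ping data arrived */
		rc = lnet_peer_data_present(lp);
	else if (lp->lp_state & LNET_PEER_PING_FAILED)	/* clean up failed Ping */
		rc = lnet_peer_ping_failed(lp);
	else if (lp->lp_state & LNET_PEER_PUSH_FAILED)	/* clean up failed Push */
		rc = lnet_peer_push_failed(lp);
	else if (lp->lp_state & LNET_PEER_FORCE_PING)	/* e.g. a truncated Push was seen */
		rc = lnet_peer_send_ping(lp);
	else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
		rc = lnet_peer_send_push(lp);
	else if (lnet_peer_discovery_disabled)		/* stop; mark REDISCOVER */
		rc = lnet_peer_rediscover(lp);
	else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
		rc = lnet_peer_send_ping(lp);
	else if (lnet_peer_needs_push(lp))		/* MR peer lags our config */
		rc = lnet_peer_send_push(lp);
	else
		rc = lnet_peer_discovered(lp);		/* done; clear DISCOVERING */
	spin_unlock(&lp->lp_lock);

The "peer needs a Push" rule is encoded by lnet_peer_needs_push() in the lib-lnet.h hunk: a Multi-Rail peer with discovery enabled needs a Push when the sequence number it has acked (lp_node_seqno) is older than the current ln_ping_target_seqno.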
> Reviewed-by: James Simmons <jsimmons@infradead.org>
> WC-bug-id: https://jira.whamcloud.com/browse/LU-9480
> Signed-off-by: Olaf Weber <olaf@sgi.com>
> Reviewed-on: https://review.whamcloud.com/25789
> Reviewed-by: Olaf Weber <olaf.weber@hpe.com>
> Reviewed-by: Amir Shehata <amir.shehata@intel.com>
> Tested-by: Amir Shehata <amir.shehata@intel.com>
> Signed-off-by: NeilBrown <neilb@suse.com>
> ---
>  .../staging/lustre/include/linux/lnet/lib-lnet.h | 20
>  .../staging/lustre/include/linux/lnet/lib-types.h | 39 +
>  drivers/staging/lustre/lnet/lnet/api-ni.c | 59 +
>  drivers/staging/lustre/lnet/lnet/lib-move.c | 18
>  drivers/staging/lustre/lnet/lnet/peer.c | 1499 +++++++++++++++++++-
>  5 files changed, 1543 insertions(+), 92 deletions(-)
>
> diff --git a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
> index 5632e5aadf41..f82a699371f2 100644
> --- a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
> +++ b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
> @@ -76,6 +76,9 @@ extern struct lnet the_lnet; /* THE network */
>  #define LNET_ACCEPTOR_MIN_RESERVED_PORT 512
>  #define LNET_ACCEPTOR_MAX_RESERVED_PORT 1023
>
> +/* Discovery timeout - same as default peer_timeout */
> +#define DISCOVERY_TIMEOUT 180
> +
>  static inline int lnet_is_route_alive(struct lnet_route *route)
>  {
>  	/* gateway is down */
> @@ -713,9 +716,10 @@ struct lnet_peer_ni *lnet_nid2peerni_ex(lnet_nid_t nid, int cpt);
>  struct lnet_peer_ni *lnet_find_peer_ni_locked(lnet_nid_t nid);
>  void lnet_peer_net_added(struct lnet_net *net);
>  lnet_nid_t lnet_peer_primary_nid_locked(lnet_nid_t nid);
> -int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt);
> +int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block);
>  int lnet_peer_discovery_start(void);
>  void lnet_peer_discovery_stop(void);
> +void lnet_push_update_to_peers(int force);
>  void lnet_peer_tables_cleanup(struct lnet_net *net);
>  void lnet_peer_uninit(void);
>  int lnet_peer_tables_create(void);
> @@ -805,4 +809,18 @@ lnet_peer_ni_is_primary(struct lnet_peer_ni *lpni)
>
>  bool lnet_peer_is_uptodate(struct lnet_peer *lp);
>
> +static inline bool
> +lnet_peer_needs_push(struct lnet_peer *lp)
> +{
> +	if (!(lp->lp_state & LNET_PEER_MULTI_RAIL))
> +		return false;
> +	if (lp->lp_state & LNET_PEER_FORCE_PUSH)
> +		return true;
> +	if (lp->lp_state & LNET_PEER_NO_DISCOVERY)
> +		return false;
> +	if (lp->lp_node_seqno < atomic_read(&the_lnet.ln_ping_target_seqno))
> +		return true;
> +	return false;
> +}
> +
>  #endif
> diff --git a/drivers/staging/lustre/include/linux/lnet/lib-types.h b/drivers/staging/lustre/include/linux/lnet/lib-types.h
> index e00c13355d43..07baa86e61ab 100644
> --- a/drivers/staging/lustre/include/linux/lnet/lib-types.h
> +++ b/drivers/staging/lustre/include/linux/lnet/lib-types.h
> @@ -67,6 +67,13 @@ struct lnet_msg {
>  	lnet_nid_t msg_from;
>  	__u32 msg_type;
>
> +	/*
> +	 * hold parameters in case message is withheld due
> +	 * to discovery
> +	 */
> +	lnet_nid_t msg_src_nid_param;
> +	lnet_nid_t msg_rtr_nid_param;
> +
>  	/* committed for sending */
>  	unsigned int msg_tx_committed:1;
>  	/* CPT # this message committed for sending */
> @@ -395,6 +402,8 @@ struct lnet_ping_buffer {
>  #define LNET_PING_BUFFER_LONI(PBUF) ((PBUF)->pb_info.pi_ni[0].ns_nid)
>  #define LNET_PING_BUFFER_SEQNO(PBUF) ((PBUF)->pb_info.pi_ni[0].ns_status)
>
> +#define LNET_PING_INFO_TO_BUFFER(PINFO) \
> +	container_of((PINFO), struct lnet_ping_buffer, pb_info)
>
>  /* router checker data, per
router */ > struct lnet_rc_data { > @@ -503,6 +512,9 @@ struct lnet_peer { > /* list of peer nets */ > struct list_head lp_peer_nets; > > + /* list of messages pending discovery*/ > + struct list_head lp_dc_pendq; > + > /* primary NID of the peer */ > lnet_nid_t lp_primary_nid; > > @@ -524,15 +536,36 @@ struct lnet_peer { > /* buffer for data pushed by peer */ > struct lnet_ping_buffer *lp_data; > > + /* MD handle for ping in progress */ > + struct lnet_handle_md lp_ping_mdh; > + > + /* MD handle for push in progress */ > + struct lnet_handle_md lp_push_mdh; > + > /* number of NIDs for sizing push data */ > int lp_data_nnis; > > /* NI config sequence number of peer */ > __u32 lp_peer_seqno; > > - /* Local NI config sequence number peer knows */ > + /* Local NI config sequence number acked by peer */ > __u32 lp_node_seqno; > > + /* Local NI config sequence number sent to peer */ > + __u32 lp_node_seqno_sent; > + > + /* Ping error encountered during discovery. */ > + int lp_ping_error; > + > + /* Push error encountered during discovery. */ > + int lp_push_error; > + > + /* Error encountered during discovery. */ > + int lp_dc_error; > + > + /* time it was put on the ln_dc_working queue */ > + time64_t lp_last_queued; > + > /* link on discovery-related lists */ > struct list_head lp_dc_list; > > @@ -691,6 +724,8 @@ struct lnet_remotenet { > #define LNET_CREDIT_OK 0 > /** lnet message is waiting for credit */ > #define LNET_CREDIT_WAIT 1 > +/** lnet message is waiting for discovery */ > +#define LNET_DC_WAIT 2 > > struct lnet_rtrbufpool { > struct list_head rbp_bufs; /* my free buffer pool */ > @@ -943,6 +978,8 @@ struct lnet { > struct list_head ln_dc_request; > /* discovery working list */ > struct list_head ln_dc_working; > + /* discovery expired list */ > + struct list_head ln_dc_expired; > /* discovery thread wait queue */ > wait_queue_head_t ln_dc_waitq; > /* discovery startup/shutdown state */ > diff --git a/drivers/staging/lustre/lnet/lnet/api-ni.c b/drivers/staging/lustre/lnet/lnet/api-ni.c > index e6bc54e9de71..955d1711eda4 100644 > --- a/drivers/staging/lustre/lnet/lnet/api-ni.c > +++ b/drivers/staging/lustre/lnet/lnet/api-ni.c > @@ -41,7 +41,14 @@ > > #define D_LNI D_CONSOLE > > -struct lnet the_lnet; /* THE state of the network */ > +/* > + * initialize ln_api_mutex statically, since it needs to be used in > + * discovery_set callback. That module parameter callback can be called > + * before module init completes. The mutex needs to be ready for use then. > + */ > +struct lnet the_lnet = { > + .ln_api_mutex = __MUTEX_INITIALIZER(the_lnet.ln_api_mutex), > +}; /* THE state of the network */ > EXPORT_SYMBOL(the_lnet); > > static char *ip2nets = ""; > @@ -101,7 +108,9 @@ static int > discovery_set(const char *val, const struct kernel_param *kp) > { > int rc; > + unsigned int *discovery = (unsigned int *)kp->arg; > unsigned long value; > + struct lnet_ping_buffer *pbuf; > > rc = kstrtoul(val, 0, &value); > if (rc) { > @@ -109,7 +118,38 @@ discovery_set(const char *val, const struct kernel_param *kp) > return rc; > } > > - *(unsigned int *)kp->arg = !!value; > + value = !!value; > + > + /* > + * The purpose of locking the api_mutex here is to ensure that > + * the correct value ends up stored properly. 
> + */ > + mutex_lock(&the_lnet.ln_api_mutex); > + > + if (value == *discovery) { > + mutex_unlock(&the_lnet.ln_api_mutex); > + return 0; > + } > + > + *discovery = value; > + > + if (the_lnet.ln_state == LNET_STATE_SHUTDOWN) { > + mutex_unlock(&the_lnet.ln_api_mutex); > + return 0; > + } > + > + /* tell peers that discovery setting has changed */ > + lnet_net_lock(LNET_LOCK_EX); > + pbuf = the_lnet.ln_ping_target; > + if (value) > + pbuf->pb_info.pi_features &= ~LNET_PING_FEAT_DISCOVERY; > + else > + pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY; > + lnet_net_unlock(LNET_LOCK_EX); > + > + lnet_push_update_to_peers(1); > + > + mutex_unlock(&the_lnet.ln_api_mutex); > > return 0; > } > @@ -171,7 +211,6 @@ lnet_init_locks(void) > init_waitqueue_head(&the_lnet.ln_eq_waitq); > init_waitqueue_head(&the_lnet.ln_rc_waitq); > mutex_init(&the_lnet.ln_lnd_mutex); > - mutex_init(&the_lnet.ln_api_mutex); > } > > static int > @@ -654,6 +693,10 @@ lnet_prepare(lnet_pid_t requested_pid) > INIT_LIST_HEAD(&the_lnet.ln_routers); > INIT_LIST_HEAD(&the_lnet.ln_drop_rules); > INIT_LIST_HEAD(&the_lnet.ln_delay_rules); > + INIT_LIST_HEAD(&the_lnet.ln_dc_request); > + INIT_LIST_HEAD(&the_lnet.ln_dc_working); > + INIT_LIST_HEAD(&the_lnet.ln_dc_expired); > + init_waitqueue_head(&the_lnet.ln_dc_waitq); > > rc = lnet_create_remote_nets_table(); > if (rc) > @@ -998,7 +1041,8 @@ lnet_ping_target_create(int nnis) > pbuf->pb_info.pi_nnis = nnis; > pbuf->pb_info.pi_pid = the_lnet.ln_pid; > pbuf->pb_info.pi_magic = LNET_PROTO_PING_MAGIC; > - pbuf->pb_info.pi_features = LNET_PING_FEAT_NI_STATUS; > + pbuf->pb_info.pi_features = > + LNET_PING_FEAT_NI_STATUS | LNET_PING_FEAT_MULTI_RAIL; > > return pbuf; > } > @@ -1231,6 +1275,8 @@ lnet_ping_target_update(struct lnet_ping_buffer *pbuf, > > if (!the_lnet.ln_routing) > pbuf->pb_info.pi_features |= LNET_PING_FEAT_RTE_DISABLED; > + if (!lnet_peer_discovery_disabled) > + pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY; > > /* Ensure only known feature bits have been set. 
*/ > LASSERT(pbuf->pb_info.pi_features & LNET_PING_FEAT_BITS); > @@ -1252,6 +1298,8 @@ lnet_ping_target_update(struct lnet_ping_buffer *pbuf, > lnet_ping_md_unlink(old_pbuf, &old_ping_md); > lnet_ping_buffer_decref(old_pbuf); > } > + > + lnet_push_update_to_peers(0); > } > > static void > @@ -1353,6 +1401,7 @@ static void lnet_push_target_event_handler(struct lnet_event *ev) > if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) > lnet_swap_pinginfo(pbuf); > > + lnet_peer_push_event(ev); > if (ev->unlinked) > lnet_ping_buffer_decref(pbuf); > } > @@ -1910,8 +1959,6 @@ int lnet_lib_init(void) > > lnet_assert_wire_constants(); > > - memset(&the_lnet, 0, sizeof(the_lnet)); > - > /* refer to global cfs_cpt_tab for now */ > the_lnet.ln_cpt_table = cfs_cpt_tab; > the_lnet.ln_cpt_number = cfs_cpt_number(cfs_cpt_tab); > diff --git a/drivers/staging/lustre/lnet/lnet/lib-move.c b/drivers/staging/lustre/lnet/lnet/lib-move.c > index 4773180cc7b3..2ff329bf91ba 100644 > --- a/drivers/staging/lustre/lnet/lnet/lib-move.c > +++ b/drivers/staging/lustre/lnet/lnet/lib-move.c > @@ -444,6 +444,8 @@ lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target, > > memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr)); > msg->msg_hdr.type = cpu_to_le32(type); > + /* dest_nid will be overwritten by lnet_select_pathway() */ > + msg->msg_hdr.dest_nid = cpu_to_le64(target.nid); > msg->msg_hdr.dest_pid = cpu_to_le32(target.pid); > /* src_nid will be set later */ > msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid); > @@ -1292,7 +1294,7 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid, > */ > peer = lpni->lpni_peer_net->lpn_peer; > if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) { > - rc = lnet_discover_peer_locked(lpni, cpt); > + rc = lnet_discover_peer_locked(lpni, cpt, false); > if (rc) { > lnet_peer_ni_decref_locked(lpni); > lnet_net_unlock(cpt); > @@ -1300,6 +1302,18 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid, > } > /* The peer may have changed. */ > peer = lpni->lpni_peer_net->lpn_peer; > + /* queue message and return */ > + msg->msg_src_nid_param = src_nid; > + msg->msg_rtr_nid_param = rtr_nid; > + msg->msg_sending = 0; > + list_add_tail(&msg->msg_list, &peer->lp_dc_pendq); > + lnet_peer_ni_decref_locked(lpni); > + lnet_net_unlock(cpt); > + > + CDEBUG(D_NET, "%s pending discovery\n", > + libcfs_nid2str(peer->lp_primary_nid)); > + > + return LNET_DC_WAIT; > } > lnet_peer_ni_decref_locked(lpni); > > @@ -1840,7 +1854,7 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid) > if (rc == LNET_CREDIT_OK) > lnet_ni_send(msg->msg_txni, msg); > > - /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */ > + /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */ > return 0; > } > > diff --git a/drivers/staging/lustre/lnet/lnet/peer.c b/drivers/staging/lustre/lnet/lnet/peer.c > index b78f99c354de..1ef4a44e752e 100644 > --- a/drivers/staging/lustre/lnet/lnet/peer.c > +++ b/drivers/staging/lustre/lnet/lnet/peer.c > @@ -38,6 +38,11 @@ > #include <linux/lnet/lib-lnet.h> > #include <uapi/linux/lnet/lnet-dlc.h> > > +/* Value indicating that recovery needs to re-check a peer immediately. 
*/ > +#define LNET_REDISCOVER_PEER (1) > + > +static int lnet_peer_queue_for_discovery(struct lnet_peer *lp); > + > static void > lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni) > { > @@ -202,6 +207,7 @@ lnet_peer_alloc(lnet_nid_t nid) > INIT_LIST_HEAD(&lp->lp_peer_list); > INIT_LIST_HEAD(&lp->lp_peer_nets); > INIT_LIST_HEAD(&lp->lp_dc_list); > + INIT_LIST_HEAD(&lp->lp_dc_pendq); > init_waitqueue_head(&lp->lp_dc_waitq); > spin_lock_init(&lp->lp_lock); > lp->lp_primary_nid = nid; > @@ -220,6 +226,10 @@ lnet_destroy_peer_locked(struct lnet_peer *lp) > LASSERT(atomic_read(&lp->lp_refcount) == 0); > LASSERT(list_empty(&lp->lp_peer_nets)); > LASSERT(list_empty(&lp->lp_peer_list)); > + LASSERT(list_empty(&lp->lp_dc_list)); > + > + if (lp->lp_data) > + lnet_ping_buffer_decref(lp->lp_data); > > kfree(lp); > } > @@ -260,10 +270,19 @@ lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni) > /* > * If there are no more peer nets, make the peer unfindable > * via the peer_tables. > + * > + * Otherwise, if the peer is DISCOVERED, tell discovery to > + * take another look at it. This is a no-op if discovery for > + * this peer did the detaching. > */ > if (list_empty(&lp->lp_peer_nets)) { > list_del_init(&lp->lp_peer_list); > ptable->pt_peers--; > + } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) { > + /* Discovery isn't running, nothing to do here. */ > + } else if (lp->lp_state & LNET_PEER_DISCOVERED) { > + lnet_peer_queue_for_discovery(lp); > + wake_up(&the_lnet.ln_dc_waitq); > } > CDEBUG(D_NET, "peer %s NID %s\n", > libcfs_nid2str(lp->lp_primary_nid), > @@ -599,6 +618,25 @@ lnet_find_peer_ni_locked(lnet_nid_t nid) > return lpni; > } > > +struct lnet_peer * > +lnet_find_peer(lnet_nid_t nid) > +{ > + struct lnet_peer_ni *lpni; > + struct lnet_peer *lp = NULL; > + int cpt; > + > + cpt = lnet_net_lock_current(); > + lpni = lnet_find_peer_ni_locked(nid); > + if (lpni) { > + lp = lpni->lpni_peer_net->lpn_peer; > + lnet_peer_addref_locked(lp); > + lnet_peer_ni_decref_locked(lpni); > + } > + lnet_net_unlock(cpt); > + > + return lp; > +} > + > struct lnet_peer_ni * > lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn, > struct lnet_peer **lp) > @@ -696,6 +734,37 @@ lnet_get_next_peer_ni_locked(struct lnet_peer *peer, > return lpni; > } > > +/* > + * Start pushes to peers that need to be updated for a configuration > + * change on this node. > + */ > +void > +lnet_push_update_to_peers(int force) > +{ > + struct lnet_peer_table *ptable; > + struct lnet_peer *lp; > + int lncpt; > + int cpt; > + > + lnet_net_lock(LNET_LOCK_EX); > + lncpt = cfs_percpt_number(the_lnet.ln_peer_tables); > + for (cpt = 0; cpt < lncpt; cpt++) { > + ptable = the_lnet.ln_peer_tables[cpt]; > + list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) { > + if (force) { > + spin_lock(&lp->lp_lock); > + if (lp->lp_state & LNET_PEER_MULTI_RAIL) > + lp->lp_state |= LNET_PEER_FORCE_PUSH; > + spin_unlock(&lp->lp_lock); > + } > + if (lnet_peer_needs_push(lp)) > + lnet_peer_queue_for_discovery(lp); > + } > + } > + lnet_net_unlock(LNET_LOCK_EX); > + wake_up(&the_lnet.ln_dc_waitq); > +} > + > /* > * Test whether a ni is a preferred ni for this peer_ni, e.g, whether > * this is a preferred point-to-point path. 
Call with lnet_net_lock in > @@ -941,6 +1010,7 @@ lnet_peer_primary_nid_locked(lnet_nid_t nid) > lnet_nid_t > LNetPrimaryNID(lnet_nid_t nid) > { > + struct lnet_peer *lp; > struct lnet_peer_ni *lpni; > lnet_nid_t primary_nid = nid; > int rc = 0; > @@ -952,7 +1022,15 @@ LNetPrimaryNID(lnet_nid_t nid) > rc = PTR_ERR(lpni); > goto out_unlock; > } > - primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid; > + lp = lpni->lpni_peer_net->lpn_peer; > + while (!lnet_peer_is_uptodate(lp)) { > + rc = lnet_discover_peer_locked(lpni, cpt, true); > + if (rc) > + goto out_decref; > + lp = lpni->lpni_peer_net->lpn_peer; > + } > + primary_nid = lp->lp_primary_nid; > +out_decref: > lnet_peer_ni_decref_locked(lpni); > out_unlock: > lnet_net_unlock(cpt); > @@ -1229,6 +1307,30 @@ lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned int flags) > return rc; > } > > +/* > + * Update the primary NID of a peer, if possible. > + * > + * Call with the lnet_api_mutex held. > + */ > +static int > +lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, > + unsigned int flags) > +{ > + lnet_nid_t old = lp->lp_primary_nid; > + int rc = 0; > + > + if (lp->lp_primary_nid == nid) > + goto out; > + rc = lnet_peer_add_nid(lp, nid, flags); > + if (rc) > + goto out; > + lp->lp_primary_nid = nid; > +out: > + CDEBUG(D_NET, "peer %s NID %s: %d\n", > + libcfs_nid2str(old), libcfs_nid2str(nid), rc); > + return rc; > +} > + > /* > * lpni creation initiated due to traffic either sending or receiving. > */ > @@ -1548,11 +1650,15 @@ lnet_peer_is_uptodate(struct lnet_peer *lp) > LNET_PEER_FORCE_PING | > LNET_PEER_FORCE_PUSH)) { > rc = false; > + } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) { > + rc = true; > } else if (lp->lp_state & LNET_PEER_REDISCOVER) { > if (lnet_peer_discovery_disabled) > rc = true; > else > rc = false; > + } else if (lnet_peer_needs_push(lp)) { > + rc = false; > } else if (lp->lp_state & LNET_PEER_DISCOVERED) { > if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) > rc = true; > @@ -1588,6 +1694,9 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp) > rc = -EALREADY; > } > > + CDEBUG(D_NET, "Queue peer %s: %d\n", > + libcfs_nid2str(lp->lp_primary_nid), rc); > + > return rc; > } > > @@ -1597,9 +1706,252 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp) > */ > static void lnet_peer_discovery_complete(struct lnet_peer *lp) > { > + struct lnet_msg *msg = NULL; > + int rc = 0; > + struct list_head pending_msgs; > + > + INIT_LIST_HEAD(&pending_msgs); > + > + CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + > list_del_init(&lp->lp_dc_list); > + list_splice_init(&lp->lp_dc_pendq, &pending_msgs); > wake_up_all(&lp->lp_dc_waitq); > + > + lnet_net_unlock(LNET_LOCK_EX); > + > + /* iterate through all pending messages and send them again */ > + list_for_each_entry(msg, &pending_msgs, msg_list) { > + if (lp->lp_dc_error) { > + lnet_finalize(msg, lp->lp_dc_error); > + continue; > + } > + > + CDEBUG(D_NET, "sending pending message %s to target %s\n", > + lnet_msgtyp2str(msg->msg_type), > + libcfs_id2str(msg->msg_target)); > + rc = lnet_send(msg->msg_src_nid_param, msg, > + msg->msg_rtr_nid_param); > + if (rc < 0) { > + CNETERR("Error sending %s to %s: %d\n", > + lnet_msgtyp2str(msg->msg_type), > + libcfs_id2str(msg->msg_target), rc); > + lnet_finalize(msg, rc); > + } > + } > + lnet_net_lock(LNET_LOCK_EX); > + lnet_peer_decref_locked(lp); > +} > + > +/* > + * Handle inbound push. 
> + * Like any event handler, called with lnet_res_lock/CPT held. > + */ > +void lnet_peer_push_event(struct lnet_event *ev) > +{ > + struct lnet_ping_buffer *pbuf = ev->md.user_ptr; > + struct lnet_peer *lp; > + > + /* lnet_find_peer() adds a refcount */ > + lp = lnet_find_peer(ev->source.nid); > + if (!lp) { > + CERROR("Push Put from unknown %s (source %s)\n", > + libcfs_nid2str(ev->initiator.nid), > + libcfs_nid2str(ev->source.nid)); > + return; > + } > + > + /* Ensure peer state remains consistent while we modify it. */ > + spin_lock(&lp->lp_lock); > + > + /* > + * If some kind of error happened the contents of the message > + * cannot be used. Clear the NIDS_UPTODATE and set the > + * FORCE_PING flag to trigger a ping. > + */ > + if (ev->status) { > + lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE; > + lp->lp_state |= LNET_PEER_FORCE_PING; > + CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n", > + ev->status, > + libcfs_nid2str(lp->lp_primary_nid), > + libcfs_nid2str(ev->source.nid)); > + goto out; > + } > + > + /* > + * A push with invalid or corrupted info. Clear the UPTODATE > + * flag to trigger a ping. > + */ > + if (lnet_ping_info_validate(&pbuf->pb_info)) { > + lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE; > + lp->lp_state |= LNET_PEER_FORCE_PING; > + CDEBUG(D_NET, "Corrupted Push from %s\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + goto out; > + } > + > + /* > + * Make sure we'll allocate the correct size ping buffer when > + * pinging the peer. > + */ > + if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis) > + lp->lp_data_nnis = pbuf->pb_info.pi_nnis; > + > + /* > + * A non-Multi-Rail peer is not supposed to be capable of > + * sending a push. > + */ > + if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) { > + CERROR("Push from non-Multi-Rail peer %s dropped\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + goto out; > + } > + > + /* > + * Check the MULTIRAIL flag. Complain if the peer was DLC > + * configured without it. > + */ > + if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) { > + if (lp->lp_state & LNET_PEER_CONFIGURED) { > + CERROR("Push says %s is Multi-Rail, DLC says not\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + } else { > + lp->lp_state |= LNET_PEER_MULTI_RAIL; > + lnet_peer_clr_non_mr_pref_nids(lp); > + } > + } > + > + /* > + * The peer may have discovery disabled at its end. Set > + * NO_DISCOVERY as appropriate. > + */ > + if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) { > + CDEBUG(D_NET, "Peer %s has discovery disabled\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + lp->lp_state |= LNET_PEER_NO_DISCOVERY; > + } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) { > + CDEBUG(D_NET, "Peer %s has discovery enabled\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + lp->lp_state &= ~LNET_PEER_NO_DISCOVERY; > + } > + > + /* > + * Check for truncation of the Put message. Clear the > + * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping, > + * and tell discovery to allocate a bigger buffer. > + */ > + if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) { > + if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis) > + the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis; > + lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE; > + lp->lp_state |= LNET_PEER_FORCE_PING; > + CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n", > + libcfs_nid2str(lp->lp_primary_nid), > + pbuf->pb_info.pi_nnis); > + goto out; > + } > + > + /* > + * Check whether the Put data is stale. Stale data can just be > + * dropped. 
> +	 */
> +	if (pbuf->pb_info.pi_nnis > 1 &&
> +	    lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
> +	    LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
> +		CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       LNET_PING_BUFFER_SEQNO(pbuf),
> +		       lp->lp_peer_seqno);
> +		goto out;
> +	}
> +
> +	/*
> +	 * Check whether the Put data is new, in which case we clear
> +	 * the UPTODATE flag and prepare to process it.
> +	 *
> +	 * If the Put data is current, and the peer is UPTODATE then
> +	 * we assume everything is all right and drop the data as
> +	 * stale.
> +	 */
> +	if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
> +		lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
> +		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
> +	} else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
> +		CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       LNET_PING_BUFFER_SEQNO(pbuf),
> +		       lp->lp_peer_seqno);
> +		goto out;
> +	}
> +
> +	/*
> +	 * If there is data present that hasn't been processed yet,
> +	 * we'll replace it if the Put contained newer data and it
> +	 * fits. We're racing with a Ping or earlier Push in this
> +	 * case.
> +	 */
> +	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
> +		if (LNET_PING_BUFFER_SEQNO(pbuf) >
> +		    LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
> +		    pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
> +			memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
> +			       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
> +			CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
> +			       libcfs_nid2str(lp->lp_primary_nid),
> +			       LNET_PING_BUFFER_SEQNO(pbuf),
> +			       LNET_PING_BUFFER_SEQNO(lp->lp_data));
> +		}
> +		goto out;
> +	}
> +
> +	/*
> +	 * Allocate a buffer to copy the data. On a failure we drop
> +	 * the Push and set FORCE_PING to force the discovery
> +	 * thread to fix the problem by pinging the peer.
> +	 */
> +	lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
> +	if (!lp->lp_data) {
> +		lp->lp_state |= LNET_PEER_FORCE_PING;
> +		CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
> +		       libcfs_nid2str(lp->lp_primary_nid),
> +		       LNET_PING_BUFFER_SEQNO(pbuf));
> +		goto out;
> +	}
> +
> +	/* Success */
> +	memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
> +	       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
> +	lp->lp_state |= LNET_PEER_DATA_PRESENT;
> +	CDEBUG(D_NET, "Received Push %s %u\n",
> +	       libcfs_nid2str(lp->lp_primary_nid),
> +	       LNET_PING_BUFFER_SEQNO(pbuf));
> +
> +out:
> +	/*
> +	 * Queue the peer for discovery, and wake the discovery thread
> +	 * if the peer was already queued, because its status changed.
> +	 */
> +	spin_unlock(&lp->lp_lock);
> +	lnet_net_lock(LNET_LOCK_EX);
> +	if (lnet_peer_queue_for_discovery(lp))
> +		wake_up(&the_lnet.ln_dc_waitq);
> +	/* Drop refcount from lookup */
> 	lnet_peer_decref_locked(lp);
> +	lnet_net_unlock(LNET_LOCK_EX);
> +}
> +
> +/*
> + * Clear the discovery error state, unless we're already discovering
> + * this peer, in which case the error is current.
> + */
> +static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
> +{
> +	spin_lock(&lp->lp_lock);
> +	if (!(lp->lp_state & LNET_PEER_DISCOVERING))
> +		lp->lp_dc_error = 0;
> +	spin_unlock(&lp->lp_lock);
> }
>
> /*
> @@ -1608,7 +1960,7 @@ static void lnet_peer_discovery_complete(struct lnet_peer *lp)
>  * because discovery could tear down an lnet_peer.
> */ > int > -lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt) > +lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block) > { > DEFINE_WAIT(wait); > struct lnet_peer *lp; > @@ -1617,25 +1969,40 @@ lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt) > again: > lnet_net_unlock(cpt); > lnet_net_lock(LNET_LOCK_EX); > + lp = lpni->lpni_peer_net->lpn_peer; > + lnet_peer_clear_discovery_error(lp); > > - /* We're willing to be interrupted. */ > + /* > + * We're willing to be interrupted. The lpni can become a > + * zombie if we race with DLC, so we must check for that. > + */ > for (;;) { > - lp = lpni->lpni_peer_net->lpn_peer; > prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE); > if (signal_pending(current)) > break; > if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) > break; > + if (lp->lp_dc_error) > + break; > if (lnet_peer_is_uptodate(lp)) > break; > lnet_peer_queue_for_discovery(lp); > lnet_peer_addref_locked(lp); > + /* > + * if caller requested a non-blocking operation then > + * return immediately. Once discovery is complete then the > + * peer ref will be decremented and any pending messages > + * that were stopped due to discovery will be transmitted. > + */ > + if (!block) > + break; > lnet_net_unlock(LNET_LOCK_EX); > schedule(); > finish_wait(&lp->lp_dc_waitq, &wait); > lnet_net_lock(LNET_LOCK_EX); > lnet_peer_decref_locked(lp); > - /* Do not use lp beyond this point. */ > + /* Peer may have changed */ > + lp = lpni->lpni_peer_net->lpn_peer; > } > finish_wait(&lp->lp_dc_waitq, &wait); > > @@ -1646,71 +2013,969 @@ lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt) > rc = -EINTR; > else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) > rc = -ESHUTDOWN; > + else if (lp->lp_dc_error) > + rc = lp->lp_dc_error; > + else if (!block) > + CDEBUG(D_NET, "non-blocking discovery\n"); > else if (!lnet_peer_is_uptodate(lp)) > goto again; > > + CDEBUG(D_NET, "peer %s NID %s: %d. %s\n", > + (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"), > + libcfs_nid2str(lpni->lpni_nid), rc, > + (!block) ? "pending discovery" : "discovery complete"); > + > return rc; > } > > -/* > - * Event handler for the discovery EQ. > - * > - * Called with lnet_res_lock(cpt) held. The cpt is the > - * lnet_cpt_of_cookie() of the md handle cookie. > - */ > -static void lnet_discovery_event_handler(struct lnet_event *event) > +/* Handle an incoming ack for a push. */ > +static void > +lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev) > { > - wake_up(&the_lnet.ln_dc_waitq); > + struct lnet_ping_buffer *pbuf; > + > + pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start); > + spin_lock(&lp->lp_lock); > + lp->lp_state &= ~LNET_PEER_PUSH_SENT; > + lp->lp_push_error = ev->status; > + if (ev->status) > + lp->lp_state |= LNET_PEER_PUSH_FAILED; > + else > + lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf); > + spin_unlock(&lp->lp_lock); > + > + CDEBUG(D_NET, "peer %s ev->status %d\n", > + libcfs_nid2str(lp->lp_primary_nid), ev->status); > } > > -/* > - * Wait for work to be queued or some other change that must be > - * attended to. Returns non-zero if the discovery thread should shut > - * down. > - */ > -static int lnet_peer_discovery_wait_for_work(void) > +/* Handle a Reply message. This is the reply to a Ping message. 
*/ > +static void > +lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev) > { > - int cpt; > - int rc = 0; > + struct lnet_ping_buffer *pbuf; > + int rc; > > - DEFINE_WAIT(wait); > + spin_lock(&lp->lp_lock); > > - cpt = lnet_net_lock_current(); > - for (;;) { > - prepare_to_wait(&the_lnet.ln_dc_waitq, &wait, > - TASK_IDLE); > - if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) > - break; > - if (lnet_push_target_resize_needed()) > - break; > - if (!list_empty(&the_lnet.ln_dc_request)) > - break; > - lnet_net_unlock(cpt); > - schedule(); > - finish_wait(&the_lnet.ln_dc_waitq, &wait); > - cpt = lnet_net_lock_current(); > + /* > + * If some kind of error happened the contents of message > + * cannot be used. Set PING_FAILED to trigger a retry. > + */ > + if (ev->status) { > + lp->lp_state |= LNET_PEER_PING_FAILED; > + lp->lp_ping_error = ev->status; > + CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n", > + ev->status, > + libcfs_nid2str(lp->lp_primary_nid), > + libcfs_nid2str(ev->source.nid)); > + goto out; > } > - finish_wait(&the_lnet.ln_dc_waitq, &wait); > - > - if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) > - rc = -ESHUTDOWN; > > - lnet_net_unlock(cpt); > + pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start); > + if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) > + lnet_swap_pinginfo(pbuf); > > - CDEBUG(D_NET, "woken: %d\n", rc); > + /* > + * A reply with invalid or corrupted info. Set PING_FAILED to > + * trigger a retry. > + */ > + rc = lnet_ping_info_validate(&pbuf->pb_info); > + if (rc) { > + lp->lp_state |= LNET_PEER_PING_FAILED; > + lp->lp_ping_error = 0; > + CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n", > + libcfs_nid2str(lp->lp_primary_nid), rc); > + goto out; > + } > > - return rc; > -} > + /* > + * Update the MULTI_RAIL flag based on the reply. If the peer > + * was configured with DLC then the setting should match what > + * DLC put in. > + */ > + if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) { > + if (lp->lp_state & LNET_PEER_MULTI_RAIL) { > + /* Everything's fine */ > + } else if (lp->lp_state & LNET_PEER_CONFIGURED) { > + CWARN("Reply says %s is Multi-Rail, DLC says not\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + } else { > + lp->lp_state |= LNET_PEER_MULTI_RAIL; > + lnet_peer_clr_non_mr_pref_nids(lp); > + } > + } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) { > + if (lp->lp_state & LNET_PEER_CONFIGURED) { > + CWARN("DLC says %s is Multi-Rail, Reply says not\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + } else { > + CERROR("Multi-Rail state vanished from %s\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + lp->lp_state &= ~LNET_PEER_MULTI_RAIL; > + } > + } > > -/* The discovery thread. */ > -static int lnet_peer_discovery(void *arg) > -{ > - struct lnet_peer *lp; > + /* > + * Make sure we'll allocate the correct size ping buffer when > + * pinging the peer. > + */ > + if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis) > + lp->lp_data_nnis = pbuf->pb_info.pi_nnis; > > - CDEBUG(D_NET, "started\n"); > + /* > + * The peer may have discovery disabled at its end. Set > + * NO_DISCOVERY as appropriate. 
> + */ > + if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) { > + CDEBUG(D_NET, "Peer %s has discovery disabled\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + lp->lp_state |= LNET_PEER_NO_DISCOVERY; > + } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) { > + CDEBUG(D_NET, "Peer %s has discovery enabled\n", > + libcfs_nid2str(lp->lp_primary_nid)); > + lp->lp_state &= ~LNET_PEER_NO_DISCOVERY; > + } > > - for (;;) { > - if (lnet_peer_discovery_wait_for_work()) > + /* > + * Check for truncation of the Reply. Clear PING_SENT and set > + * PING_FAILED to trigger a retry. > + */ > + if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) { > + if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis) > + the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis; > + lp->lp_state |= LNET_PEER_PING_FAILED; > + lp->lp_ping_error = 0; > + CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n", > + libcfs_nid2str(lp->lp_primary_nid), > + pbuf->pb_info.pi_nnis); > + goto out; > + } > + > + /* > + * Check the sequence numbers in the reply. These are only > + * available if the reply came from a Multi-Rail peer. > + */ > + if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL && > + pbuf->pb_info.pi_nnis > 1 && > + lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) { > + if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) { > + CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n", > + libcfs_nid2str(lp->lp_primary_nid), > + LNET_PING_BUFFER_SEQNO(pbuf), > + lp->lp_peer_seqno); > + goto out; > + } > + > + if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) > + lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf); > + } > + > + /* We're happy with the state of the data in the buffer. */ > + CDEBUG(D_NET, "peer %s data present %u\n", > + libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno); > + if (lp->lp_state & LNET_PEER_DATA_PRESENT) > + lnet_ping_buffer_decref(lp->lp_data); > + else > + lp->lp_state |= LNET_PEER_DATA_PRESENT; > + lnet_ping_buffer_addref(pbuf); > + lp->lp_data = pbuf; > +out: > + lp->lp_state &= ~LNET_PEER_PING_SENT; > + spin_unlock(&lp->lp_lock); > +} > + > +/* > + * Send event handling. Only matters for error cases, where we clean > + * up state on the peer and peer_ni that would otherwise be updated in > + * the REPLY event handler for a successful Ping, and the ACK event > + * handler for a successful Push. > + */ > +static int > +lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev) > +{ > + int rc = 0; > + > + if (!ev->status) > + goto out; > + > + spin_lock(&lp->lp_lock); > + if (ev->msg_type == LNET_MSG_GET) { > + lp->lp_state &= ~LNET_PEER_PING_SENT; > + lp->lp_state |= LNET_PEER_PING_FAILED; > + lp->lp_ping_error = ev->status; > + } else { /* ev->msg_type == LNET_MSG_PUT */ > + lp->lp_state &= ~LNET_PEER_PUSH_SENT; > + lp->lp_state |= LNET_PEER_PUSH_FAILED; > + lp->lp_push_error = ev->status; > + } > + spin_unlock(&lp->lp_lock); > + rc = LNET_REDISCOVER_PEER; > +out: > + CDEBUG(D_NET, "%s Send to %s: %d\n", > + (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"), > + libcfs_nid2str(ev->target.nid), rc); > + return rc; > +} > + > +/* > + * Unlink event handling. This event is only seen if a call to > + * LNetMDUnlink() caused the event to be unlinked. If this call was > + * made after the event was set up in LNetGet() or LNetPut() then we > + * assume the Ping or Push timed out. 
> + */
> +static void
> +lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev)
> +{
> +	spin_lock(&lp->lp_lock);
> +	/* We've passed through LNetGet() */
> +	if (lp->lp_state & LNET_PEER_PING_SENT) {
> +		lp->lp_state &= ~LNET_PEER_PING_SENT;
> +		lp->lp_state |= LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = -ETIMEDOUT;
> +		CDEBUG(D_NET, "Ping Unlink for message to peer %s\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +	}
> +	/* We've passed through LNetPut() */
> +	if (lp->lp_state & LNET_PEER_PUSH_SENT) {
> +		lp->lp_state &= ~LNET_PEER_PUSH_SENT;
> +		lp->lp_state |= LNET_PEER_PUSH_FAILED;
> +		lp->lp_push_error = -ETIMEDOUT;
> +		CDEBUG(D_NET, "Push Unlink for message to peer %s\n",
> +		       libcfs_nid2str(lp->lp_primary_nid));
> +	}
> +	spin_unlock(&lp->lp_lock);
> +}
> +
> +/*
> + * Event handler for the discovery EQ.
> + *
> + * Called with lnet_res_lock(cpt) held. The cpt is the
> + * lnet_cpt_of_cookie() of the md handle cookie.
> + */
> +static void lnet_discovery_event_handler(struct lnet_event *event)
> +{
> +	struct lnet_peer *lp = event->md.user_ptr;
> +	struct lnet_ping_buffer *pbuf;
> +	int rc;
> +
> +	/* discovery needs to take another look */
> +	rc = LNET_REDISCOVER_PEER;
> +
> +	CDEBUG(D_NET, "Received event: %d\n", event->type);
> +
> +	switch (event->type) {
> +	case LNET_EVENT_ACK:
> +		lnet_discovery_event_ack(lp, event);
> +		break;
> +	case LNET_EVENT_REPLY:
> +		lnet_discovery_event_reply(lp, event);
> +		break;
> +	case LNET_EVENT_SEND:
> +		/* Only send failure triggers a retry. */
> +		rc = lnet_discovery_event_send(lp, event);
> +		break;
> +	case LNET_EVENT_UNLINK:
> +		/* LNetMDUnlink() was called */
> +		lnet_discovery_event_unlink(lp, event);
> +		break;
> +	default:
> +		/* Invalid events. */
> +		LBUG();
> +	}
> +	lnet_net_lock(LNET_LOCK_EX);
> +	if (event->unlinked) {
> +		pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
> +		lnet_ping_buffer_decref(pbuf);
> +		lnet_peer_decref_locked(lp);
> +	}
> +	if (rc == LNET_REDISCOVER_PEER) {
> +		list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
> +		wake_up(&the_lnet.ln_dc_waitq);
> +	}
> +	lnet_net_unlock(LNET_LOCK_EX);
> +}
> +
> +/*
> + * Build a peer from incoming data.
> + *
> + * The NIDs in the incoming data are supposed to be structured as follows:
> + * - loopback
> + * - primary NID
> + * - other NIDs in same net
> + * - NIDs in second net
> + * - NIDs in third net
> + * - ...
> + * This is due to the way the list of NIDs in the data is created.
> + *
> + * Note that this function will mark the peer uptodate unless an
> + * ENOMEM is encountered. All other errors are due to a conflict
> + * between the DLC configuration and what discovery sees. We treat DLC
> + * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
> + * peer from becoming stuck in discovery.
> + */ > +static int lnet_peer_merge_data(struct lnet_peer *lp, > + struct lnet_ping_buffer *pbuf) > +{ > + struct lnet_peer_ni *lpni; > + lnet_nid_t *curnis = NULL; > + lnet_nid_t *addnis = NULL; > + lnet_nid_t *delnis = NULL; > + unsigned int flags; > + int ncurnis; > + int naddnis; > + int ndelnis; > + int nnis = 0; > + int i; > + int j; > + int rc; > + > + flags = LNET_PEER_DISCOVERED; > + if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) > + flags |= LNET_PEER_MULTI_RAIL; > + > + nnis = max_t(int, lp->lp_nnis, pbuf->pb_info.pi_nnis); > + curnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS); > + addnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS); > + delnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS); > + if (!curnis || !addnis || !delnis) { > + rc = -ENOMEM; > + goto out; > + } > + ncurnis = 0; > + naddnis = 0; > + ndelnis = 0; > + > + /* Construct the list of NIDs present in peer. */ > + lpni = NULL; > + while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) > + curnis[ncurnis++] = lpni->lpni_nid; > + > + /* > + * Check for NIDs in pbuf not present in curnis[]. > + * The loop starts at 1 to skip the loopback NID. > + */ > + for (i = 1; i < pbuf->pb_info.pi_nnis; i++) { > + for (j = 0; j < ncurnis; j++) > + if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j]) > + break; > + if (j == ncurnis) > + addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid; > + } > + /* > + * Check for NIDs in curnis[] not present in pbuf. > + * The nested loop starts at 1 to skip the loopback NID. > + * > + * But never add the loopback NID to delnis[]: if it is > + * present in curnis[] then this peer is for this node. > + */ > + for (i = 0; i < ncurnis; i++) { > + if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND) > + continue; > + for (j = 1; j < pbuf->pb_info.pi_nnis; j++) > + if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid) > + break; > + if (j == pbuf->pb_info.pi_nnis) > + delnis[ndelnis++] = curnis[i]; > + } > + > + for (i = 0; i < naddnis; i++) { > + rc = lnet_peer_add_nid(lp, addnis[i], flags); > + if (rc) { > + CERROR("Error adding NID %s to peer %s: %d\n", > + libcfs_nid2str(addnis[i]), > + libcfs_nid2str(lp->lp_primary_nid), rc); > + if (rc == -ENOMEM) > + goto out; > + } > + } > + for (i = 0; i < ndelnis; i++) { > + rc = lnet_peer_del_nid(lp, delnis[i], flags); > + if (rc) { > + CERROR("Error deleting NID %s from peer %s: %d\n", > + libcfs_nid2str(delnis[i]), > + libcfs_nid2str(lp->lp_primary_nid), rc); > + if (rc == -ENOMEM) > + goto out; > + } > + } > + /* > + * Errors other than -ENOMEM are due to peers having been > + * configured with DLC. Ignore these because DLC overrides > + * Discovery. > + */ > + rc = 0; > +out: > + kfree(curnis); > + kfree(addnis); > + kfree(delnis); > + lnet_ping_buffer_decref(pbuf); > + CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc); > + > + if (rc) { > + spin_lock(&lp->lp_lock); > + lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE; > + lp->lp_state |= LNET_PEER_FORCE_PING; > + spin_unlock(&lp->lp_lock); > + } > + return rc; > +} > + > +/* > + * The data in pbuf says lp is its primary peer, but the data was > + * received by a different peer. Try to update lp with the data. > + */ > +static int > +lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf) > +{ > + struct lnet_handle_md mdh; > + > + /* Queue lp for discovery, and force it on the request queue. 
> +	 */
> +	lnet_net_lock(LNET_LOCK_EX);
> +	if (lnet_peer_queue_for_discovery(lp))
> +		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
> +	lnet_net_unlock(LNET_LOCK_EX);
> +
> +	LNetInvalidateMDHandle(&mdh);
> +
> +	/*
> +	 * Decide whether we can move the peer to the DATA_PRESENT state.
> +	 *
> +	 * We replace stale data for a multi-rail peer, repair PING_FAILED
> +	 * status, and preempt FORCE_PING.
> +	 *
> +	 * If after that we have DATA_PRESENT, we merge it into this peer.
> +	 */
> +	spin_lock(&lp->lp_lock);
> +	if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
> +		if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
> +			lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
> +		} else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
> +			lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
> +			lnet_ping_buffer_decref(pbuf);
> +			pbuf = lp->lp_data;
> +			lp->lp_data = NULL;
> +		}
> +	}
> +	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
> +		lnet_ping_buffer_decref(lp->lp_data);
> +		lp->lp_data = NULL;
> +		lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
> +	}
> +	if (lp->lp_state & LNET_PEER_PING_FAILED) {
> +		mdh = lp->lp_ping_mdh;
> +		LNetInvalidateMDHandle(&lp->lp_ping_mdh);
> +		lp->lp_state &= ~LNET_PEER_PING_FAILED;
> +		lp->lp_ping_error = 0;
> +	}
> +	if (lp->lp_state & LNET_PEER_FORCE_PING)
> +		lp->lp_state &= ~LNET_PEER_FORCE_PING;
> +	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
> +	spin_unlock(&lp->lp_lock);
> +
> +	if (!LNetMDHandleIsInvalid(mdh))
> +		LNetMDUnlink(mdh);
> +
> +	if (pbuf)
> +		return lnet_peer_merge_data(lp, pbuf);
> +
> +	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +	return 0;
> +}
> +
> +/*
> + * Update a peer using the data received.
> + */
> +static int lnet_peer_data_present(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +	struct lnet_ping_buffer *pbuf;
> +	struct lnet_peer_ni *lpni;
> +	lnet_nid_t nid = LNET_NID_ANY;
> +	unsigned int flags;
> +	int rc = 0;
> +
> +	pbuf = lp->lp_data;
> +	lp->lp_data = NULL;
> +	lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
> +	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
> +	spin_unlock(&lp->lp_lock);
> +
> +	/*
> +	 * Modifications of peer structures are done while holding the
> +	 * ln_api_mutex. A global lock is required because we may be
> +	 * modifying multiple peer structures, and a mutex greatly
> +	 * simplifies memory management.
> +	 *
> +	 * The actual changes to the data structures must also protect
> +	 * against concurrent lookups, for which the lnet_net_lock in
> +	 * LNET_LOCK_EX mode is used.
> +	 */
> +	mutex_lock(&the_lnet.ln_api_mutex);
> +	if (the_lnet.ln_state == LNET_STATE_SHUTDOWN) {
> +		rc = -ESHUTDOWN;
> +		goto out;
> +	}
> +
> +	/*
> +	 * If this peer is not on the peer list then it is being torn
> +	 * down, and our reference count may be all that is keeping it
> +	 * alive. Don't do any work on it.
> +	 */
> +	if (list_empty(&lp->lp_peer_list))
> +		goto out;
> +
> +	flags = LNET_PEER_DISCOVERED;
> +	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
> +		flags |= LNET_PEER_MULTI_RAIL;
> +
> +	/*
> +	 * Check whether the primary NID in the message matches the
> +	 * primary NID of the peer. If it does, update the peer, if
> +	 * it does not, check whether there is already a peer with
> +	 * that primary NID. If no such peer exists, try to update
> +	 * the primary NID of the current peer (allowed if it was
> +	 * created due to message traffic) and complete the update.
> +	 * If the peer did exist, hand off the data to it.
> + * > + * The peer for the loopback interface is a special case: this > + * is the peer for the local node, and we want to set its > + * primary NID to the correct value here. > + */ > + if (pbuf->pb_info.pi_nnis > 1) > + nid = pbuf->pb_info.pi_ni[1].ns_nid; > + if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) { > + rc = lnet_peer_set_primary_nid(lp, nid, flags); > + if (!rc) > + rc = lnet_peer_merge_data(lp, pbuf); > + } else if (lp->lp_primary_nid == nid) { > + rc = lnet_peer_merge_data(lp, pbuf); > + } else { > + lpni = lnet_find_peer_ni_locked(nid); > + if (!lpni) { > + rc = lnet_peer_set_primary_nid(lp, nid, flags); > + if (rc) { > + CERROR("Primary NID error %s versus %s: %d\n", > + libcfs_nid2str(lp->lp_primary_nid), > + libcfs_nid2str(nid), rc); > + } else { > + rc = lnet_peer_merge_data(lp, pbuf); > + } > + } else { > + rc = lnet_peer_set_primary_data( > + lpni->lpni_peer_net->lpn_peer, pbuf); > + lnet_peer_ni_decref_locked(lpni); > + } > + } > +out: > + CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc); > + mutex_unlock(&the_lnet.ln_api_mutex); > + > + spin_lock(&lp->lp_lock); > + /* Tell discovery to re-check the peer immediately. */ > + if (!rc) > + rc = LNET_REDISCOVER_PEER; > + return rc; > +} > + > +/* > + * A ping failed. Clear the PING_FAILED state and set the > + * FORCE_PING state, to ensure a retry even if discovery is > + * disabled. This avoids being left with incorrect state. > + */ > +static int lnet_peer_ping_failed(struct lnet_peer *lp) > +__must_hold(&lp->lp_lock) > +{ > + struct lnet_handle_md mdh; > + int rc; > + > + mdh = lp->lp_ping_mdh; > + LNetInvalidateMDHandle(&lp->lp_ping_mdh); > + lp->lp_state &= ~LNET_PEER_PING_FAILED; > + lp->lp_state |= LNET_PEER_FORCE_PING; > + rc = lp->lp_ping_error; > + lp->lp_ping_error = 0; > + spin_unlock(&lp->lp_lock); > + > + if (!LNetMDHandleIsInvalid(mdh)) > + LNetMDUnlink(mdh); > + > + CDEBUG(D_NET, "peer %s:%d\n", > + libcfs_nid2str(lp->lp_primary_nid), rc); > + > + spin_lock(&lp->lp_lock); > + return rc ? rc : LNET_REDISCOVER_PEER; > +} > + > +/* > + * Select NID to send a Ping or Push to. > + */ > +static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp) > +{ > + struct lnet_peer_ni *lpni; > + > + /* Look for a direct-connected NID for this peer. */ > + lpni = NULL; > + while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) { > + if (!lnet_is_peer_ni_healthy_locked(lpni)) > + continue; > + if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id)) > + continue; > + break; > + } > + if (lpni) > + return lpni->lpni_nid; > + > + /* Look for a routed-connected NID for this peer. */ > + lpni = NULL; > + while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) { > + if (!lnet_is_peer_ni_healthy_locked(lpni)) > + continue; > + if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id)) > + continue; > + break; > + } > + if (lpni) > + return lpni->lpni_nid; > + > + return LNET_NID_ANY; > +} > + > +/* Active side of ping. 
*/ > +static int lnet_peer_send_ping(struct lnet_peer *lp) > +__must_hold(&lp->lp_lock) > +{ > + struct lnet_md md = { NULL }; > + struct lnet_process_id id; > + struct lnet_ping_buffer *pbuf; > + int nnis; > + int rc; > + int cpt; > + > + lp->lp_state |= LNET_PEER_PING_SENT; > + lp->lp_state &= ~LNET_PEER_FORCE_PING; > + spin_unlock(&lp->lp_lock); > + > + nnis = max_t(int, lp->lp_data_nnis, LNET_INTERFACES_MIN); > + pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS); > + if (!pbuf) { > + rc = -ENOMEM; > + goto fail_error; > + } > + > + /* initialize md content */ > + md.start = &pbuf->pb_info; > + md.length = LNET_PING_INFO_SIZE(nnis); > + md.threshold = 2; /* GET/REPLY */ > + md.max_size = 0; > + md.options = LNET_MD_TRUNCATE; > + md.user_ptr = lp; > + md.eq_handle = the_lnet.ln_dc_eqh; > + > + rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh); > + if (rc != 0) { > + lnet_ping_buffer_decref(pbuf); > + CERROR("Can't bind MD: %d\n", rc); > + goto fail_error; > + } > + cpt = lnet_net_lock_current(); > + /* Refcount for MD. */ > + lnet_peer_addref_locked(lp); > + id.pid = LNET_PID_LUSTRE; > + id.nid = lnet_peer_select_nid(lp); > + lnet_net_unlock(cpt); > + > + if (id.nid == LNET_NID_ANY) { > + rc = -EHOSTUNREACH; > + goto fail_unlink_md; > + } > + > + rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id, > + LNET_RESERVED_PORTAL, > + LNET_PROTO_PING_MATCHBITS, 0); > + > + if (rc) > + goto fail_unlink_md; > + > + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); > + > + spin_lock(&lp->lp_lock); > + return 0; > + > +fail_unlink_md: > + LNetMDUnlink(lp->lp_ping_mdh); > + LNetInvalidateMDHandle(&lp->lp_ping_mdh); > +fail_error: > + CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc); > + /* > + * The errors that get us here are considered hard errors and > + * cause Discovery to terminate. So we clear PING_SENT, but do > + * not set either PING_FAILED or FORCE_PING. In fact we need > + * to clear PING_FAILED, because the unlink event handler will > + * have set it if we called LNetMDUnlink() above. > + */ > + spin_lock(&lp->lp_lock); > + lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED); > + return rc; > +} > + > +/* > + * This function exists because you cannot call LNetMDUnlink() from an > + * event handler. > + */ > +static int lnet_peer_push_failed(struct lnet_peer *lp) > +__must_hold(&lp->lp_lock) > +{ > + struct lnet_handle_md mdh; > + int rc; > + > + mdh = lp->lp_push_mdh; > + LNetInvalidateMDHandle(&lp->lp_push_mdh); > + lp->lp_state &= ~LNET_PEER_PUSH_FAILED; > + rc = lp->lp_push_error; > + lp->lp_push_error = 0; > + spin_unlock(&lp->lp_lock); > + > + if (!LNetMDHandleIsInvalid(mdh)) > + LNetMDUnlink(mdh); > + > + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); > + spin_lock(&lp->lp_lock); > + return rc ? rc : LNET_REDISCOVER_PEER; > +} > + > +/* Active side of push. */ > +static int lnet_peer_send_push(struct lnet_peer *lp) > +__must_hold(&lp->lp_lock) > +{ > + struct lnet_ping_buffer *pbuf; > + struct lnet_process_id id; > + struct lnet_md md; > + int cpt; > + int rc; > + > + /* Don't push to a non-multi-rail peer. 
*/ > + if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) { > + lp->lp_state &= ~LNET_PEER_FORCE_PUSH; > + return 0; > + } > + > + lp->lp_state |= LNET_PEER_PUSH_SENT; > + lp->lp_state &= ~LNET_PEER_FORCE_PUSH; > + spin_unlock(&lp->lp_lock); > + > + cpt = lnet_net_lock_current(); > + pbuf = the_lnet.ln_ping_target; > + lnet_ping_buffer_addref(pbuf); > + lnet_net_unlock(cpt); > + > + /* Push source MD */ > + md.start = &pbuf->pb_info; > + md.length = LNET_PING_INFO_SIZE(pbuf->pb_nnis); > + md.threshold = 2; /* Put/Ack */ > + md.max_size = 0; > + md.options = 0; > + md.eq_handle = the_lnet.ln_dc_eqh; > + md.user_ptr = lp; > + > + rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh); > + if (rc) { > + lnet_ping_buffer_decref(pbuf); > + CERROR("Can't bind push source MD: %d\n", rc); > + goto fail_error; > + } > + cpt = lnet_net_lock_current(); > + /* Refcount for MD. */ > + lnet_peer_addref_locked(lp); > + id.pid = LNET_PID_LUSTRE; > + id.nid = lnet_peer_select_nid(lp); > + lnet_net_unlock(cpt); > + > + if (id.nid == LNET_NID_ANY) { > + rc = -EHOSTUNREACH; > + goto fail_unlink; > + } > + > + rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh, > + LNET_ACK_REQ, id, LNET_RESERVED_PORTAL, > + LNET_PROTO_PING_MATCHBITS, 0, 0); > + > + if (rc) > + goto fail_unlink; > + > + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); > + > + spin_lock(&lp->lp_lock); > + return 0; > + > +fail_unlink: > + LNetMDUnlink(lp->lp_push_mdh); > + LNetInvalidateMDHandle(&lp->lp_push_mdh); > +fail_error: > + CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc); > + /* > + * The errors that get us here are considered hard errors and > + * cause Discovery to terminate. So we clear PUSH_SENT, but do > + * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED, > + * because the unlink event handler will have set it if we > + * called LNetMDUnlink() above. > + */ > + spin_lock(&lp->lp_lock); > + lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED); > + return rc; > +} > + > +/* > + * An unrecoverable error was encountered during discovery. > + * Set error status in peer and abort discovery. > + */ > +static void lnet_peer_discovery_error(struct lnet_peer *lp, int error) > +{ > + CDEBUG(D_NET, "Discovery error %s: %d\n", > + libcfs_nid2str(lp->lp_primary_nid), error); > + > + spin_lock(&lp->lp_lock); > + lp->lp_dc_error = error; > + lp->lp_state &= ~LNET_PEER_DISCOVERING; > + lp->lp_state |= LNET_PEER_REDISCOVER; > + spin_unlock(&lp->lp_lock); > +} > + > +/* > + * Mark the peer as discovered. > + */ > +static int lnet_peer_discovered(struct lnet_peer *lp) > +__must_hold(&lp->lp_lock) > +{ > + lp->lp_state |= LNET_PEER_DISCOVERED; > + lp->lp_state &= ~(LNET_PEER_DISCOVERING | > + LNET_PEER_REDISCOVER); > + > + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); > + > + return 0; > +} > + > +/* > + * Mark the peer as to be rediscovered. > + */ > +static int lnet_peer_rediscover(struct lnet_peer *lp) > +__must_hold(&lp->lp_lock) > +{ > + lp->lp_state |= LNET_PEER_REDISCOVER; > + lp->lp_state &= ~LNET_PEER_DISCOVERING; > + > + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); > + > + return 0; > +} > + > +/* > + * Returns the first peer on the ln_dc_working queue if its timeout > + * has expired. Takes the current time as an argument so as to not > + * obsessively re-check the clock. The oldest discovery request will > + * be at the head of the queue. 
> + */ > +static struct lnet_peer *lnet_peer_dc_timed_out(time64_t now) > +{ > + struct lnet_peer *lp; > + > + if (list_empty(&the_lnet.ln_dc_working)) > + return NULL; > + lp = list_first_entry(&the_lnet.ln_dc_working, > + struct lnet_peer, lp_dc_list); > + if (now < lp->lp_last_queued + DISCOVERY_TIMEOUT) > + return NULL; > + return lp; > +} > + > +/* > + * Discovering this peer is taking too long. Cancel any Ping or Push > + * that discovery is waiting on by unlinking the relevant MDs. The > + * lnet_discovery_event_handler() will proceed from here and complete > + * the cleanup. > + */ > +static void lnet_peer_discovery_timeout(struct lnet_peer *lp) > +{ > + struct lnet_handle_md ping_mdh; > + struct lnet_handle_md push_mdh; > + > + LNetInvalidateMDHandle(&ping_mdh); > + LNetInvalidateMDHandle(&push_mdh); > + > + spin_lock(&lp->lp_lock); > + if (lp->lp_state & LNET_PEER_PING_SENT) { > + ping_mdh = lp->lp_ping_mdh; > + LNetInvalidateMDHandle(&lp->lp_ping_mdh); > + } > + if (lp->lp_state & LNET_PEER_PUSH_SENT) { > + push_mdh = lp->lp_push_mdh; > + LNetInvalidateMDHandle(&lp->lp_push_mdh); > + } > + spin_unlock(&lp->lp_lock); > + > + if (!LNetMDHandleIsInvalid(ping_mdh)) > + LNetMDUnlink(ping_mdh); > + if (!LNetMDHandleIsInvalid(push_mdh)) > + LNetMDUnlink(push_mdh); > +} > + > +/* > + * Wait for work to be queued or some other change that must be > + * attended to. Returns non-zero if the discovery thread should shut > + * down. > + */ > +static int lnet_peer_discovery_wait_for_work(void) > +{ > + int cpt; > + int rc = 0; > + > + DEFINE_WAIT(wait); > + > + cpt = lnet_net_lock_current(); > + for (;;) { > + prepare_to_wait(&the_lnet.ln_dc_waitq, &wait, > + TASK_IDLE); > + if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) > + break; > + if (lnet_push_target_resize_needed()) > + break; > + if (!list_empty(&the_lnet.ln_dc_request)) > + break; > + if (lnet_peer_dc_timed_out(ktime_get_real_seconds())) > + break; > + lnet_net_unlock(cpt); > + > + /* > + * wakeup max every second to check if there are peers that > + * have been stuck on the working queue for greater than > + * the peer timeout. > + */ > + schedule_timeout(HZ); > + finish_wait(&the_lnet.ln_dc_waitq, &wait); > + cpt = lnet_net_lock_current(); > + } > + finish_wait(&the_lnet.ln_dc_waitq, &wait); > + > + if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) > + rc = -ESHUTDOWN; > + > + lnet_net_unlock(cpt); > + > + CDEBUG(D_NET, "woken: %d\n", rc); > + > + return rc; > +} > + > +/* The discovery thread. */ > +static int lnet_peer_discovery(void *arg) > +{ > + struct lnet_peer *lp; > + time64_t now; > + int rc; > + > + CDEBUG(D_NET, "started\n"); > + > + for (;;) { > + if (lnet_peer_discovery_wait_for_work()) > break; > > if (lnet_push_target_resize_needed()) > @@ -1719,33 +2984,97 @@ static int lnet_peer_discovery(void *arg) > lnet_net_lock(LNET_LOCK_EX); > if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) > break; > + > + /* > + * Process all incoming discovery work requests. When > + * discovery must wait on a peer to change state, it > + * is added to the tail of the ln_dc_working queue. A > + * timestamp keeps track of when the peer was added, > + * so we can time out discovery requests that take too > + * long. > + */ > while (!list_empty(&the_lnet.ln_dc_request)) { > lp = list_first_entry(&the_lnet.ln_dc_request, > struct lnet_peer, lp_dc_list); > list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working); > + /* > + * set the time the peer was put on the dc_working > + * queue. 
It shouldn't remain on the queue > + * forever, in case the GET message (for ping) > + * doesn't get a REPLY or the PUT message (for > + * push) doesn't get an ACK. > + * > + * TODO: LNet Health will deal with this scenario > + * in a generic way. > + */ > + lp->lp_last_queued = ktime_get_real_seconds(); > lnet_net_unlock(LNET_LOCK_EX); > > - /* Just tag and release for now. */ > + /* > + * Select an action depending on the state of > + * the peer and whether discovery is disabled. > + * The check whether discovery is disabled is > + * done after the code that handles processing > + * for arrived data, cleanup for failures, and > + * forcing a Ping or Push. > + */ > spin_lock(&lp->lp_lock); > - if (lnet_peer_discovery_disabled) { > - lp->lp_state |= LNET_PEER_REDISCOVER; > - lp->lp_state &= ~(LNET_PEER_DISCOVERED | > - LNET_PEER_NIDS_UPTODATE | > - LNET_PEER_DISCOVERING); > - } else { > - lp->lp_state |= (LNET_PEER_DISCOVERED | > - LNET_PEER_NIDS_UPTODATE); > - lp->lp_state &= ~(LNET_PEER_REDISCOVER | > - LNET_PEER_DISCOVERING); > - } > + CDEBUG(D_NET, "peer %s state %#x\n", > + libcfs_nid2str(lp->lp_primary_nid), > + lp->lp_state); > + if (lp->lp_state & LNET_PEER_DATA_PRESENT) > + rc = lnet_peer_data_present(lp); > + else if (lp->lp_state & LNET_PEER_PING_FAILED) > + rc = lnet_peer_ping_failed(lp); > + else if (lp->lp_state & LNET_PEER_PUSH_FAILED) > + rc = lnet_peer_push_failed(lp); > + else if (lp->lp_state & LNET_PEER_FORCE_PING) > + rc = lnet_peer_send_ping(lp); > + else if (lp->lp_state & LNET_PEER_FORCE_PUSH) > + rc = lnet_peer_send_push(lp); > + else if (lnet_peer_discovery_disabled) > + rc = lnet_peer_rediscover(lp); > + else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE)) > + rc = lnet_peer_send_ping(lp); > + else if (lnet_peer_needs_push(lp)) > + rc = lnet_peer_send_push(lp); > + else > + rc = lnet_peer_discovered(lp); > + CDEBUG(D_NET, "peer %s state %#x rc %d\n", > + libcfs_nid2str(lp->lp_primary_nid), > + lp->lp_state, rc); > spin_unlock(&lp->lp_lock); > > lnet_net_lock(LNET_LOCK_EX); > + if (rc == LNET_REDISCOVER_PEER) { > + list_move(&lp->lp_dc_list, > + &the_lnet.ln_dc_request); > + } else if (rc) { > + lnet_peer_discovery_error(lp, rc); > + } > if (!(lp->lp_state & LNET_PEER_DISCOVERING)) > lnet_peer_discovery_complete(lp); > if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) > break; > } > + > + /* > + * Now that the ln_dc_request queue has been emptied > + * check the ln_dc_working queue for peers that are > + * taking too long. Move all that are found to the > + * ln_dc_expired queue and time out any pending > + * Ping or Push. We have to drop the lnet_net_lock > + * in the loop because lnet_peer_discovery_timeout() > + * calls LNetMDUnlink(). > + */ > + now = ktime_get_real_seconds(); > + while ((lp = lnet_peer_dc_timed_out(now)) != NULL) { > + list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired); > + lnet_net_unlock(LNET_LOCK_EX); > + lnet_peer_discovery_timeout(lp); > + lnet_net_lock(LNET_LOCK_EX); > + } > + > lnet_net_unlock(LNET_LOCK_EX); > } > > @@ -1759,23 +3088,28 @@ static int lnet_peer_discovery(void *arg) > LNetEQFree(the_lnet.ln_dc_eqh); > LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh); > > + /* Queue cleanup 1: stop all pending pings and pushes. 
*/ > lnet_net_lock(LNET_LOCK_EX); > - list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) { > - spin_lock(&lp->lp_lock); > - lp->lp_state |= LNET_PEER_REDISCOVER; > - lp->lp_state &= ~(LNET_PEER_DISCOVERED | > - LNET_PEER_DISCOVERING | > - LNET_PEER_NIDS_UPTODATE); > - spin_unlock(&lp->lp_lock); > - lnet_peer_discovery_complete(lp); > + while (!list_empty(&the_lnet.ln_dc_working)) { > + lp = list_first_entry(&the_lnet.ln_dc_working, > + struct lnet_peer, lp_dc_list); > + list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired); > + lnet_net_unlock(LNET_LOCK_EX); > + lnet_peer_discovery_timeout(lp); > + lnet_net_lock(LNET_LOCK_EX); > } > - list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) { > - spin_lock(&lp->lp_lock); > - lp->lp_state |= LNET_PEER_REDISCOVER; > - lp->lp_state &= ~(LNET_PEER_DISCOVERED | > - LNET_PEER_DISCOVERING | > - LNET_PEER_NIDS_UPTODATE); > - spin_unlock(&lp->lp_lock); > + lnet_net_unlock(LNET_LOCK_EX); > + > + /* Queue cleanup 2: wait for the expired queue to clear. */ > + while (!list_empty(&the_lnet.ln_dc_expired)) > + schedule_timeout_uninterruptible(HZ); > + > + /* Queue cleanup 3: clear the request queue. */ > + lnet_net_lock(LNET_LOCK_EX); > + while (!list_empty(&the_lnet.ln_dc_request)) { > + lp = list_first_entry(&the_lnet.ln_dc_request, > + struct lnet_peer, lp_dc_list); > + lnet_peer_discovery_error(lp, -ESHUTDOWN); > lnet_peer_discovery_complete(lp); > } > lnet_net_unlock(LNET_LOCK_EX); > @@ -1797,10 +3131,6 @@ int lnet_peer_discovery_start(void) > if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN) > return -EALREADY; > > - INIT_LIST_HEAD(&the_lnet.ln_dc_request); > - INIT_LIST_HEAD(&the_lnet.ln_dc_working); > - init_waitqueue_head(&the_lnet.ln_dc_waitq); > - > rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh); > if (rc != 0) { > CERROR("Can't allocate discovery EQ: %d\n", rc); > @@ -1819,6 +3149,8 @@ int lnet_peer_discovery_start(void) > the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN; > } > > + CDEBUG(D_NET, "discovery start: %d\n", rc); > + > return rc; > } > > @@ -1837,6 +3169,9 @@ void lnet_peer_discovery_stop(void) > > LASSERT(list_empty(&the_lnet.ln_dc_request)); > LASSERT(list_empty(&the_lnet.ln_dc_working)); > + LASSERT(list_empty(&the_lnet.ln_dc_expired)); > + > + CDEBUG(D_NET, "discovery stopped\n"); > } > > /* Debugging */ > > >
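Stepping back from the individual hunks: the heart of the series is the priority order in which the discovery thread examines a peer's state bits. Event results (data present, a failed Ping or Push) and forced actions are always handled first, and the discovery-disabled setting is only consulted afterwards, so disabling discovery can never strand a peer that still has an MD in flight. Condensed into one helper for illustration (the wrapper below is a sketch, not part of the patch; the flags and helpers are the ones the patch adds):

/* Sketch: the priority order the discovery thread applies to one peer.
 * Called with lp->lp_lock held, exactly as in lnet_peer_discovery().
 */
static int lnet_peer_dispatch_one(struct lnet_peer *lp)
{
	if (lp->lp_state & LNET_PEER_DATA_PRESENT)	/* Ping/Push data arrived */
		return lnet_peer_data_present(lp);
	if (lp->lp_state & LNET_PEER_PING_FAILED)	/* unlink the MD, clean up */
		return lnet_peer_ping_failed(lp);
	if (lp->lp_state & LNET_PEER_PUSH_FAILED)	/* unlink the MD, clean up */
		return lnet_peer_push_failed(lp);
	if (lp->lp_state & LNET_PEER_FORCE_PING)
		return lnet_peer_send_ping(lp);
	if (lp->lp_state & LNET_PEER_FORCE_PUSH)
		return lnet_peer_send_push(lp);
	if (lnet_peer_discovery_disabled)	/* only consulted after cleanup */
		return lnet_peer_rediscover(lp);
	if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
		return lnet_peer_send_ping(lp);
	if (lnet_peer_needs_push(lp))
		return lnet_peer_send_push(lp);
	return lnet_peer_discovered(lp);	/* nothing left to do */
}

Each helper returns 0 to let the peer finish discovery, LNET_REDISCOVER_PEER to requeue it on ln_dc_request, or a negative errno that lnet_peer_discovery_error() records in lp_dc_error before the peer is dequeued.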
diff --git a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
index 5632e5aadf41..f82a699371f2 100644
--- a/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
+++ b/drivers/staging/lustre/include/linux/lnet/lib-lnet.h
@@ -76,6 +76,9 @@ extern struct lnet the_lnet; /* THE network */
 #define LNET_ACCEPTOR_MIN_RESERVED_PORT 512
 #define LNET_ACCEPTOR_MAX_RESERVED_PORT 1023
 
+/* Discovery timeout - same as default peer_timeout */
+#define DISCOVERY_TIMEOUT 180
+
 static inline int lnet_is_route_alive(struct lnet_route *route)
 {
 	/* gateway is down */
@@ -713,9 +716,10 @@ struct lnet_peer_ni *lnet_nid2peerni_ex(lnet_nid_t nid, int cpt);
 struct lnet_peer_ni *lnet_find_peer_ni_locked(lnet_nid_t nid);
 void lnet_peer_net_added(struct lnet_net *net);
 lnet_nid_t lnet_peer_primary_nid_locked(lnet_nid_t nid);
-int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt);
+int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block);
 int lnet_peer_discovery_start(void);
 void lnet_peer_discovery_stop(void);
+void lnet_push_update_to_peers(int force);
 void lnet_peer_tables_cleanup(struct lnet_net *net);
 void lnet_peer_uninit(void);
 int lnet_peer_tables_create(void);
@@ -805,4 +809,18 @@ lnet_peer_ni_is_primary(struct lnet_peer_ni *lpni)
 
 bool lnet_peer_is_uptodate(struct lnet_peer *lp);
 
+static inline bool
+lnet_peer_needs_push(struct lnet_peer *lp)
+{
+	if (!(lp->lp_state & LNET_PEER_MULTI_RAIL))
+		return false;
+	if (lp->lp_state & LNET_PEER_FORCE_PUSH)
+		return true;
+	if (lp->lp_state & LNET_PEER_NO_DISCOVERY)
+		return false;
+	if (lp->lp_node_seqno < atomic_read(&the_lnet.ln_ping_target_seqno))
+		return true;
+	return false;
+}
+
 #endif
diff --git a/drivers/staging/lustre/include/linux/lnet/lib-types.h b/drivers/staging/lustre/include/linux/lnet/lib-types.h
index e00c13355d43..07baa86e61ab 100644
--- a/drivers/staging/lustre/include/linux/lnet/lib-types.h
+++ b/drivers/staging/lustre/include/linux/lnet/lib-types.h
@@ -67,6 +67,13 @@ struct lnet_msg {
 	lnet_nid_t msg_from;
 	__u32 msg_type;
 
+	/*
+	 * hold parameters in case message is withheld due
+	 * to discovery
+	 */
+	lnet_nid_t msg_src_nid_param;
+	lnet_nid_t msg_rtr_nid_param;
+
 	/* committed for sending */
 	unsigned int msg_tx_committed:1;
 	/* CPT # this message committed for sending */
@@ -395,6 +402,8 @@ struct lnet_ping_buffer {
 #define LNET_PING_BUFFER_LONI(PBUF)	((PBUF)->pb_info.pi_ni[0].ns_nid)
 #define LNET_PING_BUFFER_SEQNO(PBUF)	((PBUF)->pb_info.pi_ni[0].ns_status)
 
+#define LNET_PING_INFO_TO_BUFFER(PINFO)	\
+	container_of((PINFO), struct lnet_ping_buffer, pb_info)
 
 /* router checker data, per router */
 struct lnet_rc_data {
@@ -503,6 +512,9 @@ struct lnet_peer {
 	/* list of peer nets */
 	struct list_head lp_peer_nets;
 
+	/* list of messages pending discovery */
+	struct list_head lp_dc_pendq;
+
 	/* primary NID of the peer */
 	lnet_nid_t lp_primary_nid;
 
@@ -524,15 +536,36 @@ struct lnet_peer {
 	/* buffer for data pushed by peer */
 	struct lnet_ping_buffer *lp_data;
 
+	/* MD handle for ping in progress */
+	struct lnet_handle_md lp_ping_mdh;
+
+	/* MD handle for push in progress */
+	struct lnet_handle_md lp_push_mdh;
+
 	/* number of NIDs for sizing push data */
 	int lp_data_nnis;
 
 	/* NI config sequence number of peer */
 	__u32 lp_peer_seqno;
 
-	/* Local NI config sequence number peer knows */
+	/* Local NI config sequence number acked by peer */
 	__u32 lp_node_seqno;
 
+	/* Local NI config sequence number sent to peer */
+	__u32 lp_node_seqno_sent;
+
+	/* Ping error encountered during
discovery. */ + int lp_ping_error; + + /* Push error encountered during discovery. */ + int lp_push_error; + + /* Error encountered during discovery. */ + int lp_dc_error; + + /* time it was put on the ln_dc_working queue */ + time64_t lp_last_queued; + /* link on discovery-related lists */ struct list_head lp_dc_list; @@ -691,6 +724,8 @@ struct lnet_remotenet { #define LNET_CREDIT_OK 0 /** lnet message is waiting for credit */ #define LNET_CREDIT_WAIT 1 +/** lnet message is waiting for discovery */ +#define LNET_DC_WAIT 2 struct lnet_rtrbufpool { struct list_head rbp_bufs; /* my free buffer pool */ @@ -943,6 +978,8 @@ struct lnet { struct list_head ln_dc_request; /* discovery working list */ struct list_head ln_dc_working; + /* discovery expired list */ + struct list_head ln_dc_expired; /* discovery thread wait queue */ wait_queue_head_t ln_dc_waitq; /* discovery startup/shutdown state */ diff --git a/drivers/staging/lustre/lnet/lnet/api-ni.c b/drivers/staging/lustre/lnet/lnet/api-ni.c index e6bc54e9de71..955d1711eda4 100644 --- a/drivers/staging/lustre/lnet/lnet/api-ni.c +++ b/drivers/staging/lustre/lnet/lnet/api-ni.c @@ -41,7 +41,14 @@ #define D_LNI D_CONSOLE -struct lnet the_lnet; /* THE state of the network */ +/* + * initialize ln_api_mutex statically, since it needs to be used in + * discovery_set callback. That module parameter callback can be called + * before module init completes. The mutex needs to be ready for use then. + */ +struct lnet the_lnet = { + .ln_api_mutex = __MUTEX_INITIALIZER(the_lnet.ln_api_mutex), +}; /* THE state of the network */ EXPORT_SYMBOL(the_lnet); static char *ip2nets = ""; @@ -101,7 +108,9 @@ static int discovery_set(const char *val, const struct kernel_param *kp) { int rc; + unsigned int *discovery = (unsigned int *)kp->arg; unsigned long value; + struct lnet_ping_buffer *pbuf; rc = kstrtoul(val, 0, &value); if (rc) { @@ -109,7 +118,38 @@ discovery_set(const char *val, const struct kernel_param *kp) return rc; } - *(unsigned int *)kp->arg = !!value; + value = !!value; + + /* + * The purpose of locking the api_mutex here is to ensure that + * the correct value ends up stored properly. 
+ */ + mutex_lock(&the_lnet.ln_api_mutex); + + if (value == *discovery) { + mutex_unlock(&the_lnet.ln_api_mutex); + return 0; + } + + *discovery = value; + + if (the_lnet.ln_state == LNET_STATE_SHUTDOWN) { + mutex_unlock(&the_lnet.ln_api_mutex); + return 0; + } + + /* tell peers that discovery setting has changed */ + lnet_net_lock(LNET_LOCK_EX); + pbuf = the_lnet.ln_ping_target; + if (value) + pbuf->pb_info.pi_features &= ~LNET_PING_FEAT_DISCOVERY; + else + pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY; + lnet_net_unlock(LNET_LOCK_EX); + + lnet_push_update_to_peers(1); + + mutex_unlock(&the_lnet.ln_api_mutex); return 0; } @@ -171,7 +211,6 @@ lnet_init_locks(void) init_waitqueue_head(&the_lnet.ln_eq_waitq); init_waitqueue_head(&the_lnet.ln_rc_waitq); mutex_init(&the_lnet.ln_lnd_mutex); - mutex_init(&the_lnet.ln_api_mutex); } static int @@ -654,6 +693,10 @@ lnet_prepare(lnet_pid_t requested_pid) INIT_LIST_HEAD(&the_lnet.ln_routers); INIT_LIST_HEAD(&the_lnet.ln_drop_rules); INIT_LIST_HEAD(&the_lnet.ln_delay_rules); + INIT_LIST_HEAD(&the_lnet.ln_dc_request); + INIT_LIST_HEAD(&the_lnet.ln_dc_working); + INIT_LIST_HEAD(&the_lnet.ln_dc_expired); + init_waitqueue_head(&the_lnet.ln_dc_waitq); rc = lnet_create_remote_nets_table(); if (rc) @@ -998,7 +1041,8 @@ lnet_ping_target_create(int nnis) pbuf->pb_info.pi_nnis = nnis; pbuf->pb_info.pi_pid = the_lnet.ln_pid; pbuf->pb_info.pi_magic = LNET_PROTO_PING_MAGIC; - pbuf->pb_info.pi_features = LNET_PING_FEAT_NI_STATUS; + pbuf->pb_info.pi_features = + LNET_PING_FEAT_NI_STATUS | LNET_PING_FEAT_MULTI_RAIL; return pbuf; } @@ -1231,6 +1275,8 @@ lnet_ping_target_update(struct lnet_ping_buffer *pbuf, if (!the_lnet.ln_routing) pbuf->pb_info.pi_features |= LNET_PING_FEAT_RTE_DISABLED; + if (!lnet_peer_discovery_disabled) + pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY; /* Ensure only known feature bits have been set. 
*/ LASSERT(pbuf->pb_info.pi_features & LNET_PING_FEAT_BITS); @@ -1252,6 +1298,8 @@ lnet_ping_target_update(struct lnet_ping_buffer *pbuf, lnet_ping_md_unlink(old_pbuf, &old_ping_md); lnet_ping_buffer_decref(old_pbuf); } + + lnet_push_update_to_peers(0); } static void @@ -1353,6 +1401,7 @@ static void lnet_push_target_event_handler(struct lnet_event *ev) if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) lnet_swap_pinginfo(pbuf); + lnet_peer_push_event(ev); if (ev->unlinked) lnet_ping_buffer_decref(pbuf); } @@ -1910,8 +1959,6 @@ int lnet_lib_init(void) lnet_assert_wire_constants(); - memset(&the_lnet, 0, sizeof(the_lnet)); - /* refer to global cfs_cpt_tab for now */ the_lnet.ln_cpt_table = cfs_cpt_tab; the_lnet.ln_cpt_number = cfs_cpt_number(cfs_cpt_tab); diff --git a/drivers/staging/lustre/lnet/lnet/lib-move.c b/drivers/staging/lustre/lnet/lnet/lib-move.c index 4773180cc7b3..2ff329bf91ba 100644 --- a/drivers/staging/lustre/lnet/lnet/lib-move.c +++ b/drivers/staging/lustre/lnet/lnet/lib-move.c @@ -444,6 +444,8 @@ lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target, memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr)); msg->msg_hdr.type = cpu_to_le32(type); + /* dest_nid will be overwritten by lnet_select_pathway() */ + msg->msg_hdr.dest_nid = cpu_to_le64(target.nid); msg->msg_hdr.dest_pid = cpu_to_le32(target.pid); /* src_nid will be set later */ msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid); @@ -1292,7 +1294,7 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid, */ peer = lpni->lpni_peer_net->lpn_peer; if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) { - rc = lnet_discover_peer_locked(lpni, cpt); + rc = lnet_discover_peer_locked(lpni, cpt, false); if (rc) { lnet_peer_ni_decref_locked(lpni); lnet_net_unlock(cpt); @@ -1300,6 +1302,18 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid, } /* The peer may have changed. */ peer = lpni->lpni_peer_net->lpn_peer; + /* queue message and return */ + msg->msg_src_nid_param = src_nid; + msg->msg_rtr_nid_param = rtr_nid; + msg->msg_sending = 0; + list_add_tail(&msg->msg_list, &peer->lp_dc_pendq); + lnet_peer_ni_decref_locked(lpni); + lnet_net_unlock(cpt); + + CDEBUG(D_NET, "%s pending discovery\n", + libcfs_nid2str(peer->lp_primary_nid)); + + return LNET_DC_WAIT; } lnet_peer_ni_decref_locked(lpni); @@ -1840,7 +1854,7 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid) if (rc == LNET_CREDIT_OK) lnet_ni_send(msg->msg_txni, msg); - /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */ + /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */ return 0; } diff --git a/drivers/staging/lustre/lnet/lnet/peer.c b/drivers/staging/lustre/lnet/lnet/peer.c index b78f99c354de..1ef4a44e752e 100644 --- a/drivers/staging/lustre/lnet/lnet/peer.c +++ b/drivers/staging/lustre/lnet/lnet/peer.c @@ -38,6 +38,11 @@ #include <linux/lnet/lib-lnet.h> #include <uapi/linux/lnet/lnet-dlc.h> +/* Value indicating that recovery needs to re-check a peer immediately. 
*/ +#define LNET_REDISCOVER_PEER (1) + +static int lnet_peer_queue_for_discovery(struct lnet_peer *lp); + static void lnet_peer_remove_from_remote_list(struct lnet_peer_ni *lpni) { @@ -202,6 +207,7 @@ lnet_peer_alloc(lnet_nid_t nid) INIT_LIST_HEAD(&lp->lp_peer_list); INIT_LIST_HEAD(&lp->lp_peer_nets); INIT_LIST_HEAD(&lp->lp_dc_list); + INIT_LIST_HEAD(&lp->lp_dc_pendq); init_waitqueue_head(&lp->lp_dc_waitq); spin_lock_init(&lp->lp_lock); lp->lp_primary_nid = nid; @@ -220,6 +226,10 @@ lnet_destroy_peer_locked(struct lnet_peer *lp) LASSERT(atomic_read(&lp->lp_refcount) == 0); LASSERT(list_empty(&lp->lp_peer_nets)); LASSERT(list_empty(&lp->lp_peer_list)); + LASSERT(list_empty(&lp->lp_dc_list)); + + if (lp->lp_data) + lnet_ping_buffer_decref(lp->lp_data); kfree(lp); } @@ -260,10 +270,19 @@ lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni) /* * If there are no more peer nets, make the peer unfindable * via the peer_tables. + * + * Otherwise, if the peer is DISCOVERED, tell discovery to + * take another look at it. This is a no-op if discovery for + * this peer did the detaching. */ if (list_empty(&lp->lp_peer_nets)) { list_del_init(&lp->lp_peer_list); ptable->pt_peers--; + } else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING) { + /* Discovery isn't running, nothing to do here. */ + } else if (lp->lp_state & LNET_PEER_DISCOVERED) { + lnet_peer_queue_for_discovery(lp); + wake_up(&the_lnet.ln_dc_waitq); } CDEBUG(D_NET, "peer %s NID %s\n", libcfs_nid2str(lp->lp_primary_nid), @@ -599,6 +618,25 @@ lnet_find_peer_ni_locked(lnet_nid_t nid) return lpni; } +struct lnet_peer * +lnet_find_peer(lnet_nid_t nid) +{ + struct lnet_peer_ni *lpni; + struct lnet_peer *lp = NULL; + int cpt; + + cpt = lnet_net_lock_current(); + lpni = lnet_find_peer_ni_locked(nid); + if (lpni) { + lp = lpni->lpni_peer_net->lpn_peer; + lnet_peer_addref_locked(lp); + lnet_peer_ni_decref_locked(lpni); + } + lnet_net_unlock(cpt); + + return lp; +} + struct lnet_peer_ni * lnet_get_peer_ni_idx_locked(int idx, struct lnet_peer_net **lpn, struct lnet_peer **lp) @@ -696,6 +734,37 @@ lnet_get_next_peer_ni_locked(struct lnet_peer *peer, return lpni; } +/* + * Start pushes to peers that need to be updated for a configuration + * change on this node. + */ +void +lnet_push_update_to_peers(int force) +{ + struct lnet_peer_table *ptable; + struct lnet_peer *lp; + int lncpt; + int cpt; + + lnet_net_lock(LNET_LOCK_EX); + lncpt = cfs_percpt_number(the_lnet.ln_peer_tables); + for (cpt = 0; cpt < lncpt; cpt++) { + ptable = the_lnet.ln_peer_tables[cpt]; + list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) { + if (force) { + spin_lock(&lp->lp_lock); + if (lp->lp_state & LNET_PEER_MULTI_RAIL) + lp->lp_state |= LNET_PEER_FORCE_PUSH; + spin_unlock(&lp->lp_lock); + } + if (lnet_peer_needs_push(lp)) + lnet_peer_queue_for_discovery(lp); + } + } + lnet_net_unlock(LNET_LOCK_EX); + wake_up(&the_lnet.ln_dc_waitq); +} + /* * Test whether a ni is a preferred ni for this peer_ni, e.g, whether * this is a preferred point-to-point path. 
Call with lnet_net_lock in @@ -941,6 +1010,7 @@ lnet_peer_primary_nid_locked(lnet_nid_t nid) lnet_nid_t LNetPrimaryNID(lnet_nid_t nid) { + struct lnet_peer *lp; struct lnet_peer_ni *lpni; lnet_nid_t primary_nid = nid; int rc = 0; @@ -952,7 +1022,15 @@ LNetPrimaryNID(lnet_nid_t nid) rc = PTR_ERR(lpni); goto out_unlock; } - primary_nid = lpni->lpni_peer_net->lpn_peer->lp_primary_nid; + lp = lpni->lpni_peer_net->lpn_peer; + while (!lnet_peer_is_uptodate(lp)) { + rc = lnet_discover_peer_locked(lpni, cpt, true); + if (rc) + goto out_decref; + lp = lpni->lpni_peer_net->lpn_peer; + } + primary_nid = lp->lp_primary_nid; +out_decref: lnet_peer_ni_decref_locked(lpni); out_unlock: lnet_net_unlock(cpt); @@ -1229,6 +1307,30 @@ lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned int flags) return rc; } +/* + * Update the primary NID of a peer, if possible. + * + * Call with the lnet_api_mutex held. + */ +static int +lnet_peer_set_primary_nid(struct lnet_peer *lp, lnet_nid_t nid, + unsigned int flags) +{ + lnet_nid_t old = lp->lp_primary_nid; + int rc = 0; + + if (lp->lp_primary_nid == nid) + goto out; + rc = lnet_peer_add_nid(lp, nid, flags); + if (rc) + goto out; + lp->lp_primary_nid = nid; +out: + CDEBUG(D_NET, "peer %s NID %s: %d\n", + libcfs_nid2str(old), libcfs_nid2str(nid), rc); + return rc; +} + /* * lpni creation initiated due to traffic either sending or receiving. */ @@ -1548,11 +1650,15 @@ lnet_peer_is_uptodate(struct lnet_peer *lp) LNET_PEER_FORCE_PING | LNET_PEER_FORCE_PUSH)) { rc = false; + } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) { + rc = true; } else if (lp->lp_state & LNET_PEER_REDISCOVER) { if (lnet_peer_discovery_disabled) rc = true; else rc = false; + } else if (lnet_peer_needs_push(lp)) { + rc = false; } else if (lp->lp_state & LNET_PEER_DISCOVERED) { if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) rc = true; @@ -1588,6 +1694,9 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp) rc = -EALREADY; } + CDEBUG(D_NET, "Queue peer %s: %d\n", + libcfs_nid2str(lp->lp_primary_nid), rc); + return rc; } @@ -1597,9 +1706,252 @@ static int lnet_peer_queue_for_discovery(struct lnet_peer *lp) */ static void lnet_peer_discovery_complete(struct lnet_peer *lp) { + struct lnet_msg *msg = NULL; + int rc = 0; + struct list_head pending_msgs; + + INIT_LIST_HEAD(&pending_msgs); + + CDEBUG(D_NET, "Discovery complete. Dequeue peer %s\n", + libcfs_nid2str(lp->lp_primary_nid)); + list_del_init(&lp->lp_dc_list); + list_splice_init(&lp->lp_dc_pendq, &pending_msgs); wake_up_all(&lp->lp_dc_waitq); + + lnet_net_unlock(LNET_LOCK_EX); + + /* iterate through all pending messages and send them again */ + list_for_each_entry(msg, &pending_msgs, msg_list) { + if (lp->lp_dc_error) { + lnet_finalize(msg, lp->lp_dc_error); + continue; + } + + CDEBUG(D_NET, "sending pending message %s to target %s\n", + lnet_msgtyp2str(msg->msg_type), + libcfs_id2str(msg->msg_target)); + rc = lnet_send(msg->msg_src_nid_param, msg, + msg->msg_rtr_nid_param); + if (rc < 0) { + CNETERR("Error sending %s to %s: %d\n", + lnet_msgtyp2str(msg->msg_type), + libcfs_id2str(msg->msg_target), rc); + lnet_finalize(msg, rc); + } + } + lnet_net_lock(LNET_LOCK_EX); + lnet_peer_decref_locked(lp); +} + +/* + * Handle inbound push. + * Like any event handler, called with lnet_res_lock/CPT held. 
+ */ +void lnet_peer_push_event(struct lnet_event *ev) +{ + struct lnet_ping_buffer *pbuf = ev->md.user_ptr; + struct lnet_peer *lp; + + /* lnet_find_peer() adds a refcount */ + lp = lnet_find_peer(ev->source.nid); + if (!lp) { + CERROR("Push Put from unknown %s (source %s)\n", + libcfs_nid2str(ev->initiator.nid), + libcfs_nid2str(ev->source.nid)); + return; + } + + /* Ensure peer state remains consistent while we modify it. */ + spin_lock(&lp->lp_lock); + + /* + * If some kind of error happened the contents of the message + * cannot be used. Clear the NIDS_UPTODATE and set the + * FORCE_PING flag to trigger a ping. + */ + if (ev->status) { + lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE; + lp->lp_state |= LNET_PEER_FORCE_PING; + CDEBUG(D_NET, "Push Put error %d from %s (source %s)\n", + ev->status, + libcfs_nid2str(lp->lp_primary_nid), + libcfs_nid2str(ev->source.nid)); + goto out; + } + + /* + * A push with invalid or corrupted info. Clear the UPTODATE + * flag to trigger a ping. + */ + if (lnet_ping_info_validate(&pbuf->pb_info)) { + lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE; + lp->lp_state |= LNET_PEER_FORCE_PING; + CDEBUG(D_NET, "Corrupted Push from %s\n", + libcfs_nid2str(lp->lp_primary_nid)); + goto out; + } + + /* + * Make sure we'll allocate the correct size ping buffer when + * pinging the peer. + */ + if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis) + lp->lp_data_nnis = pbuf->pb_info.pi_nnis; + + /* + * A non-Multi-Rail peer is not supposed to be capable of + * sending a push. + */ + if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)) { + CERROR("Push from non-Multi-Rail peer %s dropped\n", + libcfs_nid2str(lp->lp_primary_nid)); + goto out; + } + + /* + * Check the MULTIRAIL flag. Complain if the peer was DLC + * configured without it. + */ + if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) { + if (lp->lp_state & LNET_PEER_CONFIGURED) { + CERROR("Push says %s is Multi-Rail, DLC says not\n", + libcfs_nid2str(lp->lp_primary_nid)); + } else { + lp->lp_state |= LNET_PEER_MULTI_RAIL; + lnet_peer_clr_non_mr_pref_nids(lp); + } + } + + /* + * The peer may have discovery disabled at its end. Set + * NO_DISCOVERY as appropriate. + */ + if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) { + CDEBUG(D_NET, "Peer %s has discovery disabled\n", + libcfs_nid2str(lp->lp_primary_nid)); + lp->lp_state |= LNET_PEER_NO_DISCOVERY; + } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) { + CDEBUG(D_NET, "Peer %s has discovery enabled\n", + libcfs_nid2str(lp->lp_primary_nid)); + lp->lp_state &= ~LNET_PEER_NO_DISCOVERY; + } + + /* + * Check for truncation of the Put message. Clear the + * NIDS_UPTODATE flag and set FORCE_PING to trigger a ping, + * and tell discovery to allocate a bigger buffer. + */ + if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) { + if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis) + the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis; + lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE; + lp->lp_state |= LNET_PEER_FORCE_PING; + CDEBUG(D_NET, "Truncated Push from %s (%d nids)\n", + libcfs_nid2str(lp->lp_primary_nid), + pbuf->pb_info.pi_nnis); + goto out; + } + + /* + * Check whether the Put data is stale. Stale data can just be + * dropped. 
+	 */
+	if (pbuf->pb_info.pi_nnis > 1 &&
+	    lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
+	    LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
+		CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
+		       libcfs_nid2str(lp->lp_primary_nid),
+		       LNET_PING_BUFFER_SEQNO(pbuf),
+		       lp->lp_peer_seqno);
+		goto out;
+	}
+
+	/*
+	 * Check whether the Put data is new, in which case we clear
+	 * the UPTODATE flag and prepare to process it.
+	 *
+	 * If the Put data is current, and the peer is UPTODATE then
+	 * we assume everything is all right and drop the data as
+	 * stale.
+	 */
+	if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
+		lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
+		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
+	} else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
+		CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
+		       libcfs_nid2str(lp->lp_primary_nid),
+		       LNET_PING_BUFFER_SEQNO(pbuf),
+		       lp->lp_peer_seqno);
+		goto out;
+	}
+
+	/*
+	 * If there is data present that hasn't been processed yet,
+	 * we'll replace it if the Put contained newer data and it
+	 * fits. We're racing with a Ping or earlier Push in this
+	 * case.
+	 */
+	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
+		if (LNET_PING_BUFFER_SEQNO(pbuf) >
+			LNET_PING_BUFFER_SEQNO(lp->lp_data) &&
+		    pbuf->pb_info.pi_nnis <= lp->lp_data->pb_nnis) {
+			memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
+			       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
+			CDEBUG(D_NET, "Ping/Push race from %s: %u vs %u\n",
+			       libcfs_nid2str(lp->lp_primary_nid),
+			       LNET_PING_BUFFER_SEQNO(pbuf),
+			       LNET_PING_BUFFER_SEQNO(lp->lp_data));
+		}
+		goto out;
+	}
+
+	/*
+	 * Allocate a buffer to copy the data. On a failure we drop
+	 * the Push and set FORCE_PING to force the discovery
+	 * thread to fix the problem by pinging the peer.
+	 */
+	lp->lp_data = lnet_ping_buffer_alloc(lp->lp_data_nnis, GFP_ATOMIC);
+	if (!lp->lp_data) {
+		lp->lp_state |= LNET_PEER_FORCE_PING;
+		CDEBUG(D_NET, "Cannot allocate Push buffer for %s %u\n",
+		       libcfs_nid2str(lp->lp_primary_nid),
+		       LNET_PING_BUFFER_SEQNO(pbuf));
+		goto out;
+	}
+
+	/* Success */
+	memcpy(&lp->lp_data->pb_info, &pbuf->pb_info,
+	       LNET_PING_INFO_SIZE(pbuf->pb_info.pi_nnis));
+	lp->lp_state |= LNET_PEER_DATA_PRESENT;
+	CDEBUG(D_NET, "Received Push %s %u\n",
+	       libcfs_nid2str(lp->lp_primary_nid),
+	       LNET_PING_BUFFER_SEQNO(pbuf));
+
+out:
+	/*
+	 * Queue the peer for discovery, and wake the discovery thread
+	 * if the peer was already queued, because its status changed.
+	 */
+	spin_unlock(&lp->lp_lock);
+	lnet_net_lock(LNET_LOCK_EX);
+	if (lnet_peer_queue_for_discovery(lp))
+		wake_up(&the_lnet.ln_dc_waitq);
+	/* Drop refcount from lookup */
+	lnet_peer_decref_locked(lp);
+	lnet_net_unlock(LNET_LOCK_EX);
+}
+
+/*
+ * Clear the discovery error state, unless we're already discovering
+ * this peer, in which case the error is current.
+ */
+static void lnet_peer_clear_discovery_error(struct lnet_peer *lp)
+{
+	spin_lock(&lp->lp_lock);
+	if (!(lp->lp_state & LNET_PEER_DISCOVERING))
+		lp->lp_dc_error = 0;
+	spin_unlock(&lp->lp_lock);
 }
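The Push path is made idempotent by the sequence number that rides in ns_status of slot 0 of the ping buffer (LNET_PING_BUFFER_SEQNO()). Setting aside the primary-NID guard above, the acceptance rule reduces to the following sketch (helper name illustrative, not part of the patch):

/* Sketch: should a Push carrying sequence number "seqno" be processed?
 * Mirrors the checks in lnet_peer_push_event(); caller holds lp_lock.
 */
static bool push_data_is_new(struct lnet_peer *lp, __u32 seqno)
{
	if (seqno < lp->lp_peer_seqno)	/* older than what we already have */
		return false;
	if (seqno > lp->lp_peer_seqno)	/* strictly newer: always process */
		return true;
	/* same generation: only useful if our view is not up to date yet */
	return !(lp->lp_state & LNET_PEER_NIDS_UPTODATE);
}

The equal case matters because a Ping Reply and a Push can deliver the same generation of data; once NIDS_UPTODATE is set, a repeat is dropped as stale, and the DATA_PRESENT block above resolves a live race by keeping whichever buffer has the higher sequence number, provided it fits.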
 /*
@@ -1608,7 +1960,7 @@ static void lnet_peer_discovery_complete(struct lnet_peer *lp)
  * because discovery could tear down an lnet_peer.
  */
 int
-lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
+lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt, bool block)
 {
 	DEFINE_WAIT(wait);
 	struct lnet_peer *lp;
@@ -1617,25 +1969,40 @@
 again:
 	lnet_net_unlock(cpt);
 	lnet_net_lock(LNET_LOCK_EX);
+	lp = lpni->lpni_peer_net->lpn_peer;
+	lnet_peer_clear_discovery_error(lp);
 
-	/* We're willing to be interrupted. */
+	/*
+	 * We're willing to be interrupted. The lpni can become a
+	 * zombie if we race with DLC, so we must check for that.
+	 */
 	for (;;) {
-		lp = lpni->lpni_peer_net->lpn_peer;
 		prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
 		if (signal_pending(current))
 			break;
 		if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
 			break;
+		if (lp->lp_dc_error)
+			break;
 		if (lnet_peer_is_uptodate(lp))
 			break;
 		lnet_peer_queue_for_discovery(lp);
 		lnet_peer_addref_locked(lp);
+		/*
+		 * if caller requested a non-blocking operation then
+		 * return immediately. Once discovery is complete then the
+		 * peer ref will be decremented and any pending messages
+		 * that were stopped due to discovery will be transmitted.
+		 */
+		if (!block)
+			break;
 		lnet_net_unlock(LNET_LOCK_EX);
 		schedule();
 		finish_wait(&lp->lp_dc_waitq, &wait);
 		lnet_net_lock(LNET_LOCK_EX);
 		lnet_peer_decref_locked(lp);
-		/* Do not use lp beyond this point. */
+		/* Peer may have changed */
+		lp = lpni->lpni_peer_net->lpn_peer;
 	}
 	finish_wait(&lp->lp_dc_waitq, &wait);
@@ -1646,71 +2013,969 @@
 		rc = -EINTR;
 	else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
 		rc = -ESHUTDOWN;
+	else if (lp->lp_dc_error)
+		rc = lp->lp_dc_error;
+	else if (!block)
+		CDEBUG(D_NET, "non-blocking discovery\n");
 	else if (!lnet_peer_is_uptodate(lp))
 		goto again;
+	CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
+	       (lp ? libcfs_nid2str(lp->lp_primary_nid) : "(none)"),
+	       libcfs_nid2str(lpni->lpni_nid), rc,
+	       (!block) ? "pending discovery" : "discovery complete");
+
 	return rc;
 }
 
-/*
- * Event handler for the discovery EQ.
- *
- * Called with lnet_res_lock(cpt) held. The cpt is the
- * lnet_cpt_of_cookie() of the md handle cookie.
- */
-static void lnet_discovery_event_handler(struct lnet_event *event)
+/* Handle an incoming ack for a push. */
+static void
+lnet_discovery_event_ack(struct lnet_peer *lp, struct lnet_event *ev)
 {
-	wake_up(&the_lnet.ln_dc_waitq);
+	struct lnet_ping_buffer *pbuf;
+
+	pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
+	spin_lock(&lp->lp_lock);
+	lp->lp_state &= ~LNET_PEER_PUSH_SENT;
+	lp->lp_push_error = ev->status;
+	if (ev->status)
+		lp->lp_state |= LNET_PEER_PUSH_FAILED;
+	else
+		lp->lp_node_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
+	spin_unlock(&lp->lp_lock);
+
+	CDEBUG(D_NET, "peer %s ev->status %d\n",
+	       libcfs_nid2str(lp->lp_primary_nid), ev->status);
 }
 
-/*
- * Wait for work to be queued or some other change that must be
- * attended to. Returns non-zero if the discovery thread should shut
- * down.
- */
-static int lnet_peer_discovery_wait_for_work(void)
+/* Handle a Reply message. This is the reply to a Ping message.
*/ +static void +lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev) { - int cpt; - int rc = 0; + struct lnet_ping_buffer *pbuf; + int rc; - DEFINE_WAIT(wait); + spin_lock(&lp->lp_lock); - cpt = lnet_net_lock_current(); - for (;;) { - prepare_to_wait(&the_lnet.ln_dc_waitq, &wait, - TASK_IDLE); - if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) - break; - if (lnet_push_target_resize_needed()) - break; - if (!list_empty(&the_lnet.ln_dc_request)) - break; - lnet_net_unlock(cpt); - schedule(); - finish_wait(&the_lnet.ln_dc_waitq, &wait); - cpt = lnet_net_lock_current(); + /* + * If some kind of error happened the contents of message + * cannot be used. Set PING_FAILED to trigger a retry. + */ + if (ev->status) { + lp->lp_state |= LNET_PEER_PING_FAILED; + lp->lp_ping_error = ev->status; + CDEBUG(D_NET, "Ping Reply error %d from %s (source %s)\n", + ev->status, + libcfs_nid2str(lp->lp_primary_nid), + libcfs_nid2str(ev->source.nid)); + goto out; } - finish_wait(&the_lnet.ln_dc_waitq, &wait); - - if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING) - rc = -ESHUTDOWN; - lnet_net_unlock(cpt); + pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start); + if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) + lnet_swap_pinginfo(pbuf); - CDEBUG(D_NET, "woken: %d\n", rc); + /* + * A reply with invalid or corrupted info. Set PING_FAILED to + * trigger a retry. + */ + rc = lnet_ping_info_validate(&pbuf->pb_info); + if (rc) { + lp->lp_state |= LNET_PEER_PING_FAILED; + lp->lp_ping_error = 0; + CDEBUG(D_NET, "Corrupted Ping Reply from %s: %d\n", + libcfs_nid2str(lp->lp_primary_nid), rc); + goto out; + } - return rc; -} + /* + * Update the MULTI_RAIL flag based on the reply. If the peer + * was configured with DLC then the setting should match what + * DLC put in. + */ + if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) { + if (lp->lp_state & LNET_PEER_MULTI_RAIL) { + /* Everything's fine */ + } else if (lp->lp_state & LNET_PEER_CONFIGURED) { + CWARN("Reply says %s is Multi-Rail, DLC says not\n", + libcfs_nid2str(lp->lp_primary_nid)); + } else { + lp->lp_state |= LNET_PEER_MULTI_RAIL; + lnet_peer_clr_non_mr_pref_nids(lp); + } + } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) { + if (lp->lp_state & LNET_PEER_CONFIGURED) { + CWARN("DLC says %s is Multi-Rail, Reply says not\n", + libcfs_nid2str(lp->lp_primary_nid)); + } else { + CERROR("Multi-Rail state vanished from %s\n", + libcfs_nid2str(lp->lp_primary_nid)); + lp->lp_state &= ~LNET_PEER_MULTI_RAIL; + } + } -/* The discovery thread. */ -static int lnet_peer_discovery(void *arg) -{ - struct lnet_peer *lp; + /* + * Make sure we'll allocate the correct size ping buffer when + * pinging the peer. + */ + if (lp->lp_data_nnis < pbuf->pb_info.pi_nnis) + lp->lp_data_nnis = pbuf->pb_info.pi_nnis; - CDEBUG(D_NET, "started\n"); + /* + * The peer may have discovery disabled at its end. Set + * NO_DISCOVERY as appropriate. + */ + if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) { + CDEBUG(D_NET, "Peer %s has discovery disabled\n", + libcfs_nid2str(lp->lp_primary_nid)); + lp->lp_state |= LNET_PEER_NO_DISCOVERY; + } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) { + CDEBUG(D_NET, "Peer %s has discovery enabled\n", + libcfs_nid2str(lp->lp_primary_nid)); + lp->lp_state &= ~LNET_PEER_NO_DISCOVERY; + } - for (;;) { - if (lnet_peer_discovery_wait_for_work()) + /* + * Check for truncation of the Reply. Clear PING_SENT and set + * PING_FAILED to trigger a retry. 
+ */ + if (pbuf->pb_nnis < pbuf->pb_info.pi_nnis) { + if (the_lnet.ln_push_target_nnis < pbuf->pb_info.pi_nnis) + the_lnet.ln_push_target_nnis = pbuf->pb_info.pi_nnis; + lp->lp_state |= LNET_PEER_PING_FAILED; + lp->lp_ping_error = 0; + CDEBUG(D_NET, "Truncated Reply from %s (%d nids)\n", + libcfs_nid2str(lp->lp_primary_nid), + pbuf->pb_info.pi_nnis); + goto out; + } + + /* + * Check the sequence numbers in the reply. These are only + * available if the reply came from a Multi-Rail peer. + */ + if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL && + pbuf->pb_info.pi_nnis > 1 && + lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) { + if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) { + CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n", + libcfs_nid2str(lp->lp_primary_nid), + LNET_PING_BUFFER_SEQNO(pbuf), + lp->lp_peer_seqno); + goto out; + } + + if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) + lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf); + } + + /* We're happy with the state of the data in the buffer. */ + CDEBUG(D_NET, "peer %s data present %u\n", + libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno); + if (lp->lp_state & LNET_PEER_DATA_PRESENT) + lnet_ping_buffer_decref(lp->lp_data); + else + lp->lp_state |= LNET_PEER_DATA_PRESENT; + lnet_ping_buffer_addref(pbuf); + lp->lp_data = pbuf; +out: + lp->lp_state &= ~LNET_PEER_PING_SENT; + spin_unlock(&lp->lp_lock); +} + +/* + * Send event handling. Only matters for error cases, where we clean + * up state on the peer and peer_ni that would otherwise be updated in + * the REPLY event handler for a successful Ping, and the ACK event + * handler for a successful Push. + */ +static int +lnet_discovery_event_send(struct lnet_peer *lp, struct lnet_event *ev) +{ + int rc = 0; + + if (!ev->status) + goto out; + + spin_lock(&lp->lp_lock); + if (ev->msg_type == LNET_MSG_GET) { + lp->lp_state &= ~LNET_PEER_PING_SENT; + lp->lp_state |= LNET_PEER_PING_FAILED; + lp->lp_ping_error = ev->status; + } else { /* ev->msg_type == LNET_MSG_PUT */ + lp->lp_state &= ~LNET_PEER_PUSH_SENT; + lp->lp_state |= LNET_PEER_PUSH_FAILED; + lp->lp_push_error = ev->status; + } + spin_unlock(&lp->lp_lock); + rc = LNET_REDISCOVER_PEER; +out: + CDEBUG(D_NET, "%s Send to %s: %d\n", + (ev->msg_type == LNET_MSG_GET ? "Ping" : "Push"), + libcfs_nid2str(ev->target.nid), rc); + return rc; +} + +/* + * Unlink event handling. This event is only seen if a call to + * LNetMDUnlink() caused the event to be unlinked. If this call was + * made after the event was set up in LNetGet() or LNetPut() then we + * assume the Ping or Push timed out. + */ +static void +lnet_discovery_event_unlink(struct lnet_peer *lp, struct lnet_event *ev) +{ + spin_lock(&lp->lp_lock); + /* We've passed through LNetGet() */ + if (lp->lp_state & LNET_PEER_PING_SENT) { + lp->lp_state &= ~LNET_PEER_PING_SENT; + lp->lp_state |= LNET_PEER_PING_FAILED; + lp->lp_ping_error = -ETIMEDOUT; + CDEBUG(D_NET, "Ping Unlink for message to peer %s\n", + libcfs_nid2str(lp->lp_primary_nid)); + } + /* We've passed through LNetPut() */ + if (lp->lp_state & LNET_PEER_PUSH_SENT) { + lp->lp_state &= ~LNET_PEER_PUSH_SENT; + lp->lp_state |= LNET_PEER_PUSH_FAILED; + lp->lp_push_error = -ETIMEDOUT; + CDEBUG(D_NET, "Push Unlink for message to peer %s\n", + libcfs_nid2str(lp->lp_primary_nid)); + } + spin_unlock(&lp->lp_lock); +} + +/* + * Event handler for the discovery EQ. + * + * Called with lnet_res_lock(cpt) held. The cpt is the + * lnet_cpt_of_cookie() of the md handle cookie. 
+ */
+static void lnet_discovery_event_handler(struct lnet_event *event)
+{
+	struct lnet_peer *lp = event->md.user_ptr;
+	struct lnet_ping_buffer *pbuf;
+	int rc;
+
+	/* discovery needs to take another look */
+	rc = LNET_REDISCOVER_PEER;
+
+	CDEBUG(D_NET, "Received event: %d\n", event->type);
+
+	switch (event->type) {
+	case LNET_EVENT_ACK:
+		lnet_discovery_event_ack(lp, event);
+		break;
+	case LNET_EVENT_REPLY:
+		lnet_discovery_event_reply(lp, event);
+		break;
+	case LNET_EVENT_SEND:
+		/* Only send failure triggers a retry. */
+		rc = lnet_discovery_event_send(lp, event);
+		break;
+	case LNET_EVENT_UNLINK:
+		/* LNetMDUnlink() was called */
+		lnet_discovery_event_unlink(lp, event);
+		break;
+	default:
+		/* Invalid events. */
+		LBUG();
+	}
+	lnet_net_lock(LNET_LOCK_EX);
+	if (event->unlinked) {
+		pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
+		lnet_ping_buffer_decref(pbuf);
+		lnet_peer_decref_locked(lp);
+	}
+	if (rc == LNET_REDISCOVER_PEER) {
+		list_move_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
+		wake_up(&the_lnet.ln_dc_waitq);
+	}
+	lnet_net_unlock(LNET_LOCK_EX);
+}
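All four event paths rely on the same convention: the MD was bound with md.start pointing at the pb_info member embedded in a struct lnet_ping_buffer, so LNET_PING_INFO_TO_BUFFER() from lib-types.h can recover the enclosing buffer with container_of(). A minimal sketch of that round trip (function name illustrative, not part of the patch):

/* Sketch: recovering the enclosing ping buffer from a discovery event.
 * ev->md.start is the &pbuf->pb_info that the MD was bound to.
 */
static void event_drop_buffer_ref(struct lnet_event *ev)
{
	struct lnet_ping_buffer *pbuf;

	pbuf = LNET_PING_INFO_TO_BUFFER(ev->md.start);
	if (ev->unlinked)	/* MD gone: release the bind-time reference */
		lnet_ping_buffer_decref(pbuf);
}

The peer reference taken when the MD was bound is dropped in the same event->unlinked block, under lnet_net_lock() in LNET_LOCK_EX mode.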
+
+/*
+ * Build a peer from incoming data.
+ *
+ * The NIDs in the incoming data are supposed to be structured as follows:
+ *  - loopback
+ *  - primary NID
+ *  - other NIDs in same net
+ *  - NIDs in second net
+ *  - NIDs in third net
+ *  - ...
+ * This is due to the way the list of NIDs in the data is created.
+ *
+ * Note that this function will mark the peer uptodate unless an
+ * ENOMEM is encountered. All other errors are due to a conflict
+ * between the DLC configuration and what discovery sees. We treat DLC
+ * as binding, and therefore set the NIDS_UPTODATE flag to prevent the
+ * peer from becoming stuck in discovery.
+ */
+static int lnet_peer_merge_data(struct lnet_peer *lp,
+				struct lnet_ping_buffer *pbuf)
+{
+	struct lnet_peer_ni *lpni;
+	lnet_nid_t *curnis = NULL;
+	lnet_nid_t *addnis = NULL;
+	lnet_nid_t *delnis = NULL;
+	unsigned int flags;
+	int ncurnis;
+	int naddnis;
+	int ndelnis;
+	int nnis = 0;
+	int i;
+	int j;
+	int rc;
+
+	flags = LNET_PEER_DISCOVERED;
+	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
+		flags |= LNET_PEER_MULTI_RAIL;
+
+	nnis = max_t(int, lp->lp_nnis, pbuf->pb_info.pi_nnis);
+	curnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS);
+	addnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS);
+	delnis = kmalloc_array(nnis, sizeof(lnet_nid_t), GFP_NOFS);
+	if (!curnis || !addnis || !delnis) {
+		rc = -ENOMEM;
+		goto out;
+	}
+	ncurnis = 0;
+	naddnis = 0;
+	ndelnis = 0;
+
+	/* Construct the list of NIDs present in peer. */
+	lpni = NULL;
+	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
+		curnis[ncurnis++] = lpni->lpni_nid;
+
+	/*
+	 * Check for NIDs in pbuf not present in curnis[].
+	 * The loop starts at 1 to skip the loopback NID.
+	 */
+	for (i = 1; i < pbuf->pb_info.pi_nnis; i++) {
+		for (j = 0; j < ncurnis; j++)
+			if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
+				break;
+		if (j == ncurnis)
+			addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
+	}
+	/*
+	 * Check for NIDs in curnis[] not present in pbuf.
+	 * The nested loop starts at 1 to skip the loopback NID.
+	 *
+	 * But never add the loopback NID to delnis[]: if it is
+	 * present in curnis[] then this peer is for this node.
+	 */
+	for (i = 0; i < ncurnis; i++) {
+		if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
+			continue;
+		for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
+			if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
+				break;
+		if (j == pbuf->pb_info.pi_nnis)
+			delnis[ndelnis++] = curnis[i];
+	}
+
+	for (i = 0; i < naddnis; i++) {
+		rc = lnet_peer_add_nid(lp, addnis[i], flags);
+		if (rc) {
+			CERROR("Error adding NID %s to peer %s: %d\n",
+			       libcfs_nid2str(addnis[i]),
+			       libcfs_nid2str(lp->lp_primary_nid), rc);
+			if (rc == -ENOMEM)
+				goto out;
+		}
+	}
+	for (i = 0; i < ndelnis; i++) {
+		rc = lnet_peer_del_nid(lp, delnis[i], flags);
+		if (rc) {
+			CERROR("Error deleting NID %s from peer %s: %d\n",
+			       libcfs_nid2str(delnis[i]),
+			       libcfs_nid2str(lp->lp_primary_nid), rc);
+			if (rc == -ENOMEM)
+				goto out;
+		}
+	}
+	/*
+	 * Errors other than -ENOMEM are due to peers having been
+	 * configured with DLC. Ignore these because DLC overrides
+	 * Discovery.
+	 */
+	rc = 0;
+out:
+	kfree(curnis);
+	kfree(addnis);
+	kfree(delnis);
+	lnet_ping_buffer_decref(pbuf);
+	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
+
+	if (rc) {
+		spin_lock(&lp->lp_lock);
+		lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
+		lp->lp_state |= LNET_PEER_FORCE_PING;
+		spin_unlock(&lp->lp_lock);
+	}
+	return rc;
+}
+
+/*
+ * The data in pbuf says lp is its primary peer, but the data was
+ * received by a different peer. Try to update lp with the data.
+ */
+static int
+lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
+{
+	struct lnet_handle_md mdh;
+
+	/* Queue lp for discovery, and force it on the request queue. */
+	lnet_net_lock(LNET_LOCK_EX);
+	if (lnet_peer_queue_for_discovery(lp))
+		list_move(&lp->lp_dc_list, &the_lnet.ln_dc_request);
+	lnet_net_unlock(LNET_LOCK_EX);
+
+	LNetInvalidateMDHandle(&mdh);
+
+	/*
+	 * Decide whether we can move the peer to the DATA_PRESENT state.
+	 *
+	 * We replace stale data for a multi-rail peer, repair PING_FAILED
+	 * status, and preempt FORCE_PING.
+	 *
+	 * If after that we have DATA_PRESENT, we merge it into this peer.
+	 */
+	spin_lock(&lp->lp_lock);
+	if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
+		if (lp->lp_peer_seqno < LNET_PING_BUFFER_SEQNO(pbuf)) {
+			lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
+		} else if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
+			lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
+			lnet_ping_buffer_decref(pbuf);
+			pbuf = lp->lp_data;
+			lp->lp_data = NULL;
+		}
+	}
+	if (lp->lp_state & LNET_PEER_DATA_PRESENT) {
+		lnet_ping_buffer_decref(lp->lp_data);
+		lp->lp_data = NULL;
+		lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
+	}
+	if (lp->lp_state & LNET_PEER_PING_FAILED) {
+		mdh = lp->lp_ping_mdh;
+		LNetInvalidateMDHandle(&lp->lp_ping_mdh);
+		lp->lp_state &= ~LNET_PEER_PING_FAILED;
+		lp->lp_ping_error = 0;
+	}
+	if (lp->lp_state & LNET_PEER_FORCE_PING)
+		lp->lp_state &= ~LNET_PEER_FORCE_PING;
+	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
+	spin_unlock(&lp->lp_lock);
+
+	if (!LNetMDHandleIsInvalid(mdh))
+		LNetMDUnlink(mdh);
+
+	if (pbuf)
+		return lnet_peer_merge_data(lp, pbuf);
+
+	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
+	return 0;
+}
+
+/*
+ * Update a peer using the data received.
+ */
+static int lnet_peer_data_present(struct lnet_peer *lp)
+__must_hold(&lp->lp_lock)
+{
+	struct lnet_ping_buffer *pbuf;
+	struct lnet_peer_ni *lpni;
+	lnet_nid_t nid = LNET_NID_ANY;
+	unsigned int flags;
+	int rc = 0;
+
+	pbuf = lp->lp_data;
+	lp->lp_data = NULL;
+	lp->lp_state &= ~LNET_PEER_DATA_PRESENT;
+	lp->lp_state |= LNET_PEER_NIDS_UPTODATE;
+	spin_unlock(&lp->lp_lock);
+
+	/*
+	 * Modifications of peer structures are done while holding the
+	 * ln_api_mutex. A global lock is required because we may be
+	 * modifying multiple peer structures, and a mutex greatly
+	 * simplifies memory management.
+	 *
+	 * The actual changes to the data structures must also protect
+	 * against concurrent lookups, for which the lnet_net_lock in
+	 * LNET_LOCK_EX mode is used.
+	 */
+	mutex_lock(&the_lnet.ln_api_mutex);
+	if (the_lnet.ln_state == LNET_STATE_SHUTDOWN) {
+		rc = -ESHUTDOWN;
+		goto out;
+	}
+
+	/*
+	 * If this peer is not on the peer list then it is being torn
+	 * down, and our reference count may be all that is keeping it
+	 * alive. Don't do any work on it.
+	 */
+	if (list_empty(&lp->lp_peer_list))
+		goto out;
+
+	flags = LNET_PEER_DISCOVERED;
+	if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
+		flags |= LNET_PEER_MULTI_RAIL;
+
+	/*
+	 * Check whether the primary NID in the message matches the
+	 * primary NID of the peer. If it does, update the peer, if
+	 * it does not, check whether there is already a peer with
+	 * that primary NID. If no such peer exists, try to update
+	 * the primary NID of the current peer (allowed if it was
+	 * created due to message traffic) and complete the update.
+	 * If the peer did exist, hand off the data to it.
+	 *
+	 * The peer for the loopback interface is a special case: this
+	 * is the peer for the local node, and we want to set its
+	 * primary NID to the correct value here.
+	 */
+	if (pbuf->pb_info.pi_nnis > 1)
+		nid = pbuf->pb_info.pi_ni[1].ns_nid;
+	if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND) {
+		rc = lnet_peer_set_primary_nid(lp, nid, flags);
+		if (!rc)
+			rc = lnet_peer_merge_data(lp, pbuf);
+	} else if (lp->lp_primary_nid == nid) {
+		rc = lnet_peer_merge_data(lp, pbuf);
+	} else {
+		lpni = lnet_find_peer_ni_locked(nid);
+		if (!lpni) {
+			rc = lnet_peer_set_primary_nid(lp, nid, flags);
+			if (rc) {
+				CERROR("Primary NID error %s versus %s: %d\n",
+				       libcfs_nid2str(lp->lp_primary_nid),
+				       libcfs_nid2str(nid), rc);
+			} else {
+				rc = lnet_peer_merge_data(lp, pbuf);
+			}
+		} else {
+			rc = lnet_peer_set_primary_data(
+				lpni->lpni_peer_net->lpn_peer, pbuf);
+			lnet_peer_ni_decref_locked(lpni);
+		}
+	}
+out:
+	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
+	mutex_unlock(&the_lnet.ln_api_mutex);
+
+	spin_lock(&lp->lp_lock);
+	/* Tell discovery to re-check the peer immediately. */
+	if (!rc)
+		rc = LNET_REDISCOVER_PEER;
+	return rc;
+}
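The branch structure above decides which peer object ends up owning the received data, keyed on pi_ni[1].ns_nid, the primary NID that the peer reports for itself. Stripped of locking, shutdown, and error reporting, it reduces to this sketch (the wrapper is illustrative, not part of the patch):

/* Sketch: who gets the ping data in lnet_peer_data_present()?
 * "nid" is pbuf->pb_info.pi_ni[1].ns_nid, the peer's own primary NID.
 */
static int route_ping_data(struct lnet_peer *lp, lnet_nid_t nid,
			   struct lnet_ping_buffer *pbuf, unsigned int flags)
{
	struct lnet_peer_ni *lpni;
	int rc;

	if (LNET_NETTYP(LNET_NIDNET(lp->lp_primary_nid)) == LOLND ||
	    lp->lp_primary_nid == nid) {
		/* loopback peer adopts nid as primary; otherwise it matches */
		rc = lp->lp_primary_nid == nid ? 0 :
		     lnet_peer_set_primary_nid(lp, nid, flags);
		return rc ? rc : lnet_peer_merge_data(lp, pbuf);
	}
	lpni = lnet_find_peer_ni_locked(nid);
	if (!lpni) {
		/* nid unknown: try to re-prime this peer, then merge */
		rc = lnet_peer_set_primary_nid(lp, nid, flags);
		return rc ? rc : lnet_peer_merge_data(lp, pbuf);
	}
	/* nid already belongs to another peer: hand the data to it */
	rc = lnet_peer_set_primary_data(lpni->lpni_peer_net->lpn_peer, pbuf);
	lnet_peer_ni_decref_locked(lpni);
	return rc;
}

The LOLND branch is the special case called out in the comment above: the peer created for the loopback interface is the local node itself, and this is where it acquires its real primary NID.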
+
+/*
+ * A ping failed. Clear the PING_FAILED state and set the
+ * FORCE_PING state, to ensure a retry even if discovery is
+ * disabled. This avoids being left with incorrect state.
+ */
+static int lnet_peer_ping_failed(struct lnet_peer *lp)
+__must_hold(&lp->lp_lock)
+{
+	struct lnet_handle_md mdh;
+	int rc;
+
+	mdh = lp->lp_ping_mdh;
+	LNetInvalidateMDHandle(&lp->lp_ping_mdh);
+	lp->lp_state &= ~LNET_PEER_PING_FAILED;
+	lp->lp_state |= LNET_PEER_FORCE_PING;
+	rc = lp->lp_ping_error;
+	lp->lp_ping_error = 0;
+	spin_unlock(&lp->lp_lock);
+
+	if (!LNetMDHandleIsInvalid(mdh))
+		LNetMDUnlink(mdh);
+
+	CDEBUG(D_NET, "peer %s:%d\n",
+	       libcfs_nid2str(lp->lp_primary_nid), rc);
+
+	spin_lock(&lp->lp_lock);
+	return rc ? rc : LNET_REDISCOVER_PEER;
+}
+
+/*
+ * Select NID to send a Ping or Push to.
+ */
+static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
+{
+	struct lnet_peer_ni *lpni;
+
+	/* Look for a direct-connected NID for this peer. */
+	lpni = NULL;
+	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
+		if (!lnet_is_peer_ni_healthy_locked(lpni))
+			continue;
+		if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
+			continue;
+		break;
+	}
+	if (lpni)
+		return lpni->lpni_nid;
+
+	/* Look for a routed-connected NID for this peer. */
+	lpni = NULL;
+	while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
+		if (!lnet_is_peer_ni_healthy_locked(lpni))
+			continue;
+		if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
+			continue;
+		break;
+	}
+	if (lpni)
+		return lpni->lpni_nid;
+
+	return LNET_NID_ANY;
+}
+
+/* Active side of ping. */
+static int lnet_peer_send_ping(struct lnet_peer *lp)
+__must_hold(&lp->lp_lock)
+{
+	struct lnet_md md = { NULL };
+	struct lnet_process_id id;
+	struct lnet_ping_buffer *pbuf;
+	int nnis;
+	int rc;
+	int cpt;
+
+	lp->lp_state |= LNET_PEER_PING_SENT;
+	lp->lp_state &= ~LNET_PEER_FORCE_PING;
+	spin_unlock(&lp->lp_lock);
+
+	nnis = max_t(int, lp->lp_data_nnis, LNET_INTERFACES_MIN);
+	pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
+	if (!pbuf) {
+		rc = -ENOMEM;
+		goto fail_error;
+	}
+
+	/* initialize md content */
+	md.start = &pbuf->pb_info;
+	md.length = LNET_PING_INFO_SIZE(nnis);
+	md.threshold = 2; /* GET/REPLY */
+	md.max_size = 0;
+	md.options = LNET_MD_TRUNCATE;
+	md.user_ptr = lp;
+	md.eq_handle = the_lnet.ln_dc_eqh;
+
+	rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh);
+	if (rc != 0) {
+		lnet_ping_buffer_decref(pbuf);
+		CERROR("Can't bind MD: %d\n", rc);
+		goto fail_error;
+	}
+	cpt = lnet_net_lock_current();
+	/* Refcount for MD. */
+	lnet_peer_addref_locked(lp);
+	id.pid = LNET_PID_LUSTRE;
+	id.nid = lnet_peer_select_nid(lp);
+	lnet_net_unlock(cpt);
+
+	if (id.nid == LNET_NID_ANY) {
+		rc = -EHOSTUNREACH;
+		goto fail_unlink_md;
+	}
+
+	rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id,
+		     LNET_RESERVED_PORTAL,
+		     LNET_PROTO_PING_MATCHBITS, 0);
+
+	if (rc)
+		goto fail_unlink_md;
+
+	CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
+
+	spin_lock(&lp->lp_lock);
+	return 0;
+
+fail_unlink_md:
+	LNetMDUnlink(lp->lp_ping_mdh);
+	LNetInvalidateMDHandle(&lp->lp_ping_mdh);
+fail_error:
+	CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
+	/*
+	 * The errors that get us here are considered hard errors and
+	 * cause Discovery to terminate. So we clear PING_SENT, but do
+	 * not set either PING_FAILED or FORCE_PING. In fact we need
+	 * to clear PING_FAILED, because the unlink event handler will
+	 * have set it if we called LNetMDUnlink() above.
+	 */
+	spin_lock(&lp->lp_lock);
+	lp->lp_state &= ~(LNET_PEER_PING_SENT | LNET_PEER_PING_FAILED);
+	return rc;
+}
+
+/*
+ * This function exists because you cannot call LNetMDUnlink() from an
+ * event handler.
+ */ +static int lnet_peer_push_failed(struct lnet_peer *lp) +__must_hold(&lp->lp_lock) +{ + struct lnet_handle_md mdh; + int rc; + + mdh = lp->lp_push_mdh; + LNetInvalidateMDHandle(&lp->lp_push_mdh); + lp->lp_state &= ~LNET_PEER_PUSH_FAILED; + rc = lp->lp_push_error; + lp->lp_push_error = 0; + spin_unlock(&lp->lp_lock); + + if (!LNetMDHandleIsInvalid(mdh)) + LNetMDUnlink(mdh); + + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); + spin_lock(&lp->lp_lock); + return rc ? rc : LNET_REDISCOVER_PEER; +} + +/* Active side of push. */ +static int lnet_peer_send_push(struct lnet_peer *lp) +__must_hold(&lp->lp_lock) +{ + struct lnet_ping_buffer *pbuf; + struct lnet_process_id id; + struct lnet_md md; + int cpt; + int rc; + + /* Don't push to a non-multi-rail peer. */ + if (!(lp->lp_state & LNET_PEER_MULTI_RAIL)) { + lp->lp_state &= ~LNET_PEER_FORCE_PUSH; + return 0; + } + + lp->lp_state |= LNET_PEER_PUSH_SENT; + lp->lp_state &= ~LNET_PEER_FORCE_PUSH; + spin_unlock(&lp->lp_lock); + + cpt = lnet_net_lock_current(); + pbuf = the_lnet.ln_ping_target; + lnet_ping_buffer_addref(pbuf); + lnet_net_unlock(cpt); + + /* Push source MD */ + md.start = &pbuf->pb_info; + md.length = LNET_PING_INFO_SIZE(pbuf->pb_nnis); + md.threshold = 2; /* Put/Ack */ + md.max_size = 0; + md.options = 0; + md.eq_handle = the_lnet.ln_dc_eqh; + md.user_ptr = lp; + + rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_push_mdh); + if (rc) { + lnet_ping_buffer_decref(pbuf); + CERROR("Can't bind push source MD: %d\n", rc); + goto fail_error; + } + cpt = lnet_net_lock_current(); + /* Refcount for MD. */ + lnet_peer_addref_locked(lp); + id.pid = LNET_PID_LUSTRE; + id.nid = lnet_peer_select_nid(lp); + lnet_net_unlock(cpt); + + if (id.nid == LNET_NID_ANY) { + rc = -EHOSTUNREACH; + goto fail_unlink; + } + + rc = LNetPut(LNET_NID_ANY, lp->lp_push_mdh, + LNET_ACK_REQ, id, LNET_RESERVED_PORTAL, + LNET_PROTO_PING_MATCHBITS, 0, 0); + + if (rc) + goto fail_unlink; + + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); + + spin_lock(&lp->lp_lock); + return 0; + +fail_unlink: + LNetMDUnlink(lp->lp_push_mdh); + LNetInvalidateMDHandle(&lp->lp_push_mdh); +fail_error: + CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc); + /* + * The errors that get us here are considered hard errors and + * cause Discovery to terminate. So we clear PUSH_SENT, but do + * not set PUSH_FAILED. In fact we need to clear PUSH_FAILED, + * because the unlink event handler will have set it if we + * called LNetMDUnlink() above. + */ + spin_lock(&lp->lp_lock); + lp->lp_state &= ~(LNET_PEER_PUSH_SENT | LNET_PEER_PUSH_FAILED); + return rc; +} + +/* + * An unrecoverable error was encountered during discovery. + * Set error status in peer and abort discovery. + */ +static void lnet_peer_discovery_error(struct lnet_peer *lp, int error) +{ + CDEBUG(D_NET, "Discovery error %s: %d\n", + libcfs_nid2str(lp->lp_primary_nid), error); + + spin_lock(&lp->lp_lock); + lp->lp_dc_error = error; + lp->lp_state &= ~LNET_PEER_DISCOVERING; + lp->lp_state |= LNET_PEER_REDISCOVER; + spin_unlock(&lp->lp_lock); +} + +/* + * Mark the peer as discovered. + */ +static int lnet_peer_discovered(struct lnet_peer *lp) +__must_hold(&lp->lp_lock) +{ + lp->lp_state |= LNET_PEER_DISCOVERED; + lp->lp_state &= ~(LNET_PEER_DISCOVERING | + LNET_PEER_REDISCOVER); + + CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid)); + + return 0; +} + +/* + * Mark the peer as to be rediscovered. 
> +/*
> + * Mark the peer as needing rediscovery.
> + */
> +static int lnet_peer_rediscover(struct lnet_peer *lp)
> +__must_hold(&lp->lp_lock)
> +{
> +        lp->lp_state |= LNET_PEER_REDISCOVER;
> +        lp->lp_state &= ~LNET_PEER_DISCOVERING;
> +
> +        CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
> +
> +        return 0;
> +}
> +
> +/*
> + * Returns the first peer on the ln_dc_working queue if its timeout
> + * has expired. Takes the current time as an argument so as not to
> + * re-check the clock obsessively. The oldest discovery request will
> + * be at the head of the queue.
> + */
> +static struct lnet_peer *lnet_peer_dc_timed_out(time64_t now)
> +{
> +        struct lnet_peer *lp;
> +
> +        if (list_empty(&the_lnet.ln_dc_working))
> +                return NULL;
> +        lp = list_first_entry(&the_lnet.ln_dc_working,
> +                              struct lnet_peer, lp_dc_list);
> +        if (now < lp->lp_last_queued + DISCOVERY_TIMEOUT)
> +                return NULL;
> +        return lp;
> +}
> +
> +/*
> + * Discovering this peer is taking too long. Cancel any Ping or Push
> + * that discovery is waiting on by unlinking the relevant MDs. The
> + * lnet_discovery_event_handler() will proceed from here and complete
> + * the cleanup.
> + */
> +static void lnet_peer_discovery_timeout(struct lnet_peer *lp)
> +{
> +        struct lnet_handle_md ping_mdh;
> +        struct lnet_handle_md push_mdh;
> +
> +        LNetInvalidateMDHandle(&ping_mdh);
> +        LNetInvalidateMDHandle(&push_mdh);
> +
> +        spin_lock(&lp->lp_lock);
> +        if (lp->lp_state & LNET_PEER_PING_SENT) {
> +                ping_mdh = lp->lp_ping_mdh;
> +                LNetInvalidateMDHandle(&lp->lp_ping_mdh);
> +        }
> +        if (lp->lp_state & LNET_PEER_PUSH_SENT) {
> +                push_mdh = lp->lp_push_mdh;
> +                LNetInvalidateMDHandle(&lp->lp_push_mdh);
> +        }
> +        spin_unlock(&lp->lp_lock);
> +
> +        if (!LNetMDHandleIsInvalid(ping_mdh))
> +                LNetMDUnlink(ping_mdh);
> +        if (!LNetMDHandleIsInvalid(push_mdh))
> +                LNetMDUnlink(push_mdh);
> +}
> +
> +/*
> + * Wait for work to be queued or some other change that must be
> + * attended to. Returns non-zero if the discovery thread should shut
> + * down.
> + */
> +static int lnet_peer_discovery_wait_for_work(void)
> +{
> +        int cpt;
> +        int rc = 0;
> +
> +        DEFINE_WAIT(wait);
> +
> +        cpt = lnet_net_lock_current();
> +        for (;;) {
> +                prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
> +                                TASK_IDLE);
> +                if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
> +                        break;
> +                if (lnet_push_target_resize_needed())
> +                        break;
> +                if (!list_empty(&the_lnet.ln_dc_request))
> +                        break;
> +                if (lnet_peer_dc_timed_out(ktime_get_real_seconds()))
> +                        break;
> +                lnet_net_unlock(cpt);
> +
> +                /*
> +                 * Wake up at most once per second to check for peers
> +                 * that have been stuck on the working queue for longer
> +                 * than the peer timeout.
> +                 */
> +                schedule_timeout(HZ);
> +                finish_wait(&the_lnet.ln_dc_waitq, &wait);
> +                cpt = lnet_net_lock_current();
> +        }
> +        finish_wait(&the_lnet.ln_dc_waitq, &wait);
> +
> +        if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
> +                rc = -ESHUTDOWN;
> +
> +        lnet_net_unlock(cpt);
> +
> +        CDEBUG(D_NET, "woken: %d\n", rc);
> +
> +        return rc;
> +}
> +
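lnet_peer_dc_timed_out() only ever looks at the head of ln_dc_working, which is sound because peers are appended with non-decreasing lp_last_queued values, so the head is always the oldest entry. A toy model of that invariant, with hypothetical names and a plain array standing in for the list_head:

#include <stdio.h>
#include <time.h>

#define SIM_TIMEOUT 180	/* mirrors DISCOVERY_TIMEOUT */

/* FIFO of queue timestamps; head is the oldest entry. */
struct dc_queue_sim {
	time_t stamp[8];
	int head, tail;
};

/*
 * Mirrors lnet_peer_dc_timed_out(): only the head can have expired,
 * because entries are enqueued with non-decreasing timestamps.
 */
static int sim_timed_out(const struct dc_queue_sim *q, time_t now)
{
	if (q->head == q->tail)		/* empty */
		return 0;
	return now >= q->stamp[q->head] + SIM_TIMEOUT;
}

int main(void)
{
	struct dc_queue_sim q = { .head = 0, .tail = 2 };
	time_t now = 1000;

	q.stamp[0] = 700;	/* queued 300s ago: expired */
	q.stamp[1] = 990;	/* queued 10s ago: fine */

	printf("head expired: %d\n", sim_timed_out(&q, now));
	q.head++;		/* drop the expired head, re-check */
	printf("head expired: %d\n", sim_timed_out(&q, now));
	return 0;
}
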
> +/* The discovery thread. */
> +static int lnet_peer_discovery(void *arg)
> +{
> +        struct lnet_peer *lp;
> +        time64_t now;
> +        int rc;
> +
> +        CDEBUG(D_NET, "started\n");
> +
> +        for (;;) {
> +                if (lnet_peer_discovery_wait_for_work())
>                          break;
>                  if (lnet_push_target_resize_needed())
> @@ -1719,33 +2984,97 @@ static int lnet_peer_discovery(void *arg)
>                  lnet_net_lock(LNET_LOCK_EX);
>                  if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
>                          break;
> +
> +                /*
> +                 * Process all incoming discovery work requests. When
> +                 * discovery must wait on a peer to change state, it
> +                 * is added to the tail of the ln_dc_working queue. A
> +                 * timestamp keeps track of when the peer was added,
> +                 * so we can time out discovery requests that take too
> +                 * long.
> +                 */
>                  while (!list_empty(&the_lnet.ln_dc_request)) {
>                          lp = list_first_entry(&the_lnet.ln_dc_request,
>                                                struct lnet_peer, lp_dc_list);
>                          list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
> +                        /*
> +                         * Set the time the peer was put on the
> +                         * dc_working queue. It shouldn't remain on
> +                         * the queue forever, in case the GET message
> +                         * (for ping) doesn't get a REPLY or the PUT
> +                         * message (for push) doesn't get an ACK.
> +                         *
> +                         * TODO: LNet Health will deal with this
> +                         * scenario in a generic way.
> +                         */
> +                        lp->lp_last_queued = ktime_get_real_seconds();
>                          lnet_net_unlock(LNET_LOCK_EX);
> 
> -                        /* Just tag and release for now. */
> +                        /*
> +                         * Select an action depending on the state of
> +                         * the peer and whether discovery is disabled.
> +                         * The check whether discovery is disabled is
> +                         * done after the code that handles processing
> +                         * for arrived data, cleanup for failures, and
> +                         * forcing a Ping or Push.
> +                         */
>                          spin_lock(&lp->lp_lock);
> -                        if (lnet_peer_discovery_disabled) {
> -                                lp->lp_state |= LNET_PEER_REDISCOVER;
> -                                lp->lp_state &= ~(LNET_PEER_DISCOVERED |
> -                                                  LNET_PEER_NIDS_UPTODATE |
> -                                                  LNET_PEER_DISCOVERING);
> -                        } else {
> -                                lp->lp_state |= (LNET_PEER_DISCOVERED |
> -                                                 LNET_PEER_NIDS_UPTODATE);
> -                                lp->lp_state &= ~(LNET_PEER_REDISCOVER |
> -                                                  LNET_PEER_DISCOVERING);
> -                        }
> +                        CDEBUG(D_NET, "peer %s state %#x\n",
> +                               libcfs_nid2str(lp->lp_primary_nid),
> +                               lp->lp_state);
> +                        if (lp->lp_state & LNET_PEER_DATA_PRESENT)
> +                                rc = lnet_peer_data_present(lp);
> +                        else if (lp->lp_state & LNET_PEER_PING_FAILED)
> +                                rc = lnet_peer_ping_failed(lp);
> +                        else if (lp->lp_state & LNET_PEER_PUSH_FAILED)
> +                                rc = lnet_peer_push_failed(lp);
> +                        else if (lp->lp_state & LNET_PEER_FORCE_PING)
> +                                rc = lnet_peer_send_ping(lp);
> +                        else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
> +                                rc = lnet_peer_send_push(lp);
> +                        else if (lnet_peer_discovery_disabled)
> +                                rc = lnet_peer_rediscover(lp);
> +                        else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
> +                                rc = lnet_peer_send_ping(lp);
> +                        else if (lnet_peer_needs_push(lp))
> +                                rc = lnet_peer_send_push(lp);
> +                        else
> +                                rc = lnet_peer_discovered(lp);
> +                        CDEBUG(D_NET, "peer %s state %#x rc %d\n",
> +                               libcfs_nid2str(lp->lp_primary_nid),
> +                               lp->lp_state, rc);
>                          spin_unlock(&lp->lp_lock);
>                          lnet_net_lock(LNET_LOCK_EX);
> +                        if (rc == LNET_REDISCOVER_PEER) {
> +                                list_move(&lp->lp_dc_list,
> +                                          &the_lnet.ln_dc_request);
> +                        } else if (rc) {
> +                                lnet_peer_discovery_error(lp, rc);
> +                        }
>                          if (!(lp->lp_state & LNET_PEER_DISCOVERING))
>                                  lnet_peer_discovery_complete(lp);
>                          if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
>                                  break;
>                  }
> +
> +                /*
> +                 * Now that the ln_dc_request queue has been emptied,
> +                 * check the ln_dc_working queue for peers that are
> +                 * taking too long. Move all that are found to the
> +                 * ln_dc_expired queue and time out any pending
> +                 * Ping or Push. We have to drop the lnet_net_lock
> +                 * in the loop because lnet_peer_discovery_timeout()
> +                 * calls LNetMDUnlink().
> +                 */
> +                now = ktime_get_real_seconds();
> +                while ((lp = lnet_peer_dc_timed_out(now)) != NULL) {
> +                        list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
> +                        lnet_net_unlock(LNET_LOCK_EX);
> +                        lnet_peer_discovery_timeout(lp);
> +                        lnet_net_lock(LNET_LOCK_EX);
> +                }
> +
>                  lnet_net_unlock(LNET_LOCK_EX);
>          }
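The ordering of that if/else ladder is deliberate, per the comment above it: results and failures from earlier Pings and Pushes, plus forced actions, are serviced before the discovery-disabled check, so a peer is never parked for rediscovery while cleanup is still pending. A compact sketch of the same priority scheme, returning labels instead of calling the handlers (the sim_* names and flag values are illustrative, not the real ones):

#include <stdio.h>

/* Illustrative flag values; the real ones live in lib-types.h. */
#define SIM_DATA_PRESENT	0x001
#define SIM_PING_FAILED		0x002
#define SIM_PUSH_FAILED		0x004
#define SIM_FORCE_PING		0x008
#define SIM_FORCE_PUSH		0x010
#define SIM_NIDS_UPTODATE	0x020

/* Same priority order as the discovery thread's ladder. */
static const char *sim_dispatch(unsigned int st, int disabled, int needs_push)
{
	if (st & SIM_DATA_PRESENT)
		return "data_present";
	if (st & SIM_PING_FAILED)
		return "ping_failed";
	if (st & SIM_PUSH_FAILED)
		return "push_failed";
	if (st & SIM_FORCE_PING)
		return "send_ping";
	if (st & SIM_FORCE_PUSH)
		return "send_push";
	if (disabled)
		return "rediscover";
	if (!(st & SIM_NIDS_UPTODATE))
		return "send_ping";
	if (needs_push)
		return "send_push";
	return "discovered";
}

int main(void)
{
	/* Failure cleanup wins even when discovery has been disabled. */
	printf("%s\n", sim_dispatch(SIM_PING_FAILED, 1, 0));
	/* Nothing pending, NIDs current, no push needed: done. */
	printf("%s\n", sim_dispatch(SIM_NIDS_UPTODATE, 0, 0));
	return 0;
}
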
> 
> @@ -1759,23 +3088,28 @@ static int lnet_peer_discovery(void *arg)
>          LNetEQFree(the_lnet.ln_dc_eqh);
>          LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
> 
> +        /* Queue cleanup 1: stop all pending pings and pushes. */
>          lnet_net_lock(LNET_LOCK_EX);
> -        list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
> -                spin_lock(&lp->lp_lock);
> -                lp->lp_state |= LNET_PEER_REDISCOVER;
> -                lp->lp_state &= ~(LNET_PEER_DISCOVERED |
> -                                  LNET_PEER_DISCOVERING |
> -                                  LNET_PEER_NIDS_UPTODATE);
> -                spin_unlock(&lp->lp_lock);
> -                lnet_peer_discovery_complete(lp);
> +        while (!list_empty(&the_lnet.ln_dc_working)) {
> +                lp = list_first_entry(&the_lnet.ln_dc_working,
> +                                      struct lnet_peer, lp_dc_list);
> +                list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
> +                lnet_net_unlock(LNET_LOCK_EX);
> +                lnet_peer_discovery_timeout(lp);
> +                lnet_net_lock(LNET_LOCK_EX);
>          }
> -        list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
> -                spin_lock(&lp->lp_lock);
> -                lp->lp_state |= LNET_PEER_REDISCOVER;
> -                lp->lp_state &= ~(LNET_PEER_DISCOVERED |
> -                                  LNET_PEER_DISCOVERING |
> -                                  LNET_PEER_NIDS_UPTODATE);
> -                spin_unlock(&lp->lp_lock);
> +        lnet_net_unlock(LNET_LOCK_EX);
> +
> +        /* Queue cleanup 2: wait for the expired queue to clear. */
> +        while (!list_empty(&the_lnet.ln_dc_expired))
> +                schedule_timeout_uninterruptible(HZ);
> +
> +        /* Queue cleanup 3: clear the request queue. */
> +        lnet_net_lock(LNET_LOCK_EX);
> +        while (!list_empty(&the_lnet.ln_dc_request)) {
> +                lp = list_first_entry(&the_lnet.ln_dc_request,
> +                                      struct lnet_peer, lp_dc_list);
> +                lnet_peer_discovery_error(lp, -ESHUTDOWN);
>                  lnet_peer_discovery_complete(lp);
>          }
>          lnet_net_unlock(LNET_LOCK_EX);
> 
> @@ -1797,10 +3131,6 @@ int lnet_peer_discovery_start(void)
>          if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
>                  return -EALREADY;
> 
> -        INIT_LIST_HEAD(&the_lnet.ln_dc_request);
> -        INIT_LIST_HEAD(&the_lnet.ln_dc_working);
> -        init_waitqueue_head(&the_lnet.ln_dc_waitq);
> -
>          rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
>          if (rc != 0) {
>                  CERROR("Can't allocate discovery EQ: %d\n", rc);
> @@ -1819,6 +3149,8 @@ int lnet_peer_discovery_start(void)
>                  the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
>          }
> 
> +        CDEBUG(D_NET, "discovery start: %d\n", rc);
> +
>          return rc;
>  }
> 
> @@ -1837,6 +3169,9 @@ void lnet_peer_discovery_stop(void)
> 
>          LASSERT(list_empty(&the_lnet.ln_dc_request));
>          LASSERT(list_empty(&the_lnet.ln_dc_working));
> +        LASSERT(list_empty(&the_lnet.ln_dc_expired));
> +
> +        CDEBUG(D_NET, "discovery stopped\n");
>  }
> 
>  /* Debugging */
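One observation on the teardown sequence: if I read it right, it only works in this order. Phase 1 turns every in-flight Ping or Push into an MD unlink, phase 2 waits for the event handler to drain ln_dc_expired, and only then can phase 3 fail the untouched requests with -ESHUTDOWN without racing an event. A userspace model of that ordering, with counters standing in for the three queues (all sim_* names are hypothetical):

#include <stdio.h>

/* Counters stand in for the three lists; names are not from LNet. */
struct dc_sim {
	int working;	/* ln_dc_working */
	int expired;	/* ln_dc_expired */
	int request;	/* ln_dc_request */
};

/* Stand-in for lnet_discovery_event_handler() completing timed-out peers. */
static void sim_event_handler(struct dc_sim *dc)
{
	if (dc->expired)
		dc->expired--;
}

static void sim_shutdown(struct dc_sim *dc)
{
	/* 1: unlink MDs of all in-flight pings/pushes; peers move to expired. */
	while (dc->working) {
		dc->working--;
		dc->expired++;
	}

	/* 2: wait for the event handler to drain the expired queue. */
	while (dc->expired)
		sim_event_handler(dc);	/* real code sleeps ~1s per pass */

	/* 3: fail anything queued but never started (with -ESHUTDOWN). */
	while (dc->request)
		dc->request--;
}

int main(void)
{
	struct dc_sim dc = { .working = 2, .expired = 1, .request = 3 };

	sim_shutdown(&dc);
	printf("working %d expired %d request %d\n",
	       dc.working, dc.expired, dc.request);
	return 0;
}
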