diff mbox series

[net-next,v6,08/12] net: homa: create homa_incoming.c

Message ID 20250115185937.1324-9-ouster@cs.stanford.edu (mailing list archive)
State New
Delegated to: Netdev Maintainers
Headers show
Series Begin upstreaming Homa transport protocol | expand

Commit Message

John Ousterhout Jan. 15, 2025, 6:59 p.m. UTC
This file contains most of the code for handling incoming packets,
including top-level dispatching code plus specific handlers for each
pack type. It also contains code for dispatching fully-received
messages to waiting application threads.

Signed-off-by: John Ousterhout <ouster@cs.stanford.edu>
---
 net/homa/homa_incoming.c | 1076 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 1076 insertions(+)
 create mode 100644 net/homa/homa_incoming.c
diff mbox series

Patch

diff --git a/net/homa/homa_incoming.c b/net/homa/homa_incoming.c
new file mode 100644
index 000000000000..c7630bf6d1fb
--- /dev/null
+++ b/net/homa/homa_incoming.c
@@ -0,0 +1,1076 @@ 
+// SPDX-License-Identifier: BSD-2-Clause
+
+/* This file contains functions that handle incoming Homa messages.
+ */
+
+#include "homa_impl.h"
+#include "homa_peer.h"
+#include "homa_pool.h"
+
+/**
+ * homa_message_in_init() - Constructor for homa_message_in.
+ * @rpc:          RPC whose msgin structure should be initialized.
+ * @length:       Total number of bytes in message.
+ * Return:        Zero for successful initialization, or a negative errno
+ *                if rpc->msgin could not be initialized.
+ */
+int homa_message_in_init(struct homa_rpc *rpc, int length)
+{
+	int err;
+
+	rpc->msgin.length = length;
+	skb_queue_head_init(&rpc->msgin.packets);
+	rpc->msgin.recv_end = 0;
+	INIT_LIST_HEAD(&rpc->msgin.gaps);
+	rpc->msgin.bytes_remaining = length;
+	rpc->msgin.resend_all = 0;
+	rpc->msgin.num_bpages = 0;
+	err = homa_pool_allocate(rpc);
+	if (err != 0)
+		return err;
+	return 0;
+}
+
+/**
+ * homa_gap_new() - Create a new gap and add it to a list.
+ * @next:   Add the new gap just before this list element.
+ * @start:  Offset of first byte covered by the gap.
+ * @end:    Offset of byte just after the last one covered by the gap.
+ * Return:  Pointer to the new gap, or NULL if memory couldn't be allocated
+ *          for the gap object.
+ */
+struct homa_gap *homa_gap_new(struct list_head *next, int start, int end)
+{
+	struct homa_gap *gap;
+
+	gap = kmalloc(sizeof(*gap), GFP_ATOMIC);
+	if (!gap)
+		return NULL;
+	gap->start = start;
+	gap->end = end;
+	gap->time = sched_clock();
+	list_add_tail(&gap->links, next);
+	return gap;
+}
+
+/**
+ * homa_gap_retry() - Send RESEND requests for all of the unreceived
+ * gaps in a message.
+ * @rpc:     RPC to check; must be locked by caller.
+ */
+void homa_gap_retry(struct homa_rpc *rpc)
+{
+	struct homa_resend_hdr resend;
+	struct homa_gap *gap;
+
+	list_for_each_entry(gap, &rpc->msgin.gaps, links) {
+		resend.offset = htonl(gap->start);
+		resend.length = htonl(gap->end - gap->start);
+		homa_xmit_control(RESEND, &resend, sizeof(resend), rpc);
+	}
+}
+
+/**
+ * homa_add_packet() - Add an incoming packet to the contents of a
+ * partially received message.
+ * @rpc:   Add the packet to the msgin for this RPC.
+ * @skb:   The new packet. This function takes ownership of the packet
+ *         (the packet will either be freed or added to rpc->msgin.packets).
+ */
+void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb)
+{
+	struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data;
+	struct homa_gap *gap, *dummy, *gap2;
+	int start = ntohl(h->seg.offset);
+	int length = homa_data_len(skb);
+	int end = start + length;
+
+	if ((start + length) > rpc->msgin.length)
+		goto discard;
+
+	if (start == rpc->msgin.recv_end) {
+		/* Common case: packet is sequential. */
+		rpc->msgin.recv_end += length;
+		goto keep;
+	}
+
+	if (start > rpc->msgin.recv_end) {
+		/* Packet creates a new gap. */
+		if (!homa_gap_new(&rpc->msgin.gaps,
+				  rpc->msgin.recv_end, start)) {
+			pr_err("Homa couldn't allocate gap: insufficient memory\n");
+			goto discard;
+		}
+		rpc->msgin.recv_end = end;
+		goto keep;
+	}
+
+	/* Must now check to see if the packet fills in part or all of
+	 * an existing gap.
+	 */
+	list_for_each_entry_safe(gap, dummy, &rpc->msgin.gaps, links) {
+		/* Is packet at the start of this gap? */
+		if (start <= gap->start) {
+			if (end <= gap->start)
+				continue;
+			if (start < gap->start)
+				goto discard;
+			if (end > gap->end)
+				goto discard;
+			gap->start = end;
+			if (gap->start >= gap->end) {
+				list_del(&gap->links);
+				kfree(gap);
+			}
+			goto keep;
+		}
+
+		/* Is packet at the end of this gap? BTW, at this point we know
+		 * the packet can't cover the entire gap.
+		 */
+		if (end >= gap->end) {
+			if (start >= gap->end)
+				continue;
+			if (end > gap->end)
+				goto discard;
+			gap->end = start;
+			goto keep;
+		}
+
+		/* Packet is in the middle of the gap; must split the gap. */
+		gap2 = homa_gap_new(&gap->links, gap->start, start);
+		if (!gap2) {
+			pr_err("Homa couldn't allocate gap for split: insufficient memory\n");
+			goto discard;
+		}
+		gap2->time = gap->time;
+		gap->start = end;
+		goto keep;
+	}
+
+discard:
+	kfree_skb(skb);
+	return;
+
+keep:
+	__skb_queue_tail(&rpc->msgin.packets, skb);
+	rpc->msgin.bytes_remaining -= length;
+}
+
+/**
+ * homa_copy_to_user() - Copy as much data as possible from incoming
+ * packet buffers to buffers in user space.
+ * @rpc:     RPC for which data should be copied. Must be locked by caller.
+ * Return:   Zero for success or a negative errno if there is an error.
+ *           It is possible for the RPC to be freed while this function
+ *           executes (it releases and reacquires the RPC lock). If that
+ *           happens, -EINVAL will be returned and the state of @rpc
+ *           will be RPC_DEAD.
+ */
+int homa_copy_to_user(struct homa_rpc *rpc)
+	__releases(rpc->bucket_lock)
+	__acquires(rpc->bucket_lock)
+{
+#define MAX_SKBS 20
+	struct sk_buff *skbs[MAX_SKBS];
+	int error = 0;
+	int n = 0;             /* Number of filled entries in skbs. */
+	int i;
+
+	/* Tricky note: we can't hold the RPC lock while we're actually
+	 * copying to user space, because (a) it's illegal to hold a spinlock
+	 * while copying to user space and (b) we'd like for homa_softirq
+	 * to add more packets to the RPC while we're copying these out.
+	 * So, collect a bunch of packets to copy, then release the lock,
+	 * copy them, and reacquire the lock.
+	 */
+	while (true) {
+		struct sk_buff *skb;
+
+		if (rpc->state == RPC_DEAD) {
+			error = -EINVAL;
+			break;
+		}
+
+		skb = __skb_dequeue(&rpc->msgin.packets);
+		if (skb) {
+			skbs[n] = skb;
+			n++;
+			if (n < MAX_SKBS)
+				continue;
+		}
+		if (n == 0)
+			break;
+
+		/* At this point we've collected a batch of packets (or
+		 * run out of packets); copy any available packets out to
+		 * user space.
+		 */
+		atomic_or(RPC_COPYING_TO_USER, &rpc->flags);
+		homa_rpc_unlock(rpc);
+
+		/* Each iteration of this loop copies out one skb. */
+		for (i = 0; i < n; i++) {
+			struct homa_data_hdr *h = (struct homa_data_hdr *)
+					skbs[i]->data;
+			int pkt_length = homa_data_len(skbs[i]);
+			int offset = ntohl(h->seg.offset);
+			int buf_bytes, chunk_size;
+			struct iov_iter iter;
+			int copied = 0;
+			char *dst;
+
+			/* Each iteration of this loop copies to one
+			 * user buffer.
+			 */
+			while (copied < pkt_length) {
+				chunk_size = pkt_length - copied;
+				dst = homa_pool_get_buffer(rpc, offset + copied,
+							   &buf_bytes);
+				if (buf_bytes < chunk_size) {
+					if (buf_bytes == 0)
+						/* skb has data beyond message
+						 * end?
+						 */
+						break;
+					chunk_size = buf_bytes;
+				}
+				error = import_ubuf(READ, (void __user *)dst,
+						    chunk_size, &iter);
+				if (error)
+					goto free_skbs;
+				error = skb_copy_datagram_iter(skbs[i],
+							       sizeof(*h) +
+							       copied,  &iter,
+							       chunk_size);
+				if (error)
+					goto free_skbs;
+				copied += chunk_size;
+			}
+		}
+
+free_skbs:
+		for (i = 0; i < n; i++)
+			kfree_skb(skbs[i]);
+		n = 0;
+		atomic_or(APP_NEEDS_LOCK, &rpc->flags);
+		homa_rpc_lock(rpc, "homa_copy_to_user");
+		atomic_andnot(APP_NEEDS_LOCK | RPC_COPYING_TO_USER,
+			      &rpc->flags);
+		if (error)
+			break;
+	}
+	return error;
+}
+
+/**
+ * homa_dispatch_pkts() - Top-level function that processes a batch of packets,
+ * all related to the same RPC.
+ * @skb:       First packet in the batch, linked through skb->next.
+ * @homa:      Overall information about the Homa transport.
+ */
+void homa_dispatch_pkts(struct sk_buff *skb, struct homa *homa)
+{
+#define MAX_ACKS 10
+	const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
+	struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data;
+	__u64 id = homa_local_id(h->common.sender_id);
+	int dport = ntohs(h->common.dport);
+
+	/* Used to collect acks from data packets so we can process them
+	 * all at the end (can't process them inline because that may
+	 * require locking conflicting RPCs). If we run out of space just
+	 * ignore the extra acks; they'll be regenerated later through the
+	 * explicit mechanism.
+	 */
+	struct homa_ack acks[MAX_ACKS];
+	struct homa_rpc *rpc = NULL;
+	struct homa_sock *hsk;
+	struct sk_buff *next;
+	int num_acks = 0;
+
+	/* Find the appropriate socket.*/
+	hsk = homa_sock_find(homa->port_map, dport);
+	if (!hsk) {
+		if (skb_is_ipv6(skb))
+			icmp6_send(skb, ICMPV6_DEST_UNREACH,
+				   ICMPV6_PORT_UNREACH, 0, NULL, IP6CB(skb));
+		else
+			icmp_send(skb, ICMP_DEST_UNREACH,
+				  ICMP_PORT_UNREACH, 0);
+		while (skb) {
+			next = skb->next;
+			kfree_skb(skb);
+			skb = next;
+		}
+		return;
+	}
+
+	/* Each iteration through the following loop processes one packet. */
+	for (; skb; skb = next) {
+		h = (struct homa_data_hdr *)skb->data;
+		next = skb->next;
+
+		/* Relinquish the RPC lock temporarily if it's needed
+		 * elsewhere.
+		 */
+		if (rpc) {
+			int flags = atomic_read(&rpc->flags);
+
+			if (flags & APP_NEEDS_LOCK) {
+				homa_rpc_unlock(rpc);
+				homa_spin(200);
+				rpc = NULL;
+			}
+		}
+
+		/* Find and lock the RPC if we haven't already done so. */
+		if (!rpc) {
+			if (!homa_is_client(id)) {
+				/* We are the server for this RPC. */
+				if (h->common.type == DATA) {
+					int created;
+
+					/* Create a new RPC if one doesn't
+					 * already exist.
+					 */
+					rpc = homa_rpc_new_server(hsk, &saddr,
+								  h, &created);
+					if (IS_ERR(rpc)) {
+						pr_warn("homa_pkt_dispatch couldn't create server rpc: error %lu",
+							-PTR_ERR(rpc));
+						rpc = NULL;
+						goto discard;
+					}
+				} else {
+					rpc = homa_find_server_rpc(hsk, &saddr,
+								   id);
+				}
+			} else {
+				rpc = homa_find_client_rpc(hsk, id);
+			}
+		}
+		if (unlikely(!rpc)) {
+			if (h->common.type != NEED_ACK &&
+			    h->common.type != ACK &&
+			    h->common.type != RESEND)
+				goto discard;
+		} else {
+			if (h->common.type == DATA ||
+			    h->common.type == BUSY ||
+			    h->common.type == NEED_ACK)
+				rpc->silent_ticks = 0;
+			rpc->peer->outstanding_resends = 0;
+		}
+
+		switch (h->common.type) {
+		case DATA:
+			if (h->ack.client_id) {
+				/* Save the ack for processing later, when we
+				 * have released the RPC lock.
+				 */
+				if (num_acks < MAX_ACKS) {
+					acks[num_acks] = h->ack;
+					num_acks++;
+				}
+			}
+			homa_data_pkt(skb, rpc);
+			break;
+		case RESEND:
+			homa_resend_pkt(skb, rpc, hsk);
+			break;
+		case UNKNOWN:
+			homa_unknown_pkt(skb, rpc);
+			break;
+		case BUSY:
+			/* Nothing to do for these packets except reset
+			 * silent_ticks, which happened above.
+			 */
+			goto discard;
+		case NEED_ACK:
+			homa_need_ack_pkt(skb, hsk, rpc);
+			break;
+		case ACK:
+			homa_ack_pkt(skb, hsk, rpc);
+			rpc = NULL;
+
+			/* It isn't safe to process more packets once we've
+			 * released the RPC lock (this should never happen).
+			 */
+			while (next) {
+				WARN_ONCE(next, "%s found extra packets after AC<\n",
+					  __func__);
+				skb = next;
+				next = skb->next;
+				kfree_skb(skb);
+			}
+			break;
+		default:
+			goto discard;
+		}
+		continue;
+
+discard:
+		kfree_skb(skb);
+	}
+	if (rpc)
+		homa_rpc_unlock(rpc);
+
+	while (num_acks > 0) {
+		num_acks--;
+		homa_rpc_acked(hsk, &saddr, &acks[num_acks]);
+	}
+
+	if (hsk->dead_skbs >= 2 * hsk->homa->dead_buffs_limit)
+		/* We get here if neither homa_wait_for_message
+		 * nor homa_timer can keep up with reaping dead
+		 * RPCs. See reap.txt for details.
+		 */
+		homa_rpc_reap(hsk, false);
+}
+
+/**
+ * homa_data_pkt() - Handler for incoming DATA packets
+ * @skb:     Incoming packet; size known to be large enough for the header.
+ *           This function now owns the packet.
+ * @rpc:     Information about the RPC corresponding to this packet.
+ *           Must be locked by the caller.
+ */
+void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc)
+{
+	struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data;
+
+	if (rpc->state != RPC_INCOMING && homa_is_client(rpc->id)) {
+		if (unlikely(rpc->state != RPC_OUTGOING))
+			goto discard;
+		rpc->state = RPC_INCOMING;
+		if (homa_message_in_init(rpc, ntohl(h->message_length)) != 0)
+			goto discard;
+	} else if (rpc->state != RPC_INCOMING) {
+		/* Must be server; note that homa_rpc_new_server already
+		 * initialized msgin and allocated buffers.
+		 */
+		if (unlikely(rpc->msgin.length >= 0))
+			goto discard;
+	}
+
+	if (rpc->msgin.num_bpages == 0)
+		/* Drop packets that arrive when we can't allocate buffer
+		 * space. If we keep them around, packet buffer usage can
+		 * exceed available cache space, resulting in poor
+		 * performance.
+		 */
+		goto discard;
+
+	homa_add_packet(rpc, skb);
+
+	if (skb_queue_len(&rpc->msgin.packets) != 0 &&
+	    !(atomic_read(&rpc->flags) & RPC_PKTS_READY)) {
+		atomic_or(RPC_PKTS_READY, &rpc->flags);
+		homa_sock_lock(rpc->hsk, "homa_data_pkt");
+		homa_rpc_handoff(rpc);
+		homa_sock_unlock(rpc->hsk);
+	}
+	return;
+
+discard:
+	kfree_skb(skb);
+}
+
+/**
+ * homa_resend_pkt() - Handler for incoming RESEND packets
+ * @skb:     Incoming packet; size already verified large enough for header.
+ *           This function now owns the packet.
+ * @rpc:     Information about the RPC corresponding to this packet; must
+ *           be locked by caller, but may be NULL if there is no RPC matching
+ *           this packet
+ * @hsk:     Socket on which the packet was received.
+ */
+void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc,
+		     struct homa_sock *hsk)
+{
+	struct homa_resend_hdr *h = (struct homa_resend_hdr *)skb->data;
+	struct homa_busy_hdr busy;
+
+	if (!rpc) {
+		homa_xmit_unknown(skb, hsk);
+		goto done;
+	}
+
+	if (!homa_is_client(rpc->id) && rpc->state != RPC_OUTGOING) {
+		/* We are the server for this RPC and don't yet have a
+		 * response packet, so just send BUSY.
+		 */
+		homa_xmit_control(BUSY, &busy, sizeof(busy), rpc);
+		goto done;
+	}
+	if (ntohl(h->length) == 0)
+		/* This RESEND is from a server just trying to determine
+		 * whether the client still cares about the RPC; return
+		 * BUSY so the server doesn't time us out.
+		 */
+		homa_xmit_control(BUSY, &busy, sizeof(busy), rpc);
+	homa_resend_data(rpc, ntohl(h->offset),
+			 ntohl(h->offset) + ntohl(h->length));
+
+done:
+	kfree_skb(skb);
+}
+
+/**
+ * homa_unknown_pkt() - Handler for incoming UNKNOWN packets.
+ * @skb:     Incoming packet; size known to be large enough for the header.
+ *           This function now owns the packet.
+ * @rpc:     Information about the RPC corresponding to this packet.
+ */
+void homa_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc)
+{
+	if (homa_is_client(rpc->id)) {
+		if (rpc->state == RPC_OUTGOING) {
+			/* It appears that everything we've already transmitted
+			 * has been lost; retransmit it.
+			 */
+			homa_resend_data(rpc, 0, rpc->msgout.next_xmit_offset);
+			goto done;
+		}
+
+	} else {
+		homa_rpc_free(rpc);
+	}
+done:
+	kfree_skb(skb);
+}
+
+/**
+ * homa_need_ack_pkt() - Handler for incoming NEED_ACK packets
+ * @skb:     Incoming packet; size already verified large enough for header.
+ *           This function now owns the packet.
+ * @hsk:     Socket on which the packet was received.
+ * @rpc:     The RPC named in the packet header, or NULL if no such
+ *           RPC exists. The RPC has been locked by the caller.
+ */
+void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
+		       struct homa_rpc *rpc)
+{
+	struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data;
+	const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
+	__u64 id = homa_local_id(h->sender_id);
+	struct homa_peer *peer;
+	struct homa_ack_hdr ack;
+
+	/* Return if it's not safe for the peer to purge its state
+	 * for this RPC (the RPC still exists and we haven't received
+	 * the entire response), or if we can't find peer info.
+	 */
+	if (rpc && (rpc->state != RPC_INCOMING ||
+		    rpc->msgin.bytes_remaining)) {
+		goto done;
+	} else {
+		peer = homa_peer_find(hsk->homa->peers, &saddr, &hsk->inet);
+		if (IS_ERR(peer))
+			goto done;
+	}
+
+	/* Send an ACK for this RPC. At the same time, include all of the
+	 * other acks available for the peer. Note: can't use rpc below,
+	 * since it may be NULL.
+	 */
+	ack.common.type = ACK;
+	ack.common.sport = h->dport;
+	ack.common.dport = h->sport;
+	ack.common.sender_id = cpu_to_be64(id);
+	ack.num_acks = htons(homa_peer_get_acks(peer,
+						HOMA_MAX_ACKS_PER_PKT,
+						ack.acks));
+	__homa_xmit_control(&ack, sizeof(ack), peer, hsk);
+
+done:
+	kfree_skb(skb);
+}
+
+/**
+ * homa_ack_pkt() - Handler for incoming ACK packets
+ * @skb:     Incoming packet; size already verified large enough for header.
+ *           This function now owns the packet.
+ * @hsk:     Socket on which the packet was received.
+ * @rpc:     The RPC named in the packet header, or NULL if no such
+ *           RPC exists. The RPC has been locked by the caller but will
+ *           be unlocked here.
+ */
+void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
+		  struct homa_rpc *rpc)
+	__releases(rpc->bucket_lock)
+{
+	const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
+	struct homa_ack_hdr *h = (struct homa_ack_hdr *)skb->data;
+	int i, count;
+
+	if (rpc) {
+		homa_rpc_free(rpc);
+		homa_rpc_unlock(rpc);
+	}
+
+	count = ntohs(h->num_acks);
+	for (i = 0; i < count; i++)
+		homa_rpc_acked(hsk, &saddr, &h->acks[i]);
+	kfree_skb(skb);
+}
+
+/**
+ * homa_rpc_abort() - Terminate an RPC.
+ * @rpc:     RPC to be terminated.  Must be locked by caller.
+ * @error:   A negative errno value indicating the error that caused the abort.
+ *           If this is a client RPC, the error will be returned to the
+ *           application; if it's a server RPC, the error is ignored and
+ *           we just free the RPC.
+ */
+void homa_rpc_abort(struct homa_rpc *rpc, int error)
+{
+	if (!homa_is_client(rpc->id)) {
+		homa_rpc_free(rpc);
+		return;
+	}
+	rpc->error = error;
+	homa_sock_lock(rpc->hsk, "homa_rpc_abort");
+	if (!rpc->hsk->shutdown)
+		homa_rpc_handoff(rpc);
+	homa_sock_unlock(rpc->hsk);
+}
+
+/**
+ * homa_abort_rpcs() - Abort all RPCs to/from a particular peer.
+ * @homa:    Overall data about the Homa protocol implementation.
+ * @addr:    Address (network order) of the destination whose RPCs are
+ *           to be aborted.
+ * @port:    If nonzero, then RPCs will only be aborted if they were
+ *	     targeted at this server port.
+ * @error:   Negative errno value indicating the reason for the abort.
+ */
+void homa_abort_rpcs(struct homa *homa, const struct in6_addr *addr,
+		     int port, int error)
+{
+	struct homa_socktab_scan scan;
+	struct homa_rpc *rpc, *tmp;
+	struct homa_sock *hsk;
+
+	rcu_read_lock();
+	for (hsk = homa_socktab_start_scan(homa->port_map, &scan); hsk;
+	     hsk = homa_socktab_next(&scan)) {
+		/* Skip the (expensive) lock acquisition if there's no
+		 * work to do.
+		 */
+		if (list_empty(&hsk->active_rpcs))
+			continue;
+		if (!homa_protect_rpcs(hsk))
+			continue;
+		list_for_each_entry_safe(rpc, tmp, &hsk->active_rpcs,
+					 active_links) {
+			if (!ipv6_addr_equal(&rpc->peer->addr, addr))
+				continue;
+			if (port && rpc->dport != port)
+				continue;
+			homa_rpc_lock(rpc, "rpc_abort_rpcs");
+			homa_rpc_abort(rpc, error);
+			homa_rpc_unlock(rpc);
+		}
+		homa_unprotect_rpcs(hsk);
+	}
+	homa_socktab_end_scan(&scan);
+	rcu_read_unlock();
+}
+
+/**
+ * homa_abort_sock_rpcs() - Abort all outgoing (client-side) RPCs on a given
+ * socket.
+ * @hsk:         Socket whose RPCs should be aborted.
+ * @error:       Zero means that the aborted RPCs should be freed immediately.
+ *               A nonzero value means that the RPCs should be marked
+ *               complete, so that they can be returned to the application;
+ *               this value (a negative errno) will be returned from
+ *               recvmsg.
+ */
+void homa_abort_sock_rpcs(struct homa_sock *hsk, int error)
+{
+	struct homa_rpc *rpc, *tmp;
+
+	rcu_read_lock();
+	if (list_empty(&hsk->active_rpcs))
+		goto done;
+	if (!homa_protect_rpcs(hsk))
+		goto done;
+	list_for_each_entry_safe(rpc, tmp, &hsk->active_rpcs, active_links) {
+		if (!homa_is_client(rpc->id))
+			continue;
+		homa_rpc_lock(rpc, "homa_abort_sock_rpcs");
+		if (rpc->state == RPC_DEAD) {
+			homa_rpc_unlock(rpc);
+			continue;
+		}
+		if (error)
+			homa_rpc_abort(rpc, error);
+		else
+			homa_rpc_free(rpc);
+		homa_rpc_unlock(rpc);
+	}
+	homa_unprotect_rpcs(hsk);
+done:
+	rcu_read_unlock();
+}
+
+/**
+ * homa_register_interests() - Records information in various places so
+ * that a thread will be woken up if an RPC that it cares about becomes
+ * available.
+ * @interest:     Used to record information about the messages this thread is
+ *                waiting on. The initial contents of the structure are
+ *                assumed to be undefined.
+ * @hsk:          Socket on which relevant messages will arrive.  Must not be
+ *                locked.
+ * @flags:        Flags field from homa_recvmsg_args; see manual entry for
+ *                details.
+ * @id:           If non-zero, then the caller is interested in receiving
+ *                the response for this RPC (@id must be a client request).
+ * Return:        Either zero or a negative errno value. If a matching RPC
+ *                is already available, information about it will be stored in
+ *                interest.
+ */
+int homa_register_interests(struct homa_interest *interest,
+			    struct homa_sock *hsk, int flags, __u64 id)
+{
+	struct homa_rpc *rpc = NULL;
+	int locked = 1;
+
+	homa_interest_init(interest);
+	if (id != 0) {
+		if (!homa_is_client(id))
+			return -EINVAL;
+		rpc = homa_find_client_rpc(hsk, id); /* Locks rpc. */
+		if (!rpc)
+			return -EINVAL;
+		if (rpc->interest && rpc->interest != interest) {
+			homa_rpc_unlock(rpc);
+			return -EINVAL;
+		}
+	}
+
+	/* Need both the RPC lock (acquired above) and the socket lock to
+	 * avoid races.
+	 */
+	homa_sock_lock(hsk, "homa_register_interests");
+	if (hsk->shutdown) {
+		homa_sock_unlock(hsk);
+		if (rpc)
+			homa_rpc_unlock(rpc);
+		return -ESHUTDOWN;
+	}
+
+	if (id != 0) {
+		if ((atomic_read(&rpc->flags) & RPC_PKTS_READY) || rpc->error)
+			goto claim_rpc;
+		rpc->interest = interest;
+		interest->reg_rpc = rpc;
+		homa_rpc_unlock(rpc);
+	}
+
+	locked = 0;
+	if (flags & HOMA_RECVMSG_RESPONSE) {
+		if (!list_empty(&hsk->ready_responses)) {
+			rpc = list_first_entry(&hsk->ready_responses,
+					       struct homa_rpc,
+					       ready_links);
+			goto claim_rpc;
+		}
+		/* Insert this thread at the *front* of the list;
+		 * we'll get better cache locality if we reuse
+		 * the same thread over and over, rather than
+		 * round-robining between threads.  Same below.
+		 */
+		list_add(&interest->response_links,
+			 &hsk->response_interests);
+	}
+	if (flags & HOMA_RECVMSG_REQUEST) {
+		if (!list_empty(&hsk->ready_requests)) {
+			rpc = list_first_entry(&hsk->ready_requests,
+					       struct homa_rpc, ready_links);
+			/* Make sure the interest isn't on the response list;
+			 * otherwise it might receive a second RPC.
+			 */
+			if (!list_empty(&interest->response_links))
+				list_del_init(&interest->response_links);
+			goto claim_rpc;
+		}
+		list_add(&interest->request_links, &hsk->request_interests);
+	}
+	homa_sock_unlock(hsk);
+	return 0;
+
+claim_rpc:
+	list_del_init(&rpc->ready_links);
+	if (!list_empty(&hsk->ready_requests) ||
+	    !list_empty(&hsk->ready_responses)) {
+		hsk->sock.sk_data_ready(&hsk->sock);
+	}
+
+	/* This flag is needed to keep the RPC from being reaped during the
+	 * gap between when we release the socket lock and we acquire the
+	 * RPC lock.
+	 */
+	atomic_or(RPC_HANDING_OFF, &rpc->flags);
+	homa_sock_unlock(hsk);
+	if (!locked) {
+		atomic_or(APP_NEEDS_LOCK, &rpc->flags);
+		homa_rpc_lock(rpc, "homa_register_interests");
+		atomic_andnot(APP_NEEDS_LOCK, &rpc->flags);
+		locked = 1;
+	}
+	atomic_andnot(RPC_HANDING_OFF, &rpc->flags);
+	homa_interest_set_rpc(interest, rpc, locked);
+	return 0;
+}
+
+/**
+ * homa_wait_for_message() - Wait for receipt of an incoming message
+ * that matches the parameters. Various other activities can occur while
+ * waiting, such as reaping dead RPCs and copying data to user space.
+ * @hsk:          Socket where messages will arrive.
+ * @flags:        Flags field from homa_recvmsg_args; see manual entry for
+ *                details.
+ * @id:           If non-zero, then a response message matching this id may
+ *                be returned (@id must refer to a client request).
+ *
+ * Return:   Pointer to an RPC that matches @flags and @id, or a negative
+ *           errno value. The RPC will be locked; the caller must unlock.
+ */
+struct homa_rpc *homa_wait_for_message(struct homa_sock *hsk, int flags,
+				       __u64 id)
+	__acquires(&rpc->bucket_lock)
+{
+	struct homa_rpc *result = NULL;
+	struct homa_interest interest;
+	struct homa_rpc *rpc = NULL;
+	int error;
+
+	/* Each iteration of this loop finds an RPC, but it might not be
+	 * in a state where we can return it (e.g., there might be packets
+	 * ready to transfer to user space, but the incoming message isn't yet
+	 * complete). Thus it could take many iterations of this loop
+	 * before we have an RPC with a complete message.
+	 */
+	while (1) {
+		error = homa_register_interests(&interest, hsk, flags, id);
+		rpc = homa_interest_get_rpc(&interest);
+		if (rpc)
+			goto found_rpc;
+		if (error < 0) {
+			result = ERR_PTR(error);
+			goto found_rpc;
+		}
+
+		/* There is no ready RPC so far. Clean up dead RPCs before
+		 * going to sleep (or returning, if in nonblocking mode).
+		 */
+		while (1) {
+			int reaper_result;
+
+			rpc = homa_interest_get_rpc(&interest);
+			if (rpc)
+				goto found_rpc;
+			reaper_result = homa_rpc_reap(hsk, false);
+			if (reaper_result == 0)
+				break;
+
+			/* Give NAPI and SoftIRQ tasks a chance to run. */
+			schedule();
+		}
+		if (flags & HOMA_RECVMSG_NONBLOCKING) {
+			result = ERR_PTR(-EAGAIN);
+			goto found_rpc;
+		}
+
+		/* Now it's time to sleep. */
+		set_current_state(TASK_INTERRUPTIBLE);
+		rpc = homa_interest_get_rpc(&interest);
+		if (!rpc && !hsk->shutdown)
+			schedule();
+		__set_current_state(TASK_RUNNING);
+
+found_rpc:
+		/* If we get here, it means either an RPC is ready for our
+		 * attention or an error occurred.
+		 *
+		 * First, clean up all of the interests. Must do this before
+		 * making any other decisions, because until we do, an incoming
+		 * message could still be passed to us. Note: if we went to
+		 * sleep, then this info was already cleaned up by whoever
+		 * woke us up. Also, values in the interest may change between
+		 * when we test them below and when we acquire the socket lock,
+		 * so they have to be checked again after locking the socket.
+		 */
+		if (interest.reg_rpc ||
+		    !list_empty(&interest.request_links) ||
+		    !list_empty(&interest.response_links)) {
+			homa_sock_lock(hsk, "homa_wait_for_message");
+			if (interest.reg_rpc)
+				interest.reg_rpc->interest = NULL;
+			if (!list_empty(&interest.request_links))
+				list_del_init(&interest.request_links);
+			if (!list_empty(&interest.response_links))
+				list_del_init(&interest.response_links);
+			homa_sock_unlock(hsk);
+		}
+
+		/* Now check to see if we received an RPC handoff (note that
+		 * this could have happened anytime up until we reset the
+		 * interests above).
+		 */
+		rpc = homa_interest_get_rpc(&interest);
+		if (rpc) {
+			if (!interest.locked) {
+				atomic_or(APP_NEEDS_LOCK, &rpc->flags);
+				homa_rpc_lock(rpc, "homa_wait_for_message");
+				atomic_andnot(APP_NEEDS_LOCK | RPC_HANDING_OFF,
+					      &rpc->flags);
+			} else {
+				atomic_andnot(RPC_HANDING_OFF, &rpc->flags);
+			}
+			if (!rpc->error)
+				rpc->error = homa_copy_to_user(rpc);
+			if (rpc->state == RPC_DEAD) {
+				homa_rpc_unlock(rpc);
+				continue;
+			}
+			if (rpc->error)
+				goto done;
+			atomic_andnot(RPC_PKTS_READY, &rpc->flags);
+			if (rpc->msgin.bytes_remaining == 0 &&
+			    !skb_queue_len(&rpc->msgin.packets))
+				goto done;
+			homa_rpc_unlock(rpc);
+		}
+
+		/* A complete message isn't available: check for errors. */
+		if (IS_ERR(result))
+			return result;
+		if (signal_pending(current))
+			return ERR_PTR(-EINTR);
+
+		/* No message and no error; try again. */
+	}
+
+done:
+	return rpc;
+}
+
+/**
+ * homa_choose_interest() - Given a list of interests for an incoming
+ * message, choose the best one to handle it (if any).
+ * @homa:        Overall information about the Homa transport.
+ * @head:        Head pointers for the list of interest: either
+ *		 hsk->request_interests or hsk->response_interests.
+ * @offset:      Offset of "next" pointers in the list elements (either
+ *               offsetof(request_links) or offsetof(response_links).
+ * Return:       An interest to use for the incoming message, or NULL if none
+ *               is available. If possible, this function tries to pick an
+ *               interest whose thread is running on a core that isn't
+ *               currently busy doing Homa transport work.
+ */
+struct homa_interest *homa_choose_interest(struct homa *homa,
+					   struct list_head *head, int offset)
+{
+	struct homa_interest *backup = NULL;
+	struct homa_interest *interest;
+	struct list_head *pos;
+
+	list_for_each(pos, head) {
+		interest = (struct homa_interest *)(((char *)pos) - offset);
+		if (!backup)
+			backup = interest;
+	}
+
+	/* All interested threads are on busy cores; return the first. */
+	return backup;
+}
+
+/**
+ * homa_rpc_handoff() - This function is called when the input message for
+ * an RPC is ready for attention from a user thread. It either notifies
+ * a waiting reader or queues the RPC.
+ * @rpc:                RPC to handoff; must be locked. The caller must
+ *			also have locked the socket for this RPC.
+ */
+void homa_rpc_handoff(struct homa_rpc *rpc)
+{
+	struct homa_sock *hsk = rpc->hsk;
+	struct homa_interest *interest;
+
+	if ((atomic_read(&rpc->flags) & RPC_HANDING_OFF) ||
+	    !list_empty(&rpc->ready_links))
+		return;
+
+	/* First, see if someone is interested in this RPC specifically.
+	 */
+	if (rpc->interest) {
+		interest = rpc->interest;
+		goto thread_waiting;
+	}
+
+	/* Second, check the interest list for this type of RPC. */
+	if (homa_is_client(rpc->id)) {
+		interest = homa_choose_interest(hsk->homa,
+						&hsk->response_interests,
+						offsetof(struct homa_interest,
+							 response_links));
+		if (interest)
+			goto thread_waiting;
+		list_add_tail(&rpc->ready_links, &hsk->ready_responses);
+	} else {
+		interest = homa_choose_interest(hsk->homa,
+						&hsk->request_interests,
+						offsetof(struct homa_interest,
+							 request_links));
+		if (interest)
+			goto thread_waiting;
+		list_add_tail(&rpc->ready_links, &hsk->ready_requests);
+	}
+
+	/* If we get here, no-one is waiting for the RPC, so it has been
+	 * queued.
+	 */
+
+	/* Notify the poll mechanism. */
+	hsk->sock.sk_data_ready(&hsk->sock);
+	return;
+
+thread_waiting:
+	/* We found a waiting thread. The following 3 lines must be here,
+	 * before clearing the interest, in order to avoid a race with
+	 * homa_wait_for_message (which won't acquire the socket lock if
+	 * the interest is clear).
+	 */
+	atomic_or(RPC_HANDING_OFF, &rpc->flags);
+	homa_interest_set_rpc(interest, rpc, 0);
+
+	/* Clear the interest. This serves two purposes. First, it saves
+	 * the waking thread from acquiring the socket lock again, which
+	 * reduces contention on that lock). Second, it ensures that
+	 * no-one else attempts to give this interest a different RPC.
+	 */
+	if (interest->reg_rpc) {
+		interest->reg_rpc->interest = NULL;
+		interest->reg_rpc = NULL;
+	}
+	if (!list_empty(&interest->request_links))
+		list_del_init(&interest->request_links);
+	if (!list_empty(&interest->response_links))
+		list_del_init(&interest->response_links);
+	wake_up_process(interest->thread);
+}
+
+/**
+ * homa_incoming_sysctl_changed() - Invoked whenever a sysctl value is changed;
+ * any input-related parameters that depend on sysctl-settable values.
+ * @homa:    Overall data about the Homa protocol implementation.
+ */
+void homa_incoming_sysctl_changed(struct homa *homa)
+{
+}