diff mbox series

[net-next,v5,04/12] net: homa: create homa_pool.h and homa_pool.c

Message ID 20250106181219.1075-5-ouster@cs.stanford.edu (mailing list archive)
State Changes Requested
Delegated to: Netdev Maintainers
Headers show
Series Begin upstreaming Homa transport protocol | expand

Checks

Context Check Description
netdev/series_format success Posting correctly formatted
netdev/tree_selection success Clearly marked for net-next, async
netdev/ynl success Generated files up to date; no warnings/errors; no diff in generated;
netdev/fixes_present success Fixes tag not required for -next series
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 1 this patch: 1
netdev/build_tools success No tools touched, skip
netdev/cc_maintainers success CCed 6 of 6 maintainers
netdev/build_clang success Errors and warnings before: 33 this patch: 33
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/deprecated_api success None detected
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 5 this patch: 5
netdev/checkpatch warning CHECK: Alignment should match open parenthesis WARNING: added, moved or deleted file(s), does MAINTAINERS need updating?
netdev/build_clang_rust success No Rust files in patch. Skipping build
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0
netdev/contest fail net-next-2025-01-07--03-00 (tests: 883)

Commit Message

John Ousterhout Jan. 6, 2025, 6:12 p.m. UTC
These files implement Homa's mechanism for managing application-level
buffer space for incoming messages This mechanism is needed to allow
Homa to copy data out to user space in parallel with receiving packets;
it was discussed in a talk at NetDev 0x17.

Signed-off-by: John Ousterhout <ouster@cs.stanford.edu>
---
 net/homa/homa_pool.c | 453 +++++++++++++++++++++++++++++++++++++++++++
 net/homa/homa_pool.h | 154 +++++++++++++++
 2 files changed, 607 insertions(+)
 create mode 100644 net/homa/homa_pool.c
 create mode 100644 net/homa/homa_pool.h
diff mbox series

Patch

diff --git a/net/homa/homa_pool.c b/net/homa/homa_pool.c
new file mode 100644
index 000000000000..0b2ec83b6174
--- /dev/null
+++ b/net/homa/homa_pool.c
@@ -0,0 +1,453 @@ 
+// SPDX-License-Identifier: BSD-2-Clause
+
+#include "homa_impl.h"
+#include "homa_pool.h"
+
+/* This file contains functions that manage user-space buffer pools. */
+
+/* Pools must always have at least this many bpages (no particular
+ * reasoning behind this value).
+ */
+#define MIN_POOL_SIZE 2
+
+/* Used when determining how many bpages to consider for allocation. */
+#define MIN_EXTRA 4
+
+/**
+ * set_bpages_needed() - Set the bpages_needed field of @pool based
+ * on the length of the first RPC that's waiting for buffer space.
+ * The caller must own the lock for @pool->hsk.
+ * @pool: Pool to update.
+ */
+static void set_bpages_needed(struct homa_pool *pool)
+{
+	struct homa_rpc *rpc = list_first_entry(&pool->hsk->waiting_for_bufs,
+			struct homa_rpc, buf_links);
+	pool->bpages_needed = (rpc->msgin.length + HOMA_BPAGE_SIZE - 1)
+			>> HOMA_BPAGE_SHIFT;
+}
+
+/**
+ * homa_pool_init() - Initialize a homa_pool; any previous contents are
+ * destroyed.
+ * @hsk:          Socket containing the pool to initialize.
+ * @region:       First byte of the memory region for the pool, allocated
+ *                by the application; must be page-aligned.
+ * @region_size:  Total number of bytes available at @buf_region.
+ * Return: Either zero (for success) or a negative errno for failure.
+ */
+int homa_pool_init(struct homa_sock *hsk, void __user *region,
+		   __u64 region_size)
+{
+	struct homa_pool *pool = hsk->buffer_pool;
+	int i, result;
+
+	homa_pool_destroy(hsk->buffer_pool);
+
+	if (((uintptr_t)region) & ~PAGE_MASK)
+		return -EINVAL;
+	pool->hsk = hsk;
+	pool->region = (char __user *)region;
+	pool->num_bpages = region_size >> HOMA_BPAGE_SHIFT;
+	pool->descriptors = NULL;
+	pool->cores = NULL;
+	if (pool->num_bpages < MIN_POOL_SIZE) {
+		result = -EINVAL;
+		goto error;
+	}
+	pool->descriptors = kmalloc_array(pool->num_bpages,
+					  sizeof(struct homa_bpage),
+					  GFP_ATOMIC);
+	if (!pool->descriptors) {
+		result = -ENOMEM;
+		goto error;
+	}
+	for (i = 0; i < pool->num_bpages; i++) {
+		struct homa_bpage *bp = &pool->descriptors[i];
+
+		spin_lock_init(&bp->lock);
+		atomic_set(&bp->refs, 0);
+		bp->owner = -1;
+		bp->expiration = 0;
+	}
+	atomic_set(&pool->free_bpages, pool->num_bpages);
+	pool->bpages_needed = INT_MAX;
+
+	/* Allocate and initialize core-specific data. */
+	pool->cores = kmalloc_array(nr_cpu_ids, sizeof(struct homa_pool_core),
+				    GFP_ATOMIC);
+	if (!pool->cores) {
+		result = -ENOMEM;
+		goto error;
+	}
+	pool->num_cores = nr_cpu_ids;
+	for (i = 0; i < pool->num_cores; i++) {
+		pool->cores[i].page_hint = 0;
+		pool->cores[i].allocated = 0;
+		pool->cores[i].next_candidate = 0;
+	}
+	pool->check_waiting_invoked = 0;
+
+	return 0;
+
+error:
+	kfree(pool->descriptors);
+	kfree(pool->cores);
+	pool->region = NULL;
+	return result;
+}
+
+/**
+ * homa_pool_destroy() - Destructor for homa_pool. After this method
+ * returns, the object should not be used unless it has been reinitialized.
+ * @pool: Pool to destroy.
+ */
+void homa_pool_destroy(struct homa_pool *pool)
+{
+	if (!pool->region)
+		return;
+	kfree(pool->descriptors);
+	kfree(pool->cores);
+	pool->region = NULL;
+}
+
+/**
+ * homa_pool_get_rcvbuf() - Return information needed to handle getsockopt
+ * for HOMA_SO_RCVBUF.
+ * @hsk:          Socket on which getsockopt request was made.
+ * @args:         Store info here.
+ */
+void homa_pool_get_rcvbuf(struct homa_sock *hsk,
+			  struct homa_rcvbuf_args *args)
+{
+	homa_sock_lock(hsk, "homa_pool_get_rcvbuf");
+	args->start = (uintptr_t)hsk->buffer_pool->region;
+	args->length = hsk->buffer_pool->num_bpages << HOMA_BPAGE_SHIFT;
+	homa_sock_unlock(hsk);
+}
+
+/**
+ * homa_pool_get_pages() - Allocate one or more full pages from the pool.
+ * @pool:         Pool from which to allocate pages
+ * @num_pages:    Number of pages needed
+ * @pages:        The indices of the allocated pages are stored here; caller
+ *                must ensure this array is big enough. Reference counts have
+ *                been set to 1 on all of these pages (or 2 if set_owner
+ *                was specified).
+ * @set_owner:    If nonzero, the current core is marked as owner of all
+ *                of the allocated pages (and the expiration time is also
+ *                set). Otherwise the pages are left unowned.
+ * Return: 0 for success, -1 if there wasn't enough free space in the pool.
+ */
+int homa_pool_get_pages(struct homa_pool *pool, int num_pages, __u32 *pages,
+			int set_owner)
+{
+	int core_num = raw_smp_processor_id();
+	struct homa_pool_core *core;
+	__u64 now = sched_clock();
+	int alloced = 0;
+	int limit = 0;
+
+	core = &pool->cores[core_num];
+	if (atomic_sub_return(num_pages, &pool->free_bpages) < 0) {
+		atomic_add(num_pages, &pool->free_bpages);
+		return -1;
+	}
+
+	/* Once we get to this point we know we will be able to find
+	 * enough free pages; now we just have to find them.
+	 */
+	while (alloced != num_pages) {
+		struct homa_bpage *bpage;
+		int cur, ref_count;
+
+		/* If we don't need to use all of the bpages in the pool,
+		 * then try to use only the ones with low indexes. This
+		 * will reduce the cache footprint for the pool by reusing
+		 * a few bpages over and over. Specifically this code will
+		 * not consider any candidate page whose index is >= limit.
+		 * Limit is chosen to make sure there are a reasonable
+		 * number of free pages in the range, so we won't have to
+		 * check a huge number of pages.
+		 */
+		if (limit == 0) {
+			int extra;
+
+			limit = pool->num_bpages
+					- atomic_read(&pool->free_bpages);
+			extra = limit >> 2;
+			limit += (extra < MIN_EXTRA) ? MIN_EXTRA : extra;
+			if (limit > pool->num_bpages)
+				limit = pool->num_bpages;
+		}
+
+		cur = core->next_candidate;
+		core->next_candidate++;
+		if (cur >= limit) {
+			core->next_candidate = 0;
+
+			/* Must recompute the limit for each new loop through
+			 * the bpage array: we may need to consider a larger
+			 * range of pages because of concurrent allocations.
+			 */
+			limit = 0;
+			continue;
+		}
+		bpage = &pool->descriptors[cur];
+
+		/* Figure out whether this candidate is free (or can be
+		 * stolen). Do a quick check without locking the page, and
+		 * if the page looks promising, then lock it and check again
+		 * (must check again in case someone else snuck in and
+		 * grabbed the page).
+		 */
+		ref_count = atomic_read(&bpage->refs);
+		if (ref_count >= 2 || (ref_count == 1 && (bpage->owner < 0 ||
+				bpage->expiration > now)))
+			continue;
+		if (!spin_trylock_bh(&bpage->lock))
+			continue;
+		ref_count = atomic_read(&bpage->refs);
+		if (ref_count >= 2 || (ref_count == 1 && (bpage->owner < 0 ||
+				bpage->expiration > now))) {
+			spin_unlock_bh(&bpage->lock);
+			continue;
+		}
+		if (bpage->owner >= 0)
+			atomic_inc(&pool->free_bpages);
+		if (set_owner) {
+			atomic_set(&bpage->refs, 2);
+			bpage->owner = core_num;
+			bpage->expiration = now + 1000 *
+					pool->hsk->homa->bpage_lease_usecs;
+		} else {
+			atomic_set(&bpage->refs, 1);
+			bpage->owner = -1;
+		}
+		spin_unlock_bh(&bpage->lock);
+		pages[alloced] = cur;
+		alloced++;
+	}
+	return 0;
+}
+
+/**
+ * homa_pool_allocate() - Allocate buffer space for an RPC.
+ * @rpc:  RPC that needs space allocated for its incoming message (space must
+ *        not already have been allocated). The fields @msgin->num_buffers
+ *        and @msgin->buffers are filled in. Must be locked by caller.
+ * Return: The return value is normally 0, which means either buffer space
+ * was allocated or the @rpc was queued on @hsk->waiting. If a fatal error
+ * occurred, such as no buffer pool present, then a negative errno is
+ * returned.
+ */
+int homa_pool_allocate(struct homa_rpc *rpc)
+{
+	struct homa_pool *pool = rpc->hsk->buffer_pool;
+	int full_pages, partial, i, core_id;
+	__u32 pages[HOMA_MAX_BPAGES];
+	struct homa_pool_core *core;
+	struct homa_bpage *bpage;
+	struct homa_rpc *other;
+
+	if (!pool->region)
+		return -ENOMEM;
+
+	/* First allocate any full bpages that are needed. */
+	full_pages = rpc->msgin.length >> HOMA_BPAGE_SHIFT;
+	if (unlikely(full_pages)) {
+		if (homa_pool_get_pages(pool, full_pages, pages, 0) != 0)
+			goto out_of_space;
+		for (i = 0; i < full_pages; i++)
+			rpc->msgin.bpage_offsets[i] = pages[i] <<
+					HOMA_BPAGE_SHIFT;
+	}
+	rpc->msgin.num_bpages = full_pages;
+
+	/* The last chunk may be less than a full bpage; for this we use
+	 * the bpage that we own (and reuse it for multiple messages).
+	 */
+	partial = rpc->msgin.length & (HOMA_BPAGE_SIZE - 1);
+	if (unlikely(partial == 0))
+		goto success;
+	core_id = raw_smp_processor_id();
+	core = &pool->cores[core_id];
+	bpage = &pool->descriptors[core->page_hint];
+	if (!spin_trylock_bh(&bpage->lock))
+		spin_lock_bh(&bpage->lock);
+	if (bpage->owner != core_id) {
+		spin_unlock_bh(&bpage->lock);
+		goto new_page;
+	}
+	if ((core->allocated + partial) > HOMA_BPAGE_SIZE) {
+		if (atomic_read(&bpage->refs) == 1) {
+			/* Bpage is totally free, so we can reuse it. */
+			core->allocated = 0;
+		} else {
+			bpage->owner = -1;
+
+			/* We know the reference count can't reach zero here
+			 * because of check above, so we won't have to decrement
+			 * pool->free_bpages.
+			 */
+			atomic_dec_return(&bpage->refs);
+			spin_unlock_bh(&bpage->lock);
+			goto new_page;
+		}
+	}
+	bpage->expiration = sched_clock() +
+			1000 * pool->hsk->homa->bpage_lease_usecs;
+	atomic_inc(&bpage->refs);
+	spin_unlock_bh(&bpage->lock);
+	goto allocate_partial;
+
+	/* Can't use the current page; get another one. */
+new_page:
+	if (homa_pool_get_pages(pool, 1, pages, 1) != 0) {
+		homa_pool_release_buffers(pool, rpc->msgin.num_bpages,
+					  rpc->msgin.bpage_offsets);
+		rpc->msgin.num_bpages = 0;
+		goto out_of_space;
+	}
+	core->page_hint = pages[0];
+	core->allocated = 0;
+
+allocate_partial:
+	rpc->msgin.bpage_offsets[rpc->msgin.num_bpages] = core->allocated
+			+ (core->page_hint << HOMA_BPAGE_SHIFT);
+	rpc->msgin.num_bpages++;
+	core->allocated += partial;
+
+success:
+	return 0;
+
+	/* We get here if there wasn't enough buffer space for this
+	 * message; add the RPC to hsk->waiting_for_bufs.
+	 */
+out_of_space:
+	homa_sock_lock(pool->hsk, "homa_pool_allocate");
+	list_for_each_entry(other, &pool->hsk->waiting_for_bufs, buf_links) {
+		if (other->msgin.length > rpc->msgin.length) {
+			list_add_tail(&rpc->buf_links, &other->buf_links);
+			goto queued;
+		}
+	}
+	list_add_tail_rcu(&rpc->buf_links, &pool->hsk->waiting_for_bufs);
+
+queued:
+	set_bpages_needed(pool);
+	homa_sock_unlock(pool->hsk);
+	return 0;
+}
+
+/**
+ * homa_pool_get_buffer() - Given an RPC, figure out where to store incoming
+ * message data.
+ * @rpc:        RPC for which incoming message data is being processed; its
+ *              msgin must be properly initialized and buffer space must have
+ *              been allocated for the message.
+ * @offset:     Offset within @rpc's incoming message.
+ * @available:  Will be filled in with the number of bytes of space available
+ *              at the returned address (could be zero if offset is
+ *              (erroneously) past the end of the message).
+ * Return:      The application's virtual address for buffer space corresponding
+ *              to @offset in the incoming message for @rpc.
+ */
+void __user *homa_pool_get_buffer(struct homa_rpc *rpc, int offset,
+				  int *available)
+{
+	int bpage_index, bpage_offset;
+
+	bpage_index = offset >> HOMA_BPAGE_SHIFT;
+	if (offset >= rpc->msgin.length) {
+		WARN_ONCE(true, "%s got offset %d >= message length %d\n",
+			  __func__, offset, rpc->msgin.length);
+		*available = 0;
+		return NULL;
+	}
+	bpage_offset = offset & (HOMA_BPAGE_SIZE - 1);
+	*available = (bpage_index < (rpc->msgin.num_bpages - 1))
+			? HOMA_BPAGE_SIZE - bpage_offset
+			: rpc->msgin.length - offset;
+	return rpc->hsk->buffer_pool->region +
+			rpc->msgin.bpage_offsets[bpage_index] + bpage_offset;
+}
+
+/**
+ * homa_pool_release_buffers() - Release buffer space so that it can be
+ * reused.
+ * @pool:         Pool that the buffer space belongs to. Doesn't need to
+ *                be locked.
+ * @num_buffers:  How many buffers to release.
+ * @buffers:      Points to @num_buffers values, each of which is an offset
+ *                from the start of the pool to the buffer to be released.
+ * Return:        0 for success, otherwise a negative errno.
+ */
+int homa_pool_release_buffers(struct homa_pool *pool, int num_buffers,
+			      __u32 *buffers)
+{
+	int result = 0;
+	int i;
+
+	if (!pool->region)
+		return result;
+	for (i = 0; i < num_buffers; i++) {
+		__u32 bpage_index = buffers[i] >> HOMA_BPAGE_SHIFT;
+		struct homa_bpage *bpage = &pool->descriptors[bpage_index];
+
+		if (bpage_index < pool->num_bpages) {
+			if (atomic_dec_return(&bpage->refs) == 0)
+				atomic_inc(&pool->free_bpages);
+		} else {
+			result = -EINVAL;
+		}
+	}
+	return result;
+}
+
+/**
+ * homa_pool_check_waiting() - Checks to see if there are enough free
+ * bpages to wake up any RPCs that were blocked. Whenever
+ * homa_pool_release_buffers is invoked, this function must be invoked later,
+ * at a point when the caller holds no locks (homa_pool_release_buffers may
+ * be invoked with locks held, so it can't safely invoke this function).
+ * This is regrettably tricky, but I can't think of a better solution.
+ * @pool:         Information about the buffer pool.
+ */
+void homa_pool_check_waiting(struct homa_pool *pool)
+{
+	if (!pool->region)
+		return;
+	while (atomic_read(&pool->free_bpages) >= pool->bpages_needed) {
+		struct homa_rpc *rpc;
+
+		homa_sock_lock(pool->hsk, "buffer pool");
+		if (list_empty(&pool->hsk->waiting_for_bufs)) {
+			pool->bpages_needed = INT_MAX;
+			homa_sock_unlock(pool->hsk);
+			break;
+		}
+		rpc = list_first_entry(&pool->hsk->waiting_for_bufs,
+				       struct homa_rpc, buf_links);
+		if (!homa_rpc_try_lock(rpc, "homa_pool_check_waiting")) {
+			/* Can't just spin on the RPC lock because we're
+			 * holding the socket lock (see sync.txt). Instead,
+			 * release the socket lock and try the entire
+			 * operation again.
+			 */
+			homa_sock_unlock(pool->hsk);
+			continue;
+		}
+		list_del_init(&rpc->buf_links);
+		if (list_empty(&pool->hsk->waiting_for_bufs))
+			pool->bpages_needed = INT_MAX;
+		else
+			set_bpages_needed(pool);
+		homa_sock_unlock(pool->hsk);
+		homa_pool_allocate(rpc);
+		if (rpc->msgin.num_bpages > 0)
+			/* Allocation succeeded; "wake up" the RPC. */
+			rpc->msgin.resend_all = 1;
+		homa_rpc_unlock(rpc);
+	}
+}
diff --git a/net/homa/homa_pool.h b/net/homa/homa_pool.h
new file mode 100644
index 000000000000..6dbe7d77dd07
--- /dev/null
+++ b/net/homa/homa_pool.h
@@ -0,0 +1,154 @@ 
+/* SPDX-License-Identifier: BSD-2-Clause */
+
+/* This file contains definitions used to manage user-space buffer pools.
+ */
+
+#ifndef _HOMA_POOL_H
+#define _HOMA_POOL_H
+
+#include "homa_rpc.h"
+
+/**
+ * struct homa_bpage - Contains information about a single page in
+ * a buffer pool.
+ */
+struct homa_bpage {
+	union {
+		/**
+		 * @cache_line: Ensures that each homa_bpage object
+		 * is exactly one cache line long.
+		 */
+		char cache_line[L1_CACHE_BYTES];
+		struct {
+			/** @lock: to synchronize shared access. */
+			spinlock_t lock;
+
+			/**
+			 * @refs: Counts number of distinct uses of this
+			 * bpage (1 tick for each message that is using
+			 * this page, plus an additional tick if the @owner
+			 * field is set).
+			 */
+			atomic_t refs;
+
+			/**
+			 * @owner: kernel core that currently owns this page
+			 * (< 0 if none).
+			 */
+			int owner;
+
+			/**
+			 * @expiration: time (in sched_clock() units) after
+			 * which it's OK to steal this page from its current
+			 * owner (if @refs is 1).
+			 */
+			__u64 expiration;
+		};
+	};
+};
+
+/**
+ * struct homa_pool_core - Holds core-specific data for a homa_pool (a bpage
+ * out of which that core is allocating small chunks).
+ */
+struct homa_pool_core {
+	union {
+		/**
+		 * @cache_line: Ensures that each object is exactly one
+		 * cache line long.
+		 */
+		char cache_line[L1_CACHE_BYTES];
+		struct {
+			/**
+			 * @page_hint: Index of bpage in pool->descriptors,
+			 * which may be owned by this core. If so, we'll use it
+			 * for allocating partial pages.
+			 */
+			int page_hint;
+
+			/**
+			 * @allocated: if the page given by @page_hint is
+			 * owned by this core, this variable gives the number of
+			 * (initial) bytes that have already been allocated
+			 * from the page.
+			 */
+			int allocated;
+
+			/**
+			 * @next_candidate: when searching for free bpages,
+			 * check this index next.
+			 */
+			int next_candidate;
+		};
+	};
+};
+
+/**
+ * struct homa_pool - Describes a pool of buffer space for incoming
+ * messages for a particular socket; managed by homa_pool.c. The pool is
+ * divided up into "bpages", which are a multiple of the hardware page size.
+ * A bpage may be owned by a particular core so that it can more efficiently
+ * allocate space for small messages.
+ */
+struct homa_pool {
+	/**
+	 * @hsk: the socket that this pool belongs to.
+	 */
+	struct homa_sock *hsk;
+
+	/**
+	 * @region: beginning of the pool's region (in the app's virtual
+	 * memory). Divided into bpages. 0 means the pool hasn't yet been
+	 * initialized.
+	 */
+	char __user *region;
+
+	/** @num_bpages: total number of bpages in the pool. */
+	int num_bpages;
+
+	/** @descriptors: kmalloced area containing one entry for each bpage. */
+	struct homa_bpage *descriptors;
+
+	/**
+	 * @free_bpages: the number of pages still available for allocation
+	 * by homa_pool_get pages. This equals the number of pages with zero
+	 * reference counts, minus the number of pages that have been claimed
+	 * by homa_get_pool_pages but not yet allocated.
+	 */
+	atomic_t free_bpages;
+
+	/**
+	 * @bpages_needed: the number of free bpages required to satisfy the
+	 * needs of the first RPC on @hsk->waiting_for_bufs, or INT_MAX if
+	 * that queue is empty.
+	 */
+	int bpages_needed;
+
+	/** @cores: core-specific info; dynamically allocated. */
+	struct homa_pool_core *cores;
+
+	/** @num_cores: number of elements in @cores. */
+	int num_cores;
+
+	/**
+	 * @check_waiting_invoked: incremented during unit tests when
+	 * homa_pool_check_waiting is invoked.
+	 */
+	int check_waiting_invoked;
+};
+
+int      homa_pool_allocate(struct homa_rpc *rpc);
+void     homa_pool_check_waiting(struct homa_pool *pool);
+void     homa_pool_destroy(struct homa_pool *pool);
+void __user *homa_pool_get_buffer(struct homa_rpc *rpc, int offset,
+				  int *available);
+int      homa_pool_get_pages(struct homa_pool *pool, int num_pages,
+			     __u32 *pages, int leave_locked);
+void     homa_pool_get_rcvbuf(struct homa_sock *hsk,
+			      struct homa_rcvbuf_args *args);
+int      homa_pool_init(struct homa_sock *hsk, void *buf_region,
+			__u64 region_size);
+int      homa_pool_release_buffers(struct homa_pool *pool,
+				   int num_buffers, __u32 *buffers);
+
+#endif /* _HOMA_POOL_H */