diff mbox series

[v2,06/31] netfs: Abstract out a rolling folio buffer implementation

Message ID 20241025204008.4076565-7-dhowells@redhat.com (mailing list archive)
State New
Headers show
Series netfs: Read performance improvements and "single-blob" support | expand

Commit Message

David Howells Oct. 25, 2024, 8:39 p.m. UTC
A rolling buffer is a series of folios held in a list of folio_queues.  New
folios and folio_queue structs may be inserted at the head simultaneously
with spent ones being removed from the tail without the need for locking.

The rolling buffer includes an iov_iter and it has to be careful managing
this as the list of folio_queues is extended such that an oops doesn't
incurred because the iterator was pointing to the end of a folio_queue
segment that got appended to and then removed.

We need to use the mechanism twice, once for read and once for write, and,
in future patches, we will use a second rolling buffer to handle bounce
buffering for content encryption.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Jeff Layton <jlayton@kernel.org>
cc: netfs@lists.linux.dev
cc: linux-fsdevel@vger.kernel.org
---
 fs/netfs/Makefile              |   1 +
 fs/netfs/buffered_read.c       | 119 ++++-------------
 fs/netfs/direct_read.c         |  14 +-
 fs/netfs/direct_write.c        |  10 +-
 fs/netfs/internal.h            |   4 -
 fs/netfs/misc.c                | 147 ---------------------
 fs/netfs/objects.c             |   2 +-
 fs/netfs/read_pgpriv2.c        |  32 ++---
 fs/netfs/read_retry.c          |   2 +-
 fs/netfs/rolling_buffer.c      | 225 +++++++++++++++++++++++++++++++++
 fs/netfs/write_collect.c       |  19 +--
 fs/netfs/write_issue.c         |  26 ++--
 include/linux/netfs.h          |  10 +-
 include/linux/rolling_buffer.h |  61 +++++++++
 include/trace/events/netfs.h   |   2 +
 15 files changed, 375 insertions(+), 299 deletions(-)
 create mode 100644 fs/netfs/rolling_buffer.c
 create mode 100644 include/linux/rolling_buffer.h
diff mbox series

Patch

diff --git a/fs/netfs/Makefile b/fs/netfs/Makefile
index d08b0bfb6756..7492c4aa331e 100644
--- a/fs/netfs/Makefile
+++ b/fs/netfs/Makefile
@@ -13,6 +13,7 @@  netfs-y := \
 	read_collect.o \
 	read_pgpriv2.o \
 	read_retry.o \
+	rolling_buffer.o \
 	write_collect.o \
 	write_issue.o
 
diff --git a/fs/netfs/buffered_read.c b/fs/netfs/buffered_read.c
index df94538fde96..4cacb46e0cf7 100644
--- a/fs/netfs/buffered_read.c
+++ b/fs/netfs/buffered_read.c
@@ -63,37 +63,6 @@  static int netfs_begin_cache_read(struct netfs_io_request *rreq, struct netfs_in
 	return fscache_begin_read_operation(&rreq->cache_resources, netfs_i_cookie(ctx));
 }
 
-/*
- * Decant the list of folios to read into a rolling buffer.
- */
-static size_t netfs_load_buffer_from_ra(struct netfs_io_request *rreq,
-					struct folio_queue *folioq,
-					struct folio_batch *put_batch)
-{
-	unsigned int order, nr;
-	size_t size = 0;
-
-	nr = __readahead_batch(rreq->ractl, (struct page **)folioq->vec.folios,
-			       ARRAY_SIZE(folioq->vec.folios));
-	folioq->vec.nr = nr;
-	for (int i = 0; i < nr; i++) {
-		struct folio *folio = folioq_folio(folioq, i);
-
-		trace_netfs_folio(folio, netfs_folio_trace_read);
-		order = folio_order(folio);
-		folioq->orders[i] = order;
-		size += PAGE_SIZE << order;
-
-		if (!folio_batch_add(put_batch, folio))
-			folio_batch_release(put_batch);
-	}
-
-	for (int i = nr; i < folioq_nr_slots(folioq); i++)
-		folioq_clear(folioq, i);
-
-	return size;
-}
-
 /*
  * netfs_prepare_read_iterator - Prepare the subreq iterator for I/O
  * @subreq: The subrequest to be set up
@@ -128,18 +97,12 @@  static ssize_t netfs_prepare_read_iterator(struct netfs_io_subrequest *subreq)
 
 		folio_batch_init(&put_batch);
 		while (rreq->submitted < subreq->start + rsize) {
-			struct folio_queue *tail = rreq->buffer_tail, *new;
-			size_t added;
-
-			new = netfs_folioq_alloc(rreq->debug_id, GFP_NOFS,
-						 netfs_trace_folioq_alloc_read_prep);
-			if (!new)
-				return -ENOMEM;
-			new->prev = tail;
-			tail->next = new;
-			rreq->buffer_tail = new;
-			added = netfs_load_buffer_from_ra(rreq, new, &put_batch);
-			rreq->iter.count += added;
+			ssize_t added;
+
+			added = rolling_buffer_load_from_ra(&rreq->buffer, rreq->ractl,
+							    &put_batch);
+			if (added < 0)
+				return added;
 			rreq->submitted += added;
 		}
 		folio_batch_release(&put_batch);
@@ -147,7 +110,7 @@  static ssize_t netfs_prepare_read_iterator(struct netfs_io_subrequest *subreq)
 
 	subreq->len = rsize;
 	if (unlikely(rreq->io_streams[0].sreq_max_segs)) {
-		size_t limit = netfs_limit_iter(&rreq->iter, 0, rsize,
+		size_t limit = netfs_limit_iter(&rreq->buffer.iter, 0, rsize,
 						rreq->io_streams[0].sreq_max_segs);
 
 		if (limit < rsize) {
@@ -156,20 +119,16 @@  static ssize_t netfs_prepare_read_iterator(struct netfs_io_subrequest *subreq)
 		}
 	}
 
-	subreq->io_iter	= rreq->iter;
+	subreq->io_iter	= rreq->buffer.iter;
 
 	if (iov_iter_is_folioq(&subreq->io_iter)) {
-		if (subreq->io_iter.folioq_slot >= folioq_nr_slots(subreq->io_iter.folioq)) {
-			subreq->io_iter.folioq = subreq->io_iter.folioq->next;
-			subreq->io_iter.folioq_slot = 0;
-		}
 		subreq->curr_folioq = (struct folio_queue *)subreq->io_iter.folioq;
 		subreq->curr_folioq_slot = subreq->io_iter.folioq_slot;
 		subreq->curr_folio_order = subreq->curr_folioq->orders[subreq->curr_folioq_slot];
 	}
 
 	iov_iter_truncate(&subreq->io_iter, subreq->len);
-	iov_iter_advance(&rreq->iter, subreq->len);
+	rolling_buffer_advance(&rreq->buffer, subreq->len);
 	return subreq->len;
 }
 
@@ -348,34 +307,6 @@  static int netfs_wait_for_read(struct netfs_io_request *rreq)
 	return ret;
 }
 
-/*
- * Set up the initial folioq of buffer folios in the rolling buffer and set the
- * iterator to refer to it.
- */
-static int netfs_prime_buffer(struct netfs_io_request *rreq)
-{
-	struct folio_queue *folioq;
-	struct folio_batch put_batch;
-	size_t added;
-
-	folioq = netfs_folioq_alloc(rreq->debug_id, GFP_KERNEL,
-				    netfs_trace_folioq_alloc_read_prime);
-	if (!folioq)
-		return -ENOMEM;
-
-	rreq->buffer = folioq;
-	rreq->buffer_tail = folioq;
-	rreq->submitted = rreq->start;
-	iov_iter_folio_queue(&rreq->iter, ITER_DEST, folioq, 0, 0, 0);
-
-	folio_batch_init(&put_batch);
-	added = netfs_load_buffer_from_ra(rreq, folioq, &put_batch);
-	folio_batch_release(&put_batch);
-	rreq->iter.count += added;
-	rreq->submitted += added;
-	return 0;
-}
-
 /**
  * netfs_readahead - Helper to manage a read request
  * @ractl: The description of the readahead request
@@ -415,7 +346,8 @@  void netfs_readahead(struct readahead_control *ractl)
 	netfs_rreq_expand(rreq, ractl);
 
 	rreq->ractl = ractl;
-	if (netfs_prime_buffer(rreq) < 0)
+	rreq->submitted = rreq->start;
+	if (rolling_buffer_init(&rreq->buffer, rreq->debug_id, ITER_DEST) < 0)
 		goto cleanup_free;
 	netfs_read_to_pagecache(rreq);
 
@@ -431,22 +363,18 @@  EXPORT_SYMBOL(netfs_readahead);
 /*
  * Create a rolling buffer with a single occupying folio.
  */
-static int netfs_create_singular_buffer(struct netfs_io_request *rreq, struct folio *folio)
+static int netfs_create_singular_buffer(struct netfs_io_request *rreq, struct folio *folio,
+					unsigned int rollbuf_flags)
 {
-	struct folio_queue *folioq;
+	ssize_t added;
 
-	folioq = netfs_folioq_alloc(rreq->debug_id, GFP_KERNEL,
-				    netfs_trace_folioq_alloc_read_sing);
-	if (!folioq)
+	if (rolling_buffer_init(&rreq->buffer, rreq->debug_id, ITER_DEST) < 0)
 		return -ENOMEM;
 
-	folioq_append(folioq, folio);
-	BUG_ON(folioq_folio(folioq, 0) != folio);
-	BUG_ON(folioq_folio_order(folioq, 0) != folio_order(folio));
-	rreq->buffer = folioq;
-	rreq->buffer_tail = folioq;
-	rreq->submitted = rreq->start + rreq->len;
-	iov_iter_folio_queue(&rreq->iter, ITER_DEST, folioq, 0, 0, rreq->len);
+	added = rolling_buffer_append(&rreq->buffer, folio, rollbuf_flags);
+	if (added < 0)
+		return added;
+	rreq->submitted = rreq->start + added;
 	rreq->ractl = (struct readahead_control *)1UL;
 	return 0;
 }
@@ -514,7 +442,7 @@  static int netfs_read_gaps(struct file *file, struct folio *folio)
 	}
 	if (to < flen)
 		bvec_set_folio(&bvec[i++], folio, flen - to, to);
-	iov_iter_bvec(&rreq->iter, ITER_DEST, bvec, i, rreq->len);
+	iov_iter_bvec(&rreq->buffer.iter, ITER_DEST, bvec, i, rreq->len);
 	rreq->submitted = rreq->start + flen;
 
 	netfs_read_to_pagecache(rreq);
@@ -582,7 +510,7 @@  int netfs_read_folio(struct file *file, struct folio *folio)
 	trace_netfs_read(rreq, rreq->start, rreq->len, netfs_read_trace_readpage);
 
 	/* Set up the output buffer */
-	ret = netfs_create_singular_buffer(rreq, folio);
+	ret = netfs_create_singular_buffer(rreq, folio, 0);
 	if (ret < 0)
 		goto discard;
 
@@ -739,7 +667,7 @@  int netfs_write_begin(struct netfs_inode *ctx,
 	trace_netfs_read(rreq, pos, len, netfs_read_trace_write_begin);
 
 	/* Set up the output buffer */
-	ret = netfs_create_singular_buffer(rreq, folio);
+	ret = netfs_create_singular_buffer(rreq, folio, 0);
 	if (ret < 0)
 		goto error_put;
 
@@ -804,11 +732,10 @@  int netfs_prefetch_for_write(struct file *file, struct folio *folio,
 	trace_netfs_read(rreq, start, flen, netfs_read_trace_prefetch_for_write);
 
 	/* Set up the output buffer */
-	ret = netfs_create_singular_buffer(rreq, folio);
+	ret = netfs_create_singular_buffer(rreq, folio, NETFS_ROLLBUF_PAGECACHE_MARK);
 	if (ret < 0)
 		goto error_put;
 
-	folioq_mark2(rreq->buffer, 0);
 	netfs_read_to_pagecache(rreq);
 	ret = netfs_wait_for_read(rreq);
 	netfs_put_request(rreq, false, netfs_rreq_trace_put_return);
diff --git a/fs/netfs/direct_read.c b/fs/netfs/direct_read.c
index b1a66a6e6bc2..a3f23adbae0f 100644
--- a/fs/netfs/direct_read.c
+++ b/fs/netfs/direct_read.c
@@ -25,7 +25,7 @@  static void netfs_prepare_dio_read_iterator(struct netfs_io_subrequest *subreq)
 	subreq->len = rsize;
 
 	if (unlikely(rreq->io_streams[0].sreq_max_segs)) {
-		size_t limit = netfs_limit_iter(&rreq->iter, 0, rsize,
+		size_t limit = netfs_limit_iter(&rreq->buffer.iter, 0, rsize,
 						rreq->io_streams[0].sreq_max_segs);
 
 		if (limit < rsize) {
@@ -36,9 +36,9 @@  static void netfs_prepare_dio_read_iterator(struct netfs_io_subrequest *subreq)
 
 	trace_netfs_sreq(subreq, netfs_sreq_trace_prepare);
 
-	subreq->io_iter	= rreq->iter;
+	subreq->io_iter	= rreq->buffer.iter;
 	iov_iter_truncate(&subreq->io_iter, subreq->len);
-	iov_iter_advance(&rreq->iter, subreq->len);
+	iov_iter_advance(&rreq->buffer.iter, subreq->len);
 }
 
 /*
@@ -199,15 +199,15 @@  ssize_t netfs_unbuffered_read_iter_locked(struct kiocb *iocb, struct iov_iter *i
 	 * the request.
 	 */
 	if (user_backed_iter(iter)) {
-		ret = netfs_extract_user_iter(iter, rreq->len, &rreq->iter, 0);
+		ret = netfs_extract_user_iter(iter, rreq->len, &rreq->buffer.iter, 0);
 		if (ret < 0)
 			goto out;
-		rreq->direct_bv = (struct bio_vec *)rreq->iter.bvec;
+		rreq->direct_bv = (struct bio_vec *)rreq->buffer.iter.bvec;
 		rreq->direct_bv_count = ret;
 		rreq->direct_bv_unpin = iov_iter_extract_will_pin(iter);
-		rreq->len = iov_iter_count(&rreq->iter);
+		rreq->len = iov_iter_count(&rreq->buffer.iter);
 	} else {
-		rreq->iter = *iter;
+		rreq->buffer.iter = *iter;
 		rreq->len = orig_count;
 		rreq->direct_bv_unpin = false;
 		iov_iter_advance(iter, orig_count);
diff --git a/fs/netfs/direct_write.c b/fs/netfs/direct_write.c
index 88f2adfab75e..0722fb9919a3 100644
--- a/fs/netfs/direct_write.c
+++ b/fs/netfs/direct_write.c
@@ -68,19 +68,19 @@  ssize_t netfs_unbuffered_write_iter_locked(struct kiocb *iocb, struct iov_iter *
 		 * request.
 		 */
 		if (async || user_backed_iter(iter)) {
-			n = netfs_extract_user_iter(iter, len, &wreq->iter, 0);
+			n = netfs_extract_user_iter(iter, len, &wreq->buffer.iter, 0);
 			if (n < 0) {
 				ret = n;
 				goto out;
 			}
-			wreq->direct_bv = (struct bio_vec *)wreq->iter.bvec;
+			wreq->direct_bv = (struct bio_vec *)wreq->buffer.iter.bvec;
 			wreq->direct_bv_count = n;
 			wreq->direct_bv_unpin = iov_iter_extract_will_pin(iter);
 		} else {
-			wreq->iter = *iter;
+			wreq->buffer.iter = *iter;
 		}
 
-		wreq->io_iter = wreq->iter;
+		wreq->buffer.iter = wreq->buffer.iter;
 	}
 
 	__set_bit(NETFS_RREQ_USE_IO_ITER, &wreq->flags);
@@ -92,7 +92,7 @@  ssize_t netfs_unbuffered_write_iter_locked(struct kiocb *iocb, struct iov_iter *
 	__set_bit(NETFS_RREQ_UPLOAD_TO_SERVER, &wreq->flags);
 	if (async)
 		wreq->iocb = iocb;
-	wreq->len = iov_iter_count(&wreq->io_iter);
+	wreq->len = iov_iter_count(&wreq->buffer.iter);
 	wreq->cleanup = netfs_cleanup_dio_write;
 	ret = netfs_unbuffered_write(wreq, is_sync_kiocb(iocb), wreq->len);
 	if (ret < 0) {
diff --git a/fs/netfs/internal.h b/fs/netfs/internal.h
index 01b013f558f7..ccd9058acb61 100644
--- a/fs/netfs/internal.h
+++ b/fs/netfs/internal.h
@@ -60,10 +60,6 @@  static inline void netfs_proc_del_rreq(struct netfs_io_request *rreq) {}
  */
 struct folio_queue *netfs_buffer_make_space(struct netfs_io_request *rreq,
 					    enum netfs_folioq_trace trace);
-int netfs_buffer_append_folio(struct netfs_io_request *rreq, struct folio *folio,
-			      bool needs_put);
-struct folio_queue *netfs_delete_buffer_head(struct netfs_io_request *wreq);
-void netfs_clear_buffer(struct netfs_io_request *rreq);
 void netfs_reset_iter(struct netfs_io_subrequest *subreq);
 
 /*
diff --git a/fs/netfs/misc.c b/fs/netfs/misc.c
index afe032551de5..4249715f4171 100644
--- a/fs/netfs/misc.c
+++ b/fs/netfs/misc.c
@@ -8,153 +8,6 @@ 
 #include <linux/swap.h>
 #include "internal.h"
 
-/**
- * netfs_folioq_alloc - Allocate a folio_queue struct
- * @rreq_id: Associated debugging ID for tracing purposes
- * @gfp: Allocation constraints
- * @trace: Trace tag to indicate the purpose of the allocation
- *
- * Allocate, initialise and account the folio_queue struct and log a trace line
- * to mark the allocation.
- */
-struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp,
-				       unsigned int /*enum netfs_folioq_trace*/ trace)
-{
-	static atomic_t debug_ids;
-	struct folio_queue *fq;
-
-	fq = kmalloc(sizeof(*fq), gfp);
-	if (fq) {
-		netfs_stat(&netfs_n_folioq);
-		folioq_init(fq, rreq_id);
-		fq->debug_id = atomic_inc_return(&debug_ids);
-		trace_netfs_folioq(fq, trace);
-	}
-	return fq;
-}
-EXPORT_SYMBOL(netfs_folioq_alloc);
-
-/**
- * netfs_folioq_free - Free a folio_queue struct
- * @folioq: The object to free
- * @trace: Trace tag to indicate which free
- *
- * Free and unaccount the folio_queue struct.
- */
-void netfs_folioq_free(struct folio_queue *folioq,
-		       unsigned int /*enum netfs_trace_folioq*/ trace)
-{
-	trace_netfs_folioq(folioq, trace);
-	netfs_stat_d(&netfs_n_folioq);
-	kfree(folioq);
-}
-EXPORT_SYMBOL(netfs_folioq_free);
-
-/*
- * Make sure there's space in the rolling queue.
- */
-struct folio_queue *netfs_buffer_make_space(struct netfs_io_request *rreq,
-					    enum netfs_folioq_trace trace)
-{
-	struct folio_queue *tail = rreq->buffer_tail, *prev;
-	unsigned int prev_nr_slots = 0;
-
-	if (WARN_ON_ONCE(!rreq->buffer && tail) ||
-	    WARN_ON_ONCE(rreq->buffer && !tail))
-		return ERR_PTR(-EIO);
-
-	prev = tail;
-	if (prev) {
-		if (!folioq_full(tail))
-			return tail;
-		prev_nr_slots = folioq_nr_slots(tail);
-	}
-
-	tail = netfs_folioq_alloc(rreq->debug_id, GFP_NOFS, trace);
-	if (!tail)
-		return ERR_PTR(-ENOMEM);
-	tail->prev = prev;
-	if (prev)
-		/* [!] NOTE: After we set prev->next, the consumer is entirely
-		 * at liberty to delete prev.
-		 */
-		WRITE_ONCE(prev->next, tail);
-
-	rreq->buffer_tail = tail;
-	if (!rreq->buffer) {
-		rreq->buffer = tail;
-		iov_iter_folio_queue(&rreq->io_iter, ITER_SOURCE, tail, 0, 0, 0);
-	} else {
-		/* Make sure we don't leave the master iterator pointing to a
-		 * block that might get immediately consumed.
-		 */
-		if (rreq->io_iter.folioq == prev &&
-		    rreq->io_iter.folioq_slot == prev_nr_slots) {
-			rreq->io_iter.folioq = tail;
-			rreq->io_iter.folioq_slot = 0;
-		}
-	}
-	rreq->buffer_tail_slot = 0;
-	return tail;
-}
-
-/*
- * Append a folio to the rolling queue.
- */
-int netfs_buffer_append_folio(struct netfs_io_request *rreq, struct folio *folio,
-			      bool needs_put)
-{
-	struct folio_queue *tail;
-	unsigned int slot, order = folio_order(folio);
-
-	tail = netfs_buffer_make_space(rreq, netfs_trace_folioq_alloc_append_folio);
-	if (IS_ERR(tail))
-		return PTR_ERR(tail);
-
-	rreq->io_iter.count += PAGE_SIZE << order;
-
-	slot = folioq_append(tail, folio);
-	/* Store the counter after setting the slot. */
-	smp_store_release(&rreq->buffer_tail_slot, slot);
-	return 0;
-}
-
-/*
- * Delete the head of a rolling queue.
- */
-struct folio_queue *netfs_delete_buffer_head(struct netfs_io_request *wreq)
-{
-	struct folio_queue *head = wreq->buffer, *next = head->next;
-
-	if (next)
-		next->prev = NULL;
-	netfs_folioq_free(head, netfs_trace_folioq_delete);
-	wreq->buffer = next;
-	return next;
-}
-
-/*
- * Clear out a rolling queue.
- */
-void netfs_clear_buffer(struct netfs_io_request *rreq)
-{
-	struct folio_queue *p;
-
-	while ((p = rreq->buffer)) {
-		rreq->buffer = p->next;
-		for (int slot = 0; slot < folioq_count(p); slot++) {
-			struct folio *folio = folioq_folio(p, slot);
-			if (!folio)
-				continue;
-			if (folioq_is_marked(p, slot)) {
-				trace_netfs_folio(folio, netfs_folio_trace_put);
-				folio_put(folio);
-			}
-		}
-		netfs_folioq_free(p, netfs_trace_folioq_clear);
-	}
-}
-
 /*
  * Reset the subrequest iterator to refer just to the region remaining to be
  * read.  The iterator may or may not have been advanced by socket ops or
diff --git a/fs/netfs/objects.c b/fs/netfs/objects.c
index 31e388ec6e48..5cdddaf1f978 100644
--- a/fs/netfs/objects.c
+++ b/fs/netfs/objects.c
@@ -143,7 +143,7 @@  static void netfs_free_request(struct work_struct *work)
 		}
 		kvfree(rreq->direct_bv);
 	}
-	netfs_clear_buffer(rreq);
+	rolling_buffer_clear(&rreq->buffer);
 
 	if (atomic_dec_and_test(&ictx->io_count))
 		wake_up_var(&ictx->io_count);
diff --git a/fs/netfs/read_pgpriv2.c b/fs/netfs/read_pgpriv2.c
index ba5af89d37fa..d84dccc44cab 100644
--- a/fs/netfs/read_pgpriv2.c
+++ b/fs/netfs/read_pgpriv2.c
@@ -34,8 +34,9 @@  void netfs_pgpriv2_mark_copy_to_cache(struct netfs_io_subrequest *subreq,
  * [DEPRECATED] Cancel PG_private_2 on all marked folios in the event of an
  * unrecoverable error.
  */
-static void netfs_pgpriv2_cancel(struct folio_queue *folioq)
+static void netfs_pgpriv2_cancel(struct rolling_buffer *buffer)
 {
+	struct folio_queue *folioq = buffer->tail;
 	struct folio *folio;
 	int slot;
 
@@ -94,7 +95,7 @@  static int netfs_pgpriv2_copy_folio(struct netfs_io_request *wreq, struct folio
 	trace_netfs_folio(folio, netfs_folio_trace_store_copy);
 
 	/* Attach the folio to the rolling buffer. */
-	if (netfs_buffer_append_folio(wreq, folio, false) < 0)
+	if (rolling_buffer_append(&wreq->buffer, folio, 0) < 0)
 		return -ENOMEM;
 
 	cache->submit_extendable_to = fsize;
@@ -109,7 +110,7 @@  static int netfs_pgpriv2_copy_folio(struct netfs_io_request *wreq, struct folio
 	do {
 		ssize_t part;
 
-		wreq->io_iter.iov_offset = cache->submit_off;
+		wreq->buffer.iter.iov_offset = cache->submit_off;
 
 		atomic64_set(&wreq->issued_to, fpos + cache->submit_off);
 		cache->submit_extendable_to = fsize - cache->submit_off;
@@ -122,8 +123,8 @@  static int netfs_pgpriv2_copy_folio(struct netfs_io_request *wreq, struct folio
 			cache->submit_len -= part;
 	} while (cache->submit_len > 0);
 
-	wreq->io_iter.iov_offset = 0;
-	iov_iter_advance(&wreq->io_iter, fsize);
+	wreq->buffer.iter.iov_offset = 0;
+	rolling_buffer_advance(&wreq->buffer, fsize);
 	atomic64_set(&wreq->issued_to, fpos + fsize);
 
 	if (flen < fsize)
@@ -151,7 +152,7 @@  void netfs_pgpriv2_write_to_the_cache(struct netfs_io_request *rreq)
 		goto couldnt_start;
 
 	/* Need the first folio to be able to set up the op. */
-	for (folioq = rreq->buffer; folioq; folioq = folioq->next) {
+	for (folioq = rreq->buffer.tail; folioq; folioq = folioq->next) {
 		if (folioq->marks3) {
 			slot = __ffs(folioq->marks3);
 			break;
@@ -194,7 +195,7 @@  void netfs_pgpriv2_write_to_the_cache(struct netfs_io_request *rreq)
 	netfs_put_request(wreq, false, netfs_rreq_trace_put_return);
 	_leave(" = %d", error);
 couldnt_start:
-	netfs_pgpriv2_cancel(rreq->buffer);
+	netfs_pgpriv2_cancel(&rreq->buffer);
 }
 
 /*
@@ -203,13 +204,13 @@  void netfs_pgpriv2_write_to_the_cache(struct netfs_io_request *rreq)
  */
 bool netfs_pgpriv2_unlock_copied_folios(struct netfs_io_request *wreq)
 {
-	struct folio_queue *folioq = wreq->buffer;
+	struct folio_queue *folioq = wreq->buffer.tail;
 	unsigned long long collected_to = wreq->collected_to;
-	unsigned int slot = wreq->buffer_head_slot;
+	unsigned int slot = wreq->buffer.first_tail_slot;
 	bool made_progress = false;
 
 	if (slot >= folioq_nr_slots(folioq)) {
-		folioq = netfs_delete_buffer_head(wreq);
+		folioq = rolling_buffer_delete_spent(&wreq->buffer);
 		slot = 0;
 	}
 
@@ -248,9 +249,9 @@  bool netfs_pgpriv2_unlock_copied_folios(struct netfs_io_request *wreq)
 		folioq_clear(folioq, slot);
 		slot++;
 		if (slot >= folioq_nr_slots(folioq)) {
-			if (READ_ONCE(wreq->buffer_tail) == folioq)
-				break;
-			folioq = netfs_delete_buffer_head(wreq);
+			folioq = rolling_buffer_delete_spent(&wreq->buffer);
+			if (!folioq)
+				goto done;
 			slot = 0;
 		}
 
@@ -258,7 +259,8 @@  bool netfs_pgpriv2_unlock_copied_folios(struct netfs_io_request *wreq)
 			break;
 	}
 
-	wreq->buffer = folioq;
-	wreq->buffer_head_slot = slot;
+	wreq->buffer.tail = folioq;
+done:
+	wreq->buffer.first_tail_slot = slot;
 	return made_progress;
 }
diff --git a/fs/netfs/read_retry.c b/fs/netfs/read_retry.c
index 0350592ea804..0fe7677b4022 100644
--- a/fs/netfs/read_retry.c
+++ b/fs/netfs/read_retry.c
@@ -243,7 +243,7 @@  void netfs_unlock_abandoned_read_pages(struct netfs_io_request *rreq)
 {
 	struct folio_queue *p;
 
-	for (p = rreq->buffer; p; p = p->next) {
+	for (p = rreq->buffer.tail; p; p = p->next) {
 		for (int slot = 0; slot < folioq_count(p); slot++) {
 			struct folio *folio = folioq_folio(p, slot);
 
diff --git a/fs/netfs/rolling_buffer.c b/fs/netfs/rolling_buffer.c
new file mode 100644
index 000000000000..539ecc3b32be
--- /dev/null
+++ b/fs/netfs/rolling_buffer.c
@@ -0,0 +1,225 @@ 
+// SPDX-License-Identifier: GPL-2.0-or-later
+/* Rolling buffer helpers
+ *
+ * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ */
+
+#include <linux/bitops.h>
+#include <linux/pagemap.h>
+#include <linux/rolling_buffer.h>
+#include <linux/slab.h>
+#include "internal.h"
+
+static atomic_t debug_ids;
+
+/**
+ * netfs_folioq_alloc - Allocate a folio_queue struct
+ * @rreq_id: Associated debugging ID for tracing purposes
+ * @gfp: Allocation constraints
+ * @trace: Trace tag to indicate the purpose of the allocation
+ *
+ * Allocate, initialise and account the folio_queue struct and log a trace line
+ * to mark the allocation.
+ */
+struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp,
+				       unsigned int /*enum netfs_folioq_trace*/ trace)
+{
+	struct folio_queue *fq;
+
+	fq = kmalloc(sizeof(*fq), gfp);
+	if (fq) {
+		netfs_stat(&netfs_n_folioq);
+		folioq_init(fq, rreq_id);
+		fq->debug_id = atomic_inc_return(&debug_ids);
+		trace_netfs_folioq(fq, trace);
+	}
+	return fq;
+}
+EXPORT_SYMBOL(netfs_folioq_alloc);
+
+/**
+ * netfs_folioq_free - Free a folio_queue struct
+ * @folioq: The object to free
+ * @trace: Trace tag to indicate which free
+ *
+ * Free and unaccount the folio_queue struct.
+ */
+void netfs_folioq_free(struct folio_queue *folioq,
+		       unsigned int /*enum netfs_trace_folioq*/ trace)
+{
+	trace_netfs_folioq(folioq, trace);
+	netfs_stat_d(&netfs_n_folioq);
+	kfree(folioq);
+}
+EXPORT_SYMBOL(netfs_folioq_free);
+
+/*
+ * Initialise a rolling buffer.  We allocate an empty folio queue struct to so
+ * that the pointers can be independently driven by the producer and the
+ * consumer.
+ */
+int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id,
+			unsigned int direction)
+{
+	struct folio_queue *fq;
+
+	fq = netfs_folioq_alloc(rreq_id, GFP_NOFS, netfs_trace_folioq_rollbuf_init);
+	if (!fq)
+		return -ENOMEM;
+
+	roll->head = fq;
+	roll->tail = fq;
+	iov_iter_folio_queue(&roll->iter, direction, fq, 0, 0, 0);
+	return 0;
+}
+
+/*
+ * Add another folio_queue to a rolling buffer if there's no space left.
+ */
+int rolling_buffer_make_space(struct rolling_buffer *roll)
+{
+	struct folio_queue *fq, *head = roll->head;
+
+	if (!folioq_full(head))
+		return 0;
+
+	fq = netfs_folioq_alloc(head->rreq_id, GFP_NOFS, netfs_trace_folioq_make_space);
+	if (!fq)
+		return -ENOMEM;
+	fq->prev = head;
+
+	roll->head = fq;
+	if (folioq_full(head)) {
+		/* Make sure we don't leave the master iterator pointing to a
+		 * block that might get immediately consumed.
+		 */
+		if (roll->iter.folioq == head &&
+		    roll->iter.folioq_slot == folioq_nr_slots(head)) {
+			roll->iter.folioq = fq;
+			roll->iter.folioq_slot = 0;
+		}
+	}
+
+	/* Make sure the initialisation is stored before the next pointer.
+	 *
+	 * [!] NOTE: After we set head->next, the consumer is at liberty to
+	 * immediately delete the old head.
+	 */
+	smp_store_release(&head->next, fq);
+	return 0;
+}
+
+/*
+ * Decant the list of folios to read into a rolling buffer.
+ */
+ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll,
+				    struct readahead_control *ractl,
+				    struct folio_batch *put_batch)
+{
+	struct folio_queue *fq;
+	struct page **vec;
+	int nr, ix, to;
+	ssize_t size = 0;
+
+	if (rolling_buffer_make_space(roll) < 0)
+		return -ENOMEM;
+
+	fq = roll->head;
+	vec = (struct page **)fq->vec.folios;
+	nr = __readahead_batch(ractl, vec + folio_batch_count(&fq->vec),
+			       folio_batch_space(&fq->vec));
+	ix = fq->vec.nr;
+	to = ix + nr;
+	fq->vec.nr = to;
+	for (; ix < to; ix++) {
+		struct folio *folio = folioq_folio(fq, ix);
+		unsigned int order = folio_order(folio);
+
+		fq->orders[ix] = order;
+		size += PAGE_SIZE << order;
+		trace_netfs_folio(folio, netfs_folio_trace_read);
+		if (!folio_batch_add(put_batch, folio))
+			folio_batch_release(put_batch);
+	}
+	WRITE_ONCE(roll->iter.count, roll->iter.count + size);
+
+	/* Store the counter after setting the slot. */
+	smp_store_release(&roll->next_head_slot, to);
+
+	for (; ix < folioq_nr_slots(fq); ix++)
+		folioq_clear(fq, ix);
+
+	return size;
+}
+
+/*
+ * Append a folio to the rolling buffer.
+ */
+ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio,
+ 			      unsigned int flags)
+{
+	ssize_t size = folio_size(folio);
+	int slot;
+
+	if (rolling_buffer_make_space(roll) < 0)
+		return -ENOMEM;
+
+	slot = folioq_append(roll->head, folio);
+	if (flags & ROLLBUF_MARK_1)
+		folioq_mark(roll->head, slot);
+	if (flags & ROLLBUF_MARK_2)
+		folioq_mark2(roll->head, slot);
+
+	WRITE_ONCE(roll->iter.count, roll->iter.count + size);
+
+	/* Store the counter after setting the slot. */
+	smp_store_release(&roll->next_head_slot, slot);
+	return size;
+}
+
+/*
+ * Delete a spent buffer from a rolling queue and return the next in line.  We
+ * don't return the last buffer to keep the pointers independent, but return
+ * NULL instead.
+ */
+struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll)
+{
+	struct folio_queue *spent = roll->tail, *next = READ_ONCE(spent->next);
+
+	if (!next)
+		return NULL;
+	next->prev = NULL;
+	netfs_folioq_free(spent, netfs_trace_folioq_delete);
+	roll->tail = next;
+	return next;
+}
+
+/*
+ * Clear out a rolling queue.  Folios that have mark 1 set are put.
+ */
+void rolling_buffer_clear(struct rolling_buffer *roll)
+{
+	struct folio_batch fbatch;
+	struct folio_queue *p;
+
+	folio_batch_init(&fbatch);
+
+	while ((p = roll->tail)) {
+		roll->tail = p->next;
+		for (int slot = 0; slot < folioq_count(p); slot++) {
+			struct folio *folio = folioq_folio(p, slot);
+			if (!folio)
+				continue;
+			if (folioq_is_marked(p, slot)) {
+				trace_netfs_folio(folio, netfs_folio_trace_put);
+				if (!folio_batch_add(&fbatch, folio))
+					folio_batch_release(&fbatch);
+			}
+		}
+
+		netfs_folioq_free(p, netfs_trace_folioq_clear);
+	}
+
+	folio_batch_release(&fbatch);
+}
diff --git a/fs/netfs/write_collect.c b/fs/netfs/write_collect.c
index 1d438be2e1b4..f3fab41ca3e5 100644
--- a/fs/netfs/write_collect.c
+++ b/fs/netfs/write_collect.c
@@ -83,9 +83,9 @@  int netfs_folio_written_back(struct folio *folio)
 static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
 					  unsigned int *notes)
 {
-	struct folio_queue *folioq = wreq->buffer;
+	struct folio_queue *folioq = wreq->buffer.tail;
 	unsigned long long collected_to = wreq->collected_to;
-	unsigned int slot = wreq->buffer_head_slot;
+	unsigned int slot = wreq->buffer.first_tail_slot;
 
 	if (wreq->origin == NETFS_PGPRIV2_COPY_TO_CACHE) {
 		if (netfs_pgpriv2_unlock_copied_folios(wreq))
@@ -94,7 +94,9 @@  static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
 	}
 
 	if (slot >= folioq_nr_slots(folioq)) {
-		folioq = netfs_delete_buffer_head(wreq);
+		folioq = rolling_buffer_delete_spent(&wreq->buffer);
+		if (!folioq)
+			return;
 		slot = 0;
 	}
 
@@ -134,9 +136,9 @@  static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
 		folioq_clear(folioq, slot);
 		slot++;
 		if (slot >= folioq_nr_slots(folioq)) {
-			if (READ_ONCE(wreq->buffer_tail) == folioq)
-				break;
-			folioq = netfs_delete_buffer_head(wreq);
+			folioq = rolling_buffer_delete_spent(&wreq->buffer);
+			if (!folioq)
+				goto done;
 			slot = 0;
 		}
 
@@ -144,8 +146,9 @@  static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
 			break;
 	}
 
-	wreq->buffer = folioq;
-	wreq->buffer_head_slot = slot;
+	wreq->buffer.tail = folioq;
+done:
+	wreq->buffer.first_tail_slot = slot;
 }
 
 /*
diff --git a/fs/netfs/write_issue.c b/fs/netfs/write_issue.c
index 9b6c0dda9751..993cc6def38e 100644
--- a/fs/netfs/write_issue.c
+++ b/fs/netfs/write_issue.c
@@ -107,6 +107,8 @@  struct netfs_io_request *netfs_create_write_req(struct address_space *mapping,
 	ictx = netfs_inode(wreq->inode);
 	if (is_buffered && netfs_is_cache_enabled(ictx))
 		fscache_begin_write_operation(&wreq->cache_resources, netfs_i_cookie(ictx));
+	if (rolling_buffer_init(&wreq->buffer, wreq->debug_id, ITER_SOURCE) < 0)
+		goto nomem;
 
 	wreq->cleaned_to = wreq->start;
 
@@ -129,6 +131,10 @@  struct netfs_io_request *netfs_create_write_req(struct address_space *mapping,
 	}
 
 	return wreq;
+nomem:
+	wreq->error = -ENOMEM;
+	netfs_put_request(wreq, false, netfs_rreq_trace_put_failed);
+	return ERR_PTR(-ENOMEM);
 }
 
 /**
@@ -153,16 +159,15 @@  static void netfs_prepare_write(struct netfs_io_request *wreq,
 				loff_t start)
 {
 	struct netfs_io_subrequest *subreq;
-	struct iov_iter *wreq_iter = &wreq->io_iter;
+	struct iov_iter *wreq_iter = &wreq->buffer.iter;
 
 	/* Make sure we don't point the iterator at a used-up folio_queue
 	 * struct being used as a placeholder to prevent the queue from
 	 * collapsing.  In such a case, extend the queue.
 	 */
 	if (iov_iter_is_folioq(wreq_iter) &&
-	    wreq_iter->folioq_slot >= folioq_nr_slots(wreq_iter->folioq)) {
-		netfs_buffer_make_space(wreq, netfs_trace_folioq_prep_write);
-	}
+	    wreq_iter->folioq_slot >= folioq_nr_slots(wreq_iter->folioq))
+		rolling_buffer_make_space(&wreq->buffer);
 
 	subreq = netfs_alloc_subrequest(wreq);
 	subreq->source		= stream->source;
@@ -325,6 +330,9 @@  static int netfs_write_folio(struct netfs_io_request *wreq,
 
 	_enter("");
 
+	if (rolling_buffer_make_space(&wreq->buffer) < 0)
+		return -ENOMEM;
+
 	/* netfs_perform_write() may shift i_size around the page or from out
 	 * of the page to beyond it, but cannot move i_size into or through the
 	 * page since we have it locked.
@@ -429,7 +437,7 @@  static int netfs_write_folio(struct netfs_io_request *wreq,
 	}
 
 	/* Attach the folio to the rolling buffer. */
-	netfs_buffer_append_folio(wreq, folio, false);
+	rolling_buffer_append(&wreq->buffer, folio, 0);
 
 	/* Move the submission point forward to allow for write-streaming data
 	 * not starting at the front of the page.  We don't do write-streaming
@@ -476,7 +484,7 @@  static int netfs_write_folio(struct netfs_io_request *wreq,
 
 		/* Advance the iterator(s). */
 		if (stream->submit_off > iter_off) {
-			iov_iter_advance(&wreq->io_iter, stream->submit_off - iter_off);
+			rolling_buffer_advance(&wreq->buffer, stream->submit_off - iter_off);
 			iter_off = stream->submit_off;
 		}
 
@@ -494,7 +502,7 @@  static int netfs_write_folio(struct netfs_io_request *wreq,
 	}
 
 	if (fsize > iter_off)
-		iov_iter_advance(&wreq->io_iter, fsize - iter_off);
+		rolling_buffer_advance(&wreq->buffer, fsize - iter_off);
 	atomic64_set(&wreq->issued_to, fpos + fsize);
 
 	if (!debug)
@@ -633,7 +641,7 @@  int netfs_advance_writethrough(struct netfs_io_request *wreq, struct writeback_c
 			       struct folio **writethrough_cache)
 {
 	_enter("R=%x ic=%zu ws=%u cp=%zu tp=%u",
-	       wreq->debug_id, wreq->iter.count, wreq->wsize, copied, to_page_end);
+	       wreq->debug_id, wreq->buffer.iter.count, wreq->wsize, copied, to_page_end);
 
 	if (!*writethrough_cache) {
 		if (folio_test_dirty(folio))
@@ -708,7 +716,7 @@  int netfs_unbuffered_write(struct netfs_io_request *wreq, bool may_wait, size_t
 		part = netfs_advance_write(wreq, upload, start, len, false);
 		start += part;
 		len -= part;
-		iov_iter_advance(&wreq->io_iter, part);
+		rolling_buffer_advance(&wreq->buffer, part);
 		if (test_bit(NETFS_RREQ_PAUSE, &wreq->flags)) {
 			trace_netfs_rreq(wreq, netfs_rreq_trace_wait_pause);
 			wait_on_bit(&wreq->flags, NETFS_RREQ_PAUSE, TASK_UNINTERRUPTIBLE);
diff --git a/include/linux/netfs.h b/include/linux/netfs.h
index a30863e205de..0d4ed1229024 100644
--- a/include/linux/netfs.h
+++ b/include/linux/netfs.h
@@ -18,6 +18,7 @@ 
 #include <linux/fs.h>
 #include <linux/pagemap.h>
 #include <linux/uio.h>
+#include <linux/rolling_buffer.h>
 
 enum netfs_sreq_ref_trace;
 typedef struct mempool_s mempool_t;
@@ -238,10 +239,9 @@  struct netfs_io_request {
 	struct netfs_io_stream	io_streams[2];	/* Streams of parallel I/O operations */
 #define NR_IO_STREAMS 2 //wreq->nr_io_streams
 	struct netfs_group	*group;		/* Writeback group being written back */
-	struct folio_queue	*buffer;	/* Head of I/O buffer */
-	struct folio_queue	*buffer_tail;	/* Tail of I/O buffer */
-	struct iov_iter		iter;		/* Unencrypted-side iterator */
-	struct iov_iter		io_iter;	/* I/O (Encrypted-side) iterator */
+	struct rolling_buffer	buffer;		/* Unencrypted buffer */
+#define NETFS_ROLLBUF_PUT_MARK		ROLLBUF_MARK_1
+#define NETFS_ROLLBUF_PAGECACHE_MARK	ROLLBUF_MARK_2
 	void			*netfs_priv;	/* Private data for the netfs */
 	void			*netfs_priv2;	/* Private data for the netfs */
 	struct bio_vec		*direct_bv;	/* DIO buffer list (when handling iovec-iter) */
@@ -259,8 +259,6 @@  struct netfs_io_request {
 	long			error;		/* 0 or error that occurred */
 	enum netfs_io_origin	origin;		/* Origin of the request */
 	bool			direct_bv_unpin; /* T if direct_bv[] must be unpinned */
-	u8			buffer_head_slot; /* First slot in ->buffer */
-	u8			buffer_tail_slot; /* Next slot in ->buffer_tail */
 	unsigned long long	i_size;		/* Size of the file */
 	unsigned long long	start;		/* Start position */
 	atomic64_t		issued_to;	/* Write issuer folio cursor */
diff --git a/include/linux/rolling_buffer.h b/include/linux/rolling_buffer.h
new file mode 100644
index 000000000000..ac15b1ffdd83
--- /dev/null
+++ b/include/linux/rolling_buffer.h
@@ -0,0 +1,61 @@ 
+/* SPDX-License-Identifier: GPL-2.0-or-later */
+/* Rolling buffer of folios
+ *
+ * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ */
+
+#ifndef _ROLLING_BUFFER_H
+#define _ROLLING_BUFFER_H
+
+#include <linux/folio_queue.h>
+#include <linux/uio.h>
+
+/*
+ * Rolling buffer.  Whilst the buffer is live and in use, folios and folio
+ * queue segments can be added to one end by one thread and removed from the
+ * other end by another thread.  The buffer isn't allowed to be empty; it must
+ * always have at least one folio_queue in it so that neither side has to
+ * modify both queue pointers.
+ *
+ * The iterator in the buffer is extended as buffers are inserted.  It can be
+ * snapshotted to use a segment of the buffer.
+ */
+struct rolling_buffer {
+	struct folio_queue	*head;		/* Producer's insertion point */
+	struct folio_queue	*tail;		/* Consumer's removal point */
+	struct iov_iter		iter;		/* Iterator tracking what's left in the buffer */
+	u8			next_head_slot;	/* Next slot in ->head */
+	u8			first_tail_slot; /* First slot in ->tail */
+};
+
+/*
+ * Snapshot of a rolling buffer.
+ */
+struct rolling_buffer_snapshot {
+	struct folio_queue	*curr_folioq;	/* Queue segment in which current folio resides */
+	unsigned char		curr_slot;	/* Folio currently being read */
+	unsigned char		curr_order;	/* Order of folio */
+};
+
+/* Marks to store per-folio in the internal folio_queue structs. */
+#define ROLLBUF_MARK_1	BIT(0)
+#define ROLLBUF_MARK_2	BIT(1)
+
+int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id,
+			unsigned int direction);
+int rolling_buffer_make_space(struct rolling_buffer *roll);
+ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll,
+				    struct readahead_control *ractl,
+				    struct folio_batch *put_batch);
+ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio,
+			      unsigned int flags);
+struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll);
+void rolling_buffer_clear(struct rolling_buffer *roll);
+
+static inline void rolling_buffer_advance(struct rolling_buffer *roll, size_t amount)
+{
+	iov_iter_advance(&roll->iter, amount);
+}
+
+#endif /* _ROLLING_BUFFER_H */
diff --git a/include/trace/events/netfs.h b/include/trace/events/netfs.h
index c48dcbf74081..a0f5b13aab86 100644
--- a/include/trace/events/netfs.h
+++ b/include/trace/events/netfs.h
@@ -198,7 +198,9 @@ 
 	EM(netfs_trace_folioq_alloc_read_sing,	"alloc-r-sing")	\
 	EM(netfs_trace_folioq_clear,		"clear")	\
 	EM(netfs_trace_folioq_delete,		"delete")	\
+	EM(netfs_trace_folioq_make_space,	"make-space")	\
 	EM(netfs_trace_folioq_prep_write,	"prep-wr")	\
+	EM(netfs_trace_folioq_rollbuf_init,	"roll-init")	\
 	E_(netfs_trace_folioq_read_progress,	"r-progress")
 
 #ifndef __NETFS_DECLARE_TRACE_ENUMS_ONCE_ONLY