diff mbox series

[16/27] libxfs: add a synchronous IO engine to the buftarg

Message ID 20201015072155.1631135-17-david@fromorbit.com (mailing list archive)
State Deferred, archived
Headers show
Series xfsprogs: xfs_buf unification and AIO | expand

Commit Message

Dave Chinner Oct. 15, 2020, 7:21 a.m. UTC
From: Dave Chinner <dchinner@redhat.com>

Replace the use of the rdwr.c uncached IO routines with a new
buftarg based IO engine. This will currently be synchronous so as to
match the existing functionality it replaces, but will be easily
modified to run AIO in future.

Signed-off-by: Dave Chinner <dchinner@redhat.com>
---
 libxfs/buftarg.c     | 197 ++++++++++++++++++++++++++++++++++++++++---
 libxfs/libxfs_io.h   |   2 +
 libxfs/xfs_buftarg.h |   4 +-
 3 files changed, 192 insertions(+), 11 deletions(-)
diff mbox series

Patch

diff --git a/libxfs/buftarg.c b/libxfs/buftarg.c
index 2a0aad2e0f8c..d98952940ee8 100644
--- a/libxfs/buftarg.c
+++ b/libxfs/buftarg.c
@@ -98,6 +98,182 @@  xfs_buftarg_free(
 	free(btp);
 }
 
+/*
+ * Low level IO routines
+ */
+static void
+xfs_buf_ioend(
+	struct xfs_buf	*bp)
+{
+	bool		read = bp->b_flags & XBF_READ;
+
+//	printf("endio bn %ld l %d/%d, io err %d err %d f 0x%x\n", bp->b_maps[0].bm_bn,
+//			bp->b_maps[0].bm_len, BBTOB(bp->b_length),
+//			bp->b_io_error, bp->b_error, bp->b_flags);
+
+	bp->b_flags &= ~(XBF_READ | XBF_WRITE);
+
+	/*
+	 * Pull in IO completion errors now. We are guaranteed to be running
+	 * single threaded, so we don't need the lock to read b_io_error.
+	 */
+	if (!bp->b_error && bp->b_io_error)
+		xfs_buf_ioerror(bp, bp->b_io_error);
+
+	/* Only validate buffers that were read without errors */
+	if (read && !bp->b_error && bp->b_ops) {
+		ASSERT(!bp->b_iodone);
+		bp->b_ops->verify_read(bp);
+	}
+}
+
+static void
+xfs_buf_complete_io(
+	struct xfs_buf	*bp,
+	int		status)
+{
+
+	/*
+	 * don't overwrite existing errors - otherwise we can lose errors on
+	 * buffers that require multiple bios to complete.
+	 */
+	if (status)
+		cmpxchg(&bp->b_io_error, 0, status);
+
+	if (atomic_dec_and_test(&bp->b_io_remaining) == 1)
+		xfs_buf_ioend(bp);
+}
+
+/*
+ * XXX: this will be replaced by an AIO submission engine in future. In the mean
+ * time, just complete the IO synchronously so all the machinery still works.
+ */
+static int
+submit_io(
+	struct xfs_buf *bp,
+	int		fd,
+	void		*buf,
+	xfs_daddr_t	blkno,
+	int		size,
+	bool		write)
+{
+	int		ret;
+
+	if (!write)
+		ret = pread(fd, buf, size, BBTOB(blkno));
+	else
+		ret = pwrite(fd, buf, size, BBTOB(blkno));
+	if (ret < 0)
+		ret = -errno;
+	else if (ret != size)
+		ret = -EIO;
+	else
+		ret = 0;
+	xfs_buf_complete_io(bp, ret);
+	return ret;
+}
+
+static void
+xfs_buftarg_submit_io_map(
+	struct xfs_buf	*bp,
+	int		map,
+	int		*buf_offset,
+	int		*count)
+{
+	int		size;
+	int		offset;
+	bool		rw = (bp->b_flags & XBF_WRITE);
+	int		error;
+
+	offset = *buf_offset;
+
+	/*
+	 * Limit the IO size to the length of the current vector, and update the
+	 * remaining IO count for the next time around.
+	 */
+	size = min_t(int, BBTOB(bp->b_maps[map].bm_len), *count);
+	*count -= size;
+	*buf_offset += size;
+
+	atomic_inc(&bp->b_io_remaining);
+
+	error = submit_io(bp, bp->b_target->bt_fd, bp->b_addr + offset,
+			  bp->b_maps[map].bm_bn, size, rw);
+	if (error) {
+		/*
+		 * This is guaranteed not to be the last io reference count
+		 * because the caller (xfs_buf_submit) holds a count itself.
+		 */
+		atomic_dec(&bp->b_io_remaining);
+		xfs_buf_ioerror(bp, error);
+	}
+}
+
+void
+xfs_buftarg_submit_io(
+	struct xfs_buf	*bp)
+{
+	int		offset;
+	int		size;
+	int		i;
+
+	/*
+	 * Make sure we capture only current IO errors rather than stale errors
+	 * left over from previous use of the buffer (e.g. failed readahead).
+	 */
+	bp->b_error = 0;
+
+	if (bp->b_flags & XBF_WRITE) {
+		/*
+		 * Run the write verifier callback function if it exists. If
+		 * this function fails it will mark the buffer with an error and
+		 * the IO should not be dispatched.
+		 */
+		if (bp->b_ops) {
+			bp->b_ops->verify_write(bp);
+			if (bp->b_error) {
+				xfs_force_shutdown(bp->b_target->bt_mount,
+						   SHUTDOWN_CORRUPT_INCORE);
+				return;
+			}
+		} else if (bp->b_bn != XFS_BUF_DADDR_NULL) {
+			struct xfs_mount *mp = bp->b_target->bt_mount;
+
+			/*
+			 * non-crc filesystems don't attach verifiers during
+			 * log recovery, so don't warn for such filesystems.
+			 */
+			if (xfs_sb_version_hascrc(&mp->m_sb)) {
+				xfs_warn(mp,
+					"%s: no buf ops on daddr 0x%llx len %d",
+					__func__, bp->b_bn, bp->b_length);
+				xfs_hex_dump(bp->b_addr,
+						XFS_CORRUPTION_DUMP_LEN);
+			}
+		}
+	}
+
+	atomic_set(&bp->b_io_remaining, 1);
+
+	/*
+	 * Walk all the vectors issuing IO on them. Set up the initial offset
+	 * into the buffer and the desired IO size before we start -
+	 * xfs_buf_ioapply_map() will modify them appropriately for each
+	 * subsequent call.
+	 */
+	offset = 0;
+	size = BBTOB(bp->b_length);
+	for (i = 0; i < bp->b_map_count; i++) {
+		xfs_buftarg_submit_io_map(bp, i, &offset, &size);
+		if (bp->b_error)
+			break;
+		if (size <= 0)
+			break;	/* all done */
+	}
+
+	xfs_buf_complete_io(bp, bp->b_error);
+}
+
 /*
  * Allocate an uncached buffer that points at daddr.  The refcount will be 1,
  * and the cache node hash list will be empty to indicate that it's uncached.
@@ -140,20 +316,21 @@  xfs_buf_read_uncached(
 	if (error)
 		return error;
 
-	error = libxfs_readbufr(target, daddr, bp, bblen, flags);
-	if (error)
-		goto release_buf;
+	/* set up the buffer for a read IO */
+	ASSERT(bp->b_map_count == 1);
+	bp->b_maps[0].bm_bn = daddr;
+	bp->b_flags |= XBF_READ;
+	bp->b_ops = ops;
 
-	error = libxfs_readbuf_verify(bp, ops);
-	if (error)
-		goto release_buf;
+	xfs_buftarg_submit_io(bp);
+	if (bp->b_error) {
+		error = bp->b_error;
+		xfs_buf_relse(bp);
+		return error;
+	}
 
 	*bpp = bp;
 	return 0;
-
-release_buf:
-	libxfs_buf_relse(bp);
-	return error;
 }
 
 /*
diff --git a/libxfs/libxfs_io.h b/libxfs/libxfs_io.h
index 7f8fd88f7de8..8408f436e5a5 100644
--- a/libxfs/libxfs_io.h
+++ b/libxfs/libxfs_io.h
@@ -62,6 +62,8 @@  struct xfs_buf {
 	struct xfs_buf_map	*b_maps;
 	struct xfs_buf_map	__b_map;
 	int			b_map_count;
+	int			b_io_remaining;
+	int			b_io_error;
 	struct list_head	b_list;
 };
 
diff --git a/libxfs/xfs_buftarg.h b/libxfs/xfs_buftarg.h
index 5429c96c0547..b6e365c4f5be 100644
--- a/libxfs/xfs_buftarg.h
+++ b/libxfs/xfs_buftarg.h
@@ -60,7 +60,6 @@  int xfs_buftarg_setsize(struct xfs_buftarg *target, unsigned int size);
  * This includes the uncached buffer IO API, as the memory management associated
  * with uncached buffers is tightly tied to the kernel buffer implementation.
  */
-
 void xfs_buf_set_empty(struct xfs_buf *bp, size_t numblks);
 int xfs_buf_associate_memory(struct xfs_buf *bp, void *mem, size_t length);
 
@@ -80,6 +79,9 @@  int xfs_buf_read_uncached(struct xfs_buftarg *target, xfs_daddr_t daddr,
 			  size_t bblen, int flags, struct xfs_buf **bpp,
 			  const struct xfs_buf_ops *ops);
 
+#define XBF_READ	(1 << 0)
+#define XBF_WRITE	(1 << 1)
+
 /*
  * Raw buffer access functions. These exist as temporary bridges for uncached IO
  * that uses direct access to the buffers to submit IO. These will go away with