
[5/6] iomap: implement direct I/O

Message ID 1477408098-10153-6-git-send-email-hch@lst.de (mailing list archive)
State Superseded, archived

Commit Message

Christoph Hellwig Oct. 25, 2016, 3:08 p.m. UTC
This adds a full-fledged direct I/O implementation using the iomap
interface. Full-fledged in this case means all features are supported:
AIO, vectored I/O, any iov_iter type including kernel pointers, bvecs
and pipes, support for hole filling and async appending writes.  It does
not mean supporting all the warts of the old generic code.  We expect
i_rwsem to be held over the duration of the call, we expect to
maintain i_dio_count ourselves, and we pass on any kind of mapping
to the file system for now.

The algorithm used is very simple: We use iomap_apply to iterate over
the range of the I/O, and then we use the new bio_iov_iter_get_pages
helper to lock down the user range for the size of the extent.
bio_iov_iter_get_pages can currently lock down twice as many pages as
the old direct I/O code did, which means that we will have a better
batch factor for everything but overwrites of badly fragmented files.

Signed-off-by: Christoph Hellwig <hch@lst.de>
---
 fs/iomap.c            | 358 ++++++++++++++++++++++++++++++++++++++++++++++++++
 include/linux/iomap.h |   8 ++
 2 files changed, 366 insertions(+)
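
For readers coming to the interface fresh, the sketch below shows roughly how
a file system could wire the new helper into its write path.  It is
illustrative only: the example_* names, the iomap_ops instance and the
unwritten-extent conversion helper are hypothetical, and the real XFS
conversion follows in the next patch of the series.

static int
example_dio_write_end_io(struct kiocb *iocb, ssize_t size, unsigned flags)
{
	struct inode *inode = file_inode(iocb->ki_filp);

	if (size <= 0)
		return size;

	/* The I/O went into preallocated (unwritten) extents: convert them. */
	if (flags & IOMAP_DIO_UNWRITTEN)
		return example_convert_unwritten(inode, iocb->ki_pos, size);

	return 0;
}

static ssize_t
example_file_dio_write(struct kiocb *iocb, struct iov_iter *from)
{
	struct inode *inode = file_inode(iocb->ki_filp);
	ssize_t ret;

	/* iomap_dio_rw() expects the caller to hold i_rwsem. */
	inode_lock(inode);
	ret = iomap_dio_rw(iocb, from, &example_iomap_ops,
			example_dio_write_end_io);
	inode_unlock(inode);

	return ret;
}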

Comments

Kent Overstreet Oct. 25, 2016, 7:51 p.m. UTC | #1
On Tue, Oct 25, 2016 at 05:08:17PM +0200, Christoph Hellwig wrote:
> This adds a full-fledged direct I/O implementation using the iomap
> interface. Full-fledged in this case means all features are supported:
> AIO, vectored I/O, any iov_iter type including kernel pointers, bvecs
> and pipes, support for hole filling and async appending writes.  It does
> not mean supporting all the warts of the old generic code.  We expect
> i_rwsem to be held over the duration of the call, we expect to
> maintain i_dio_count ourselves, and we pass on any kind of mapping
> to the file system for now.
> 
> The algorithm used is very simple: We use iomap_apply to iterate over
> the range of the I/O, and then we use the new bio_iov_iter_get_pages
> helper to lock down the user range for the size of the extent.
> bio_iov_iter_get_pages can currently lock down twice as many pages as
> the old direct I/O code did, which means that we will have a better
> batch factor for everything but overwrites of badly fragmented files.

So - you're hitting inode locks on each call to iomap_begin()/iomap_end()?  :/

But - (I'm not too familiar with the XFS code) - it looks like you're also doing
a full extents btree traversal on each iomap_begin() call too, behind
xfs_bmapi_read()?

I guess that's probably how it worked before though, so ah well - if it was a
performance issue worth caring about you'd know, and like you said it only
matters for fragmented files - or, wouldn't alternating written/unwritten
extents trigger this? That does happen.

Anyways... more relevant comments...

Are you returning the right thing on partial reads/writes? The existing dio code
squashes -EFAULT (but not other errors, even when some IO was successfully
done, e.g. -ENOMEM from gup(), which doesn't seem right to me...)

One thing you're not doing that the existing dio code does is limit the number
of outstanding pinned pages. I don't think it needs to be, but it does mean
you'll return an error from gup() if someone issues a huge IO, too big to pin
all the pages at once (I'm not sure why they would do that... copying a mmap'd
file? actually, running under severe memory pressure probably makes this a real
issue) where it would've worked with the existing dio code.  Userspace could
just continue after the short read/write, except a) we all know how much
userspace code won't, and b) that means you've dropped and retaken locks, and
it's no longer atomic, so it is a change in semantics.

So I think it would be a good idea to handle any memory allocation failures by
waiting for outstanding IOs to complete and then continuing, and only bailing
out if there aren't any IOs outstanding (and that still gets rid of the stupid
hard limit on the number of pinned pages in the existing dio code).

Your dio refcounting - you have the submitting thread unconditionally holding
its own ref, and dropping it in iomap_dio_rw() after submitting all the IOs:
this is definitely the right way to do it for the sake of sanity, but it's going
to be a performance hit - this is something I've been bit by recently. The issue
is that you've already gone down the rest of the IO stack in either submit_bio()
or blk_finish_plug(), so the ref is no longer cache hot and that one atomic is
_significantly_ more expensive than the rest.

The way you've got the code setup it looks like it wouldn't be that painful to
get rid of that final atomic_dec_and_test() when it's not needed (async kiocbs),
but I also wouldn't see anything wrong with keeping things simple until after
the code is in and leaving that optimization for later.

On the whole though it looks pretty clean to me, I can't find anything that
looks wrong.
Christoph Hellwig Oct. 26, 2016, 7:44 a.m. UTC | #2
On Tue, Oct 25, 2016 at 09:13:29AM -0800, Kent Overstreet wrote:
> Also - what are you doing about the race between shooting down the range in the
> pagecache and dirty pages being readded? The existing direct IO code falls back
> to buffered IO for that, but your code doesn't appear to - I seem to recall that
> XFS has its own locking for this, are you just relying on that for now?  It'd be
> really nice to get some generic locking for this, anything that relies on
> pagecache invalidation is sketchy as hell in other filesystems.

Yes, XFS always had a shared/exclusive lock for I/O operations,
which is taken exclusive for buffered writes and those corner cases
of direct writes that need exclusion (e.g. sub-fs block size I/O).

This prevents new dirty pages from being added while direct I/O is
in progress.  There is nothing to prevent direct reads, though - that's
why both the old common code, the old XFS code and this new code do
a second invalidation after the write is done.

Now that the VFS i_mutex has been replaced with i_rwsem we can apply
this scheme to common code as well by taking i_rwsem shared for
direct I/O reads.
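
As a rough illustration of that scheme (not part of this patch; the example_*
names are made up), a direct I/O read path would take i_rwsem shared around
the call into the iomap direct I/O code, while writes keep taking it
exclusive:

static ssize_t
example_file_dio_read(struct kiocb *iocb, struct iov_iter *to)
{
	struct inode *inode = file_inode(iocb->ki_filp);
	ssize_t ret;

	/* Shared: direct reads only need to exclude buffered writers. */
	inode_lock_shared(inode);
	ret = iomap_dio_rw(iocb, to, &example_iomap_ops, NULL);
	inode_unlock_shared(inode);

	return ret;
}
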
Christoph Hellwig Oct. 26, 2016, 7:57 a.m. UTC | #3
On Tue, Oct 25, 2016 at 11:51:53AM -0800, Kent Overstreet wrote:
> So - you're hitting inode locks on each call to iomap_begin()/iomap_end()?  :/

Depends on your definition of inode locks.  In XFS we have three inode
locks:

 (1) the IOLOCK, which this patch series actually replaces entirely by the
     VFS i_rwsem.  This one just serializes I/O to a file.  It is taken
     exactly once for each read or write or truncate or similar operation.
 (2) the MMAPLOCK, which is taken in page faults to synchronize against
     truncate.  Not taken at all for read/write.
 (3) the ILOCK, which serializes access to the extent list for a file as
     well as a few minor bits not relevant here.  This one is taken in
     each iomap_begin call at the moment, although I have a crude prototype
     that allows lockless lookup in the in-memory extent list.

> But - (I'm not too familiar with the XFS code) - it looks like you're also doing
> a full extents btree traversal on each iomap_begin() call too, behind
> xfs_bmapi_read()?

xfs_bmapi_read does a "full" extent lookup in the in-memory extent list,
yes.  It's not actually a btree but a linear list with indirection
arrays at the moment.  A somewhat messy structure that hopefully won't
be around for too long.

> Are you returning the right thing on partial reads/writes? The existing dio code
> squashes -EFAULT (but not other errors, even when some IO was successfully
> done, e.g. -ENOMEM from gup(), which doesn't seem right to me...)

Both the old and the new code only support partial reads when hitting
EOF - everything else is getting turned into a negative error.

> One thing you're not doing that the existing dio code does is limit the number
> of outstanding pinned pages. I don't think it needs to be, but it does mean
> you'll return an error from gup() if someone issues a huge IO, too big to pin
> all the pages at once

Where would that error come from?  It's not like get_user_pages accounts
for the number of pinned pages in any way.  I also don't see the old
code caring about pinned pages anywhere; it just has its little
64-page array that it wants to reuse to avoid unbounded kernel
allocations for large I/O.  For the new code the bio mempool takes care
of that throttling.

> So I think it would be a good idea to handle any memory allocation failures by
> waiting for outstanding IOs to complete and then continuing, and only bailing
> out if there aren't any IOs outstanding (and that still gets rid of the stupid
> hard limit on the number of pinned pages in the existing dio code).

We have exactly two memory allocations in the iomap part of the code (the
fs might have more): the initial dio structure, and then the bios.  For
a dio allocation failure we just return -ENOMEM, and bio allocations don't
fail - they just wait for other bios to complete - so I get this behavior
for free.

> Your dio refcounting - you have the submitting thread unconditionally holding
> its own ref, and dropping it in iomap_dio_rw() after submitting all the IOs:
> this is definitely the right way to do it for the sake of sanity, but it's going
> to be a performance hit - this is something I've been bit by recently. The issue
> is that you've already gone down the rest of the IO stack in either submit_bio()
> or blk_finish_plug(), so the ref is no longer cache hot and that one atomic is
> _significantly_ more expensive than the rest.
> 
> The way you've got the code setup it looks like it wouldn't be that painful to
> get rid of that final atomic_dec_and_test() when it's not needed (async kiocbs),
> but I also wouldn't see anything wrong with keeping things simple until after
> the code is in and leaving that optimization for later.

Yes, I don't want to over-optimize things - there are still a lot of
easier fish to fry for the fs direct I/O case.  But I also have a cut-down
variant of this code just for block devices that has optimized
every little bit we can - with that we get 4 microsecond latency from
a userspace program to a (DRAM based) NVMe device.  The code is here:

http://git.infradead.org/users/hch/block.git/commitdiff/2491eb79a983f7f6841189ad179c714a93316603
Bob Peterson Oct. 26, 2016, 1:53 p.m. UTC | #4
----- Original Message -----
| This adds a full-fledged direct I/O implementation using the iomap
| interface. Full-fledged in this case means all features are supported:
| AIO, vectored I/O, any iov_iter type including kernel pointers, bvecs
| and pipes, support for hole filling and async appending writes.  It does
| not mean supporting all the warts of the old generic code.  We expect
| i_rwsem to be held over the duration of the call, we expect to
| maintain i_dio_count ourselves, and we pass on any kind of mapping
| to the file system for now.
| 
| The algorithm used is very simple: We use iomap_apply to iterate over
| the range of the I/O, and then we use the new bio_iov_iter_get_pages
| helper to lock down the user range for the size of the extent.
| bio_iov_iter_get_pages can currently lock down twice as many pages as
| the old direct I/O code did, which means that we will have a better
| batch factor for everything but overwrites of badly fragmented files.
| 
| Signed-off-by: Christoph Hellwig <hch@lst.de>
| ---
(snip)
| +static blk_qc_t
| +iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
| +		unsigned len)
| +{
| +	struct page *page = ZERO_PAGE(0);
| +	struct bio *bio;
| +
| +	bio = bio_alloc(GFP_KERNEL, 1);

It's unlikely, but bio_alloc can return NULL; shouldn't the code be checking for that?

| +	bio->bi_bdev = iomap->bdev;
| +	bio->bi_iter.bi_sector =
| +		iomap->blkno + ((pos - iomap->offset) >> 9);
| +	bio->bi_private = dio;
| +	bio->bi_end_io = iomap_dio_bio_end_io;
| +
| +	get_page(page);
| +	if (bio_add_page(bio, page, len, 0) != len)
| +		BUG();
| +	bio_set_op_attrs(bio, REQ_OP_WRITE, WRITE_ODIRECT);
| +
| +	atomic_inc(&dio->ref);
| +	return submit_bio(bio);
| +}
| +
| +static loff_t
| +iomap_dio_actor(struct inode *inode, loff_t pos, loff_t length,
| +		void *data, struct iomap *iomap)
| +{
| +	struct iomap_dio *dio = data;
| +	unsigned blkbits = blksize_bits(bdev_logical_block_size(iomap->bdev));
| +	unsigned fs_block_size = (1 << inode->i_blkbits), pad;
| +	struct iov_iter iter = *dio->submit.iter;
| +	struct bio *bio;
| +	bool may_zero = false;
| +	int nr_pages, ret;
| +
| +	if ((pos | length | iov_iter_alignment(&iter)) & ((1 << blkbits) - 1))
| +		return -EINVAL;
| +
| +	switch (iomap->type) {
| +	case IOMAP_HOLE:
| +		/*
| +		 * We return -ENOTBLK to fall back to buffered I/O for file
| +		 * systems that can't fill holes from direct writes.
| +		 */
| +		if (dio->flags & IOMAP_DIO_WRITE)
| +			return -ENOTBLK;
| +		/*FALLTHRU*/
| +	case IOMAP_UNWRITTEN:
| +		if (!(dio->flags & IOMAP_DIO_WRITE)) {
| +			iov_iter_zero(length, dio->submit.iter);
| +			dio->size += length;
| +			return length;
| +		}
| +		dio->flags |= IOMAP_DIO_UNWRITTEN;
| +		may_zero = true;
| +		break;
| +	case IOMAP_MAPPED:
| +		if (iomap->flags & IOMAP_F_SHARED)
| +			dio->flags |= IOMAP_DIO_COW;
| +		if (iomap->flags & IOMAP_F_NEW)
| +			may_zero = true;
| +		break;
| +	default:
| +		WARN_ON_ONCE(1);
| +		return -EIO;
| +	}
| +
| +	iov_iter_truncate(&iter, length);
| +	nr_pages = iov_iter_npages(&iter, BIO_MAX_PAGES);
| +	if (nr_pages <= 0)
| +		return nr_pages;
| +
| +	if (may_zero) {
| +		pad = pos & (fs_block_size - 1);
| +		if (pad)
| +			iomap_dio_zero(dio, iomap, pos - pad, pad);
| +	}
| +
| +	do {
| +		if (dio->error)
| +			return 0;
| +
| +		bio = bio_alloc(GFP_KERNEL, nr_pages);

Same here. Also: the code that follows is nearly identical; do you want to make
it a macro or inline function or something?

Regards,

Bob Peterson
Red Hat File Systems
Christoph Hellwig Oct. 26, 2016, 2:02 p.m. UTC | #5
On Wed, Oct 26, 2016 at 09:53:43AM -0400, Bob Peterson wrote:
> It's unlikely, but bio_alloc can return NULL; shouldn't the code be
> checking for that?

No, a sleeping bio_alloc cannot return NULL.  If it did, our I/O
code would break down badly - take a look at the implementation;
the bio_alloc_bioset documentation even explains this in detail.
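
To illustrate the difference (this is not code from the patch): a sleeping
allocation is backed by a mempool and waits for a free bio instead of
failing, while a non-sleeping allocation can legitimately return NULL and
has to be checked:

static struct bio *
example_alloc_bio(unsigned int nr_pages, bool can_sleep)
{
	if (can_sleep) {
		/*
		 * GFP_KERNEL allocations are backed by a mempool: they may
		 * block until a bio is freed, but they never return NULL,
		 * so no error check is needed.
		 */
		return bio_alloc(GFP_KERNEL, nr_pages);
	}

	/*
	 * Non-sleeping allocations (e.g. GFP_NOWAIT) can fail, so the
	 * caller of this helper must be prepared to see NULL here.
	 */
	return bio_alloc(GFP_NOWAIT, nr_pages);
}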

> | +		if (dio->error)
> | +			return 0;
> | +
> | +		bio = bio_alloc(GFP_KERNEL, nr_pages);
> 
> Same here. Also: the code that follows is nearly identical; do you want to make
> it a macro or inline function or something?

I'll take a look, but having to follow yet another helper for a
trivial struct initialization doesn't seem all that useful.
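
For reference, the kind of helper Bob has in mind could look roughly like
this (a sketch only, based on the duplicated initialization in
iomap_dio_zero() and iomap_dio_actor() above; it is not part of the patch):

static struct bio *
iomap_dio_alloc_bio(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
		int nr_pages)
{
	struct bio *bio = bio_alloc(GFP_KERNEL, nr_pages);

	/* Initialization currently duplicated by both bio submitters. */
	bio->bi_bdev = iomap->bdev;
	bio->bi_iter.bi_sector =
		iomap->blkno + ((pos - iomap->offset) >> 9);
	bio->bi_private = dio;
	bio->bi_end_io = iomap_dio_bio_end_io;
	return bio;
}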

Patch

diff --git a/fs/iomap.c b/fs/iomap.c
index a8ee8c3..502be3d 100644
--- a/fs/iomap.c
+++ b/fs/iomap.c
@@ -24,6 +24,7 @@ 
 #include <linux/uio.h>
 #include <linux/backing-dev.h>
 #include <linux/buffer_head.h>
+#include <linux/task_io_accounting_ops.h>
 #include <linux/dax.h>
 #include "internal.h"
 
@@ -583,3 +584,360 @@  int iomap_fiemap(struct inode *inode, struct fiemap_extent_info *fi,
 	return 0;
 }
 EXPORT_SYMBOL_GPL(iomap_fiemap);
+
+/*
+ * Private flags for iomap_dio, must not overlap with the public ones in
+ * iomap.h:
+ */
+#define IOMAP_DIO_WRITE		(1 << 30)
+#define IOMAP_DIO_DIRTY	(1 << 31)
+
+struct iomap_dio {
+	struct kiocb		*iocb;
+	iomap_dio_end_io_t	*end_io;
+	loff_t			i_size;
+	loff_t			size;
+	atomic_t		ref;
+	unsigned		flags;
+	int			error;
+
+	union {
+		/* used during submission and for synchronous completion: */
+		struct {
+			struct iov_iter		*iter;
+			struct task_struct	*waiter;
+			struct request_queue	*last_queue;
+			blk_qc_t		cookie;
+		} submit;
+
+		/* used for aio completion: */
+		struct {
+			struct work_struct	work;
+		} aio;
+	};
+};
+
+static ssize_t iomap_dio_complete(struct iomap_dio *dio)
+{
+	struct kiocb *iocb = dio->iocb;
+	ssize_t ret;
+
+	if (dio->end_io) {
+		ret = dio->end_io(iocb,
+				dio->error ? dio->error : dio->size,
+				dio->flags);
+	} else {
+		ret = dio->error;
+	}
+
+	if (likely(!ret)) {
+		ret = dio->size;
+		/* check for short read */
+		if (iocb->ki_pos + ret > dio->i_size &&
+		    !(dio->flags & IOMAP_DIO_WRITE))
+			ret = dio->i_size - iocb->ki_pos;
+		iocb->ki_pos += ret;
+	}
+
+	inode_dio_end(file_inode(iocb->ki_filp));
+	kfree(dio);
+
+	return ret;
+}
+
+static void iomap_dio_complete_work(struct work_struct *work)
+{
+	struct iomap_dio *dio = container_of(work, struct iomap_dio, aio.work);
+	struct kiocb *iocb = dio->iocb;
+	bool is_write = (dio->flags & IOMAP_DIO_WRITE);
+	ssize_t ret;
+
+	ret = iomap_dio_complete(dio);
+	if (is_write && ret > 0)
+		ret = generic_write_sync(iocb, ret);
+	iocb->ki_complete(iocb, ret, 0);
+}
+
+static void iomap_dio_bio_end_io(struct bio *bio)
+{
+	struct iomap_dio *dio = bio->bi_private;
+	bool should_dirty = (dio->flags & IOMAP_DIO_DIRTY);
+
+	if (bio->bi_error)
+		cmpxchg(&dio->error, 0, bio->bi_error);
+
+	if (atomic_dec_and_test(&dio->ref)) {
+		if (is_sync_kiocb(dio->iocb)) {
+			struct task_struct *waiter = dio->submit.waiter;
+
+			WRITE_ONCE(dio->submit.waiter, NULL);
+			wake_up_process(waiter);
+		} else if (dio->flags & IOMAP_DIO_WRITE) {
+			struct inode *inode = file_inode(dio->iocb->ki_filp);
+
+			INIT_WORK(&dio->aio.work, iomap_dio_complete_work);
+			queue_work(inode->i_sb->s_dio_done_wq, &dio->aio.work);
+		} else {
+			iomap_dio_complete_work(&dio->aio.work);
+		}
+	}
+
+	if (should_dirty) {
+		bio_check_pages_dirty(bio);
+	} else {
+		struct bio_vec *bvec;
+		int i;
+
+		bio_for_each_segment_all(bvec, bio, i)
+			put_page(bvec->bv_page);
+		bio_put(bio);
+	}
+}
+
+static blk_qc_t
+iomap_dio_zero(struct iomap_dio *dio, struct iomap *iomap, loff_t pos,
+		unsigned len)
+{
+	struct page *page = ZERO_PAGE(0);
+	struct bio *bio;
+
+	bio = bio_alloc(GFP_KERNEL, 1);
+	bio->bi_bdev = iomap->bdev;
+	bio->bi_iter.bi_sector =
+		iomap->blkno + ((pos - iomap->offset) >> 9);
+	bio->bi_private = dio;
+	bio->bi_end_io = iomap_dio_bio_end_io;
+
+	get_page(page);
+	if (bio_add_page(bio, page, len, 0) != len)
+		BUG();
+	bio_set_op_attrs(bio, REQ_OP_WRITE, WRITE_ODIRECT);
+
+	atomic_inc(&dio->ref);
+	return submit_bio(bio);
+}
+
+static loff_t
+iomap_dio_actor(struct inode *inode, loff_t pos, loff_t length,
+		void *data, struct iomap *iomap)
+{
+	struct iomap_dio *dio = data;
+	unsigned blkbits = blksize_bits(bdev_logical_block_size(iomap->bdev));
+	unsigned fs_block_size = (1 << inode->i_blkbits), pad;
+	struct iov_iter iter = *dio->submit.iter;
+	struct bio *bio;
+	bool may_zero = false;
+	int nr_pages, ret;
+
+	if ((pos | length | iov_iter_alignment(&iter)) & ((1 << blkbits) - 1))
+		return -EINVAL;
+
+	switch (iomap->type) {
+	case IOMAP_HOLE:
+		/*
+		 * We return -ENOTBLK to fall back to buffered I/O for file
+		 * systems that can't fill holes from direct writes.
+		 */
+		if (dio->flags & IOMAP_DIO_WRITE)
+			return -ENOTBLK;
+		/*FALLTHRU*/
+	case IOMAP_UNWRITTEN:
+		if (!(dio->flags & IOMAP_DIO_WRITE)) {
+			iov_iter_zero(length, dio->submit.iter);
+			dio->size += length;
+			return length;
+		}
+		dio->flags |= IOMAP_DIO_UNWRITTEN;
+		may_zero = true;
+		break;
+	case IOMAP_MAPPED:
+		if (iomap->flags & IOMAP_F_SHARED)
+			dio->flags |= IOMAP_DIO_COW;
+		if (iomap->flags & IOMAP_F_NEW)
+			may_zero = true;
+		break;
+	default:
+		WARN_ON_ONCE(1);
+		return -EIO;
+	}
+
+	iov_iter_truncate(&iter, length);
+	nr_pages = iov_iter_npages(&iter, BIO_MAX_PAGES);
+	if (nr_pages <= 0)
+		return nr_pages;
+
+	if (may_zero) {
+		pad = pos & (fs_block_size - 1);
+		if (pad)
+			iomap_dio_zero(dio, iomap, pos - pad, pad);
+	}
+
+	do {
+		if (dio->error)
+			return 0;
+
+		bio = bio_alloc(GFP_KERNEL, nr_pages);
+		bio->bi_bdev = iomap->bdev;
+		bio->bi_iter.bi_sector =
+			iomap->blkno + ((pos - iomap->offset) >> 9);
+		bio->bi_private = dio;
+		bio->bi_end_io = iomap_dio_bio_end_io;
+
+		ret = bio_iov_iter_get_pages(bio, &iter);
+		if (unlikely(ret)) {
+			bio_put(bio);
+			return ret;
+		}
+
+		if (dio->flags & IOMAP_DIO_WRITE) {
+			bio_set_op_attrs(bio, REQ_OP_WRITE, WRITE_ODIRECT);
+			task_io_account_write(bio->bi_iter.bi_size);
+		} else {
+			bio_set_op_attrs(bio, REQ_OP_READ, 0);
+			if (dio->flags & IOMAP_DIO_DIRTY)
+				bio_set_pages_dirty(bio);
+		}
+
+		dio->size += bio->bi_iter.bi_size;
+		pos += bio->bi_iter.bi_size;
+
+		nr_pages = iov_iter_npages(&iter, BIO_MAX_PAGES);
+
+		atomic_inc(&dio->ref);
+
+		dio->submit.last_queue = bdev_get_queue(iomap->bdev);
+		dio->submit.cookie = submit_bio(bio);
+	} while (nr_pages);
+
+	if (may_zero) {
+		pad = pos & (fs_block_size - 1);
+		if (pad)
+			iomap_dio_zero(dio, iomap, pos, fs_block_size - pad);
+	}
+
+	iov_iter_advance(dio->submit.iter, length);
+	return length;
+}
+
+ssize_t
+iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter, struct iomap_ops *ops,
+		iomap_dio_end_io_t end_io)
+{
+	struct address_space *mapping = iocb->ki_filp->f_mapping;
+	struct inode *inode = file_inode(iocb->ki_filp);
+	size_t count = iov_iter_count(iter);
+	loff_t pos = iocb->ki_pos, end = iocb->ki_pos + count - 1, ret = 0;
+	unsigned int flags = IOMAP_DIRECT;
+	struct blk_plug plug;
+	struct iomap_dio *dio;
+
+	lockdep_assert_held(&inode->i_rwsem);
+
+	if (!count)
+		return 0;
+
+	dio = kmalloc(sizeof(*dio), GFP_KERNEL);
+	if (!dio)
+		return -ENOMEM;
+
+	dio->iocb = iocb;
+	atomic_set(&dio->ref, 1);
+	dio->size = 0;
+	dio->i_size = i_size_read(inode);
+	dio->end_io = end_io;
+	dio->error = 0;
+	dio->flags = 0;
+
+	dio->submit.iter = iter;
+	if (is_sync_kiocb(iocb)) {
+		dio->submit.waiter = current;
+		dio->submit.cookie = BLK_QC_T_NONE;
+		dio->submit.last_queue = NULL;
+	}
+
+	if (iov_iter_rw(iter) == READ) {
+		if (pos >= dio->i_size)
+			goto out_free_dio;
+
+		if (iter->type == ITER_IOVEC)
+			dio->flags |= IOMAP_DIO_DIRTY;
+	} else {
+		dio->flags |= IOMAP_DIO_WRITE;
+		flags |= IOMAP_WRITE;
+	}
+
+	if (mapping->nrpages) {
+		ret = filemap_write_and_wait_range(mapping, iocb->ki_pos, end);
+		if (ret)
+			goto out_free_dio;
+
+		ret = invalidate_inode_pages2_range(mapping,
+				iocb->ki_pos >> PAGE_SHIFT, end >> PAGE_SHIFT);
+		WARN_ON_ONCE(ret);
+		ret = 0;
+	}
+
+	inode_dio_begin(inode);
+
+	blk_start_plug(&plug);
+	do {
+		ret = iomap_apply(inode, pos, count, flags, ops, dio,
+				iomap_dio_actor);
+		if (ret <= 0) {
+			/* magic error code to fall back to buffered I/O */
+			if (ret == -ENOTBLK)
+				ret = 0;
+			break;
+		}
+		pos += ret;
+	} while ((count = iov_iter_count(iter)) > 0);
+	blk_finish_plug(&plug);
+
+	if (ret < 0)
+		cmpxchg(&dio->error, 0, ret);
+
+	if (ret >= 0 && iov_iter_rw(iter) == WRITE && !is_sync_kiocb(iocb) &&
+			!inode->i_sb->s_dio_done_wq) {
+		ret = sb_init_dio_done_wq(inode->i_sb);
+		if (ret < 0)
+			cmpxchg(&dio->error, 0, ret);
+	}
+
+	if (!atomic_dec_and_test(&dio->ref)) {
+		if (!is_sync_kiocb(iocb))
+			return -EIOCBQUEUED;
+
+		for (;;) {
+			set_current_state(TASK_UNINTERRUPTIBLE);
+			if (!READ_ONCE(dio->submit.waiter))
+				break;
+
+			if (!(iocb->ki_flags & IOCB_HIPRI) ||
+			    !dio->submit.last_queue ||
+			    !blk_poll(dio->submit.last_queue,
+					dio->submit.cookie))
+				io_schedule();
+		}
+		__set_current_state(TASK_RUNNING);
+	}
+
+	/*
+	 * Try again to invalidate clean pages which might have been cached by
+	 * non-direct readahead, or faulted in by get_user_pages() if the source
+	 * of the write was an mmap'ed region of the file we're writing.  Either
+	 * one is a pretty crazy thing to do, so we don't support it 100%.  If
+	 * this invalidation fails, tough, the write still worked...
+	 */
+	if (iov_iter_rw(iter) == WRITE && mapping->nrpages) {
+		ret = invalidate_inode_pages2_range(mapping,
+				iocb->ki_pos >> PAGE_SHIFT, end >> PAGE_SHIFT);
+		WARN_ON_ONCE(ret);
+	}
+
+	return iomap_dio_complete(dio);
+
+out_free_dio:
+	kfree(dio);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(iomap_dio_rw);
diff --git a/include/linux/iomap.h b/include/linux/iomap.h
index 7892f55..1b53109 100644
--- a/include/linux/iomap.h
+++ b/include/linux/iomap.h
@@ -49,6 +49,7 @@  struct iomap {
 #define IOMAP_WRITE		(1 << 0) /* writing, must allocate blocks */
 #define IOMAP_ZERO		(1 << 1) /* zeroing operation, may skip holes */
 #define IOMAP_REPORT		(1 << 2) /* report extent status, e.g. FIEMAP */
+#define IOMAP_DIRECT		(1 << 3)
 
 struct iomap_ops {
 	/*
@@ -82,4 +83,11 @@  int iomap_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf,
 int iomap_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
 		loff_t start, loff_t len, struct iomap_ops *ops);
 
+#define IOMAP_DIO_UNWRITTEN	(1 << 0)
+#define IOMAP_DIO_COW		(1 << 1)
+typedef int (iomap_dio_end_io_t)(struct kiocb *iocb, ssize_t ret,
+		unsigned flags);
+ssize_t iomap_dio_rw(struct kiocb *iocb, struct iov_iter *iter,
+		struct iomap_ops *ops, iomap_dio_end_io_t end_io);
+
 #endif /* LINUX_IOMAP_H */