diff mbox

[2/2] ceph: Implement writev/pwritev for sync operation.

Message ID 201309031652122920661@gmail.com (mailing list archive)
State New, archived
Headers show

Commit Message

majianpeng Sept. 3, 2013, 8:52 a.m. UTC
For the writev/pwritev sync operation, ceph only handles the first iov
and ignores the remaining iovs. Now implement this properly.
I divided the sync write operation into two functions: one for
direct write, the other for non-direct sync write. This is because for
a non-direct sync write we can merge the iovs into one, but for a
direct write we cannot merge the iovs.

Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>

---
 fs/ceph/file.c | 328 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 248 insertions(+), 80 deletions(-)

-- 
1.8.1.2

Comments

Yan, Zheng Sept. 4, 2013, 1:17 p.m. UTC | #1
Hi,

Thank you for the patch.

On 09/03/2013 04:52 PM, majianpeng wrote:
> For writev/pwritev sync-operatoin, ceph only do the first iov.
> It don't think other iovs.Now implement this.
> I divided the write-sync-operation into two functions.One for
> direct-write,other for none-direct-sync-write.This is because for
> none-direct-sync-write we can merge iovs to one.But for direct-write,
> we can't merge iovs.
> 
> Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>
> ---
>  fs/ceph/file.c | 328 +++++++++++++++++++++++++++++++++++++++++++--------------
>  1 file changed, 248 insertions(+), 80 deletions(-)
> 
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 7d6a3ee..42c97b3 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -533,17 +533,19 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
>  	}
>  }
>  
> +
>  /*
> - * Synchronous write, straight from __user pointer or user pages (if
> - * O_DIRECT).
> + * Synchronous write, straight from __user pointer or user pages.
>   *
>   * If write spans object boundary, just do multiple writes.  (For a
>   * correct atomic write, we should e.g. take write locks on all
>   * objects, rollback on failure, etc.)
>   */
> -static ssize_t ceph_sync_write(struct file *file, const char __user *data,
> -			       size_t left, loff_t pos, loff_t *ppos)
> +static ssize_t
> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
> +		       unsigned long nr_segs, size_t count)
>  {
> +	struct file *file = iocb->ki_filp;
>  	struct inode *inode = file_inode(file);
>  	struct ceph_inode_info *ci = ceph_inode(inode);
>  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> @@ -557,59 +559,55 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
>  	int written = 0;
>  	int flags;
>  	int check_caps = 0;
> -	int page_align, io_align;
> -	unsigned long buf_align;
> -	int ret;
> +	int page_align;
> +	int ret, i;
>  	struct timespec mtime = CURRENT_TIME;
> -	bool own_pages = false;
> +	loff_t pos = iocb->ki_pos;
>  
>  	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
>  		return -EROFS;
>  
> -	dout("sync_write on file %p %lld~%u %s\n", file, pos,
> -	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
> +	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
> +	     (unsigned)count);
>  
> -	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
>  	if (ret < 0)
>  		return ret;
>  
>  	ret = invalidate_inode_pages2_range(inode->i_mapping,
>  					    pos >> PAGE_CACHE_SHIFT,
> -					    (pos + left) >> PAGE_CACHE_SHIFT);
> +					    (pos + count) >> PAGE_CACHE_SHIFT);
>  	if (ret < 0)
>  		dout("invalidate_inode_pages2_range returned %d\n", ret);
>  
>  	flags = CEPH_OSD_FLAG_ORDERSNAP |
>  		CEPH_OSD_FLAG_ONDISK |
>  		CEPH_OSD_FLAG_WRITE;
> -	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
> -		flags |= CEPH_OSD_FLAG_ACK;
> -	else
> -		num_ops++;	/* Also include a 'startsync' command. */
> +	num_ops++;	/* Also include a 'startsync' command. */
>  
> -	/*
> -	 * we may need to do multiple writes here if we span an object
> -	 * boundary.  this isn't atomic, unfortunately.  :(
> -	 */
> -more:
> -	io_align = pos & ~PAGE_MASK;
> -	buf_align = (unsigned long)data & ~PAGE_MASK;
> -	len = left;
> +	for (i = 0; i < nr_segs && count; i++) {

POSIX requires that the write syscall be atomic. I mean we should allocate a single OSD
request for all buffer segments that belong to the same object.

Regards
Yan, Zheng

> +		void __user *data = iov[i].iov_base;
> +		size_t left;
>  
> -	snapc = ci->i_snap_realm->cached_context;
> -	vino = ceph_vino(inode);
> -	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> -				    vino, pos, &len, num_ops,
> -				    CEPH_OSD_OP_WRITE, flags, snapc,
> -				    ci->i_truncate_seq, ci->i_truncate_size,
> -				    false);
> -	if (IS_ERR(req))
> -		return PTR_ERR(req);
> +		left = min(count, iov[i].iov_len);
> +more:
> +		page_align = (unsigned long)data & ~PAGE_MASK;
> +		len = left;
> +
> +		snapc = ci->i_snap_realm->cached_context;
> +		vino = ceph_vino(inode);
> +		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +					    vino, pos, &len, num_ops,
> +					    CEPH_OSD_OP_WRITE, flags, snapc,
> +					    ci->i_truncate_seq,
> +					    ci->i_truncate_size,
> +					    false);
> +		if (IS_ERR(req)) {
> +			ret = PTR_ERR(req);
> +			goto out;
> +		}
>  
> -	/* write from beginning of first page, regardless of io alignment */
> -	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
> -	num_pages = calc_pages_for(page_align, len);
> -	if (file->f_flags & O_DIRECT) {
> +		num_pages = calc_pages_for(page_align, len);
>  		pages = ceph_get_direct_page_vector(data, num_pages, false);
>  		if (IS_ERR(pages)) {
>  			ret = PTR_ERR(pages);
> @@ -621,61 +619,229 @@ more:
>  		 * may block.
>  		 */
>  		truncate_inode_pages_range(inode->i_mapping, pos,
> -					   (pos+len) | (PAGE_CACHE_SIZE-1));
> -	} else {
> +				   (pos+len) | (PAGE_CACHE_SIZE-1));
> +		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> +						false, false);
> +
> +		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +
> +		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +		if (!ret)
> +			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +
> +		ceph_put_page_vector(pages, num_pages, false);
> +
> +out:
> +		ceph_osdc_put_request(req);
> +		if (ret == 0) {
> +			pos += len;
> +			written += len;
> +			left -= len;
> +			count -= len;
> +			data += len;
> +			if (left)
> +				goto more;
> +
> +			ret = written;
> +			if (pos > i_size_read(inode))
> +				check_caps = ceph_inode_set_size(inode, pos);
> +				if (check_caps)
> +					ceph_check_caps(ceph_inode(inode),
> +							CHECK_CAPS_AUTHONLY,
> +							NULL);
> +		} else {
> +			if (ret != -EOLDSNAPC && written > 0)
> +				ret = written;
> +			break;
> +		}
> +	}
> +
> +	if (ret > 0)
> +		iocb->ki_pos = pos;
> +	return ret;
> +}
> +
> +
> +/*
> + * Synchronous write, straight from __user pointer or user pages.
> + *
> + * If write spans object boundary, just do multiple writes.  (For a
> + * correct atomic write, we should e.g. take write locks on all
> + * objects, rollback on failure, etc.)
> + */
> +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
> +			       unsigned long nr_segs, size_t count)
> +{
> +	struct file *file = iocb->ki_filp;
> +	struct inode *inode = file_inode(file);
> +	struct ceph_inode_info *ci = ceph_inode(inode);
> +	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> +	struct ceph_snap_context *snapc;
> +	struct ceph_vino vino;
> +	struct ceph_osd_request *req;
> +	int num_ops = 1;
> +	struct page **pages;
> +	int num_pages;
> +	u64 len;
> +	int written = 0;
> +	int flags;
> +	int check_caps = 0;
> +	int ret, i;
> +	struct timespec mtime = CURRENT_TIME;
> +	loff_t pos = iocb->ki_pos;
> +	struct iovec *iov_clone;
> +
> +	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
> +		return -EROFS;
> +
> +	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
> +
> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
> +	if (ret < 0)
> +		return ret;
> +
> +	ret = invalidate_inode_pages2_range(inode->i_mapping,
> +					    pos >> PAGE_CACHE_SHIFT,
> +					    (pos + count) >> PAGE_CACHE_SHIFT);
> +	if (ret < 0)
> +		dout("invalidate_inode_pages2_range returned %d\n", ret);
> +
> +	flags = CEPH_OSD_FLAG_ORDERSNAP |
> +		CEPH_OSD_FLAG_ONDISK |
> +		CEPH_OSD_FLAG_WRITE |
> +		CEPH_OSD_FLAG_ACK;
> +
> +	iov_clone = kmalloc(nr_segs * sizeof(struct iovec), GFP_KERNEL);
> +	if (iov_clone == NULL)
> +		return -ENOMEM;
> +	memcpy(iov_clone, iov, nr_segs * sizeof(struct iovec));
> +
> +	for (i = 0; i < nr_segs && count; i++) {
> +		void __user *data;
> +		size_t left;
> +
> +		left = count;
> +more:
> +		len = left;
> +
> +		snapc = ci->i_snap_realm->cached_context;
> +		vino = ceph_vino(inode);
> +		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +					    vino, pos, &len, num_ops,
> +					    CEPH_OSD_OP_WRITE, flags, snapc,
> +					    ci->i_truncate_seq,
> +					    ci->i_truncate_size,
> +					    false);
> +		if (IS_ERR(req)) {
> +			ret = PTR_ERR(req);
> +			goto out;
> +		}
> +
> +		/*
> +		 * write from beginning of first page,
> +		 * regardless of io alignment
> +		 */
> +		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
> +
>  		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
>  		if (IS_ERR(pages)) {
>  			ret = PTR_ERR(pages);
>  			goto out;
>  		}
> -		ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
> +
> +		if (len <= iov_clone[i].iov_len) {
> +			data = iov_clone[i].iov_base;
> +			ret = ceph_copy_user_to_page_vector(pages,
> +								data, 0, len);
> +			if (ret > 0) {
> +				iov_clone[i].iov_base += ret;
> +				iov_clone[i].iov_len -= ret;
> +			}
> +		} else {
> +			int j, l, k = 0, copyed = 0;
> +			size_t tmp = len;
> +
> +			for (j = i; j < nr_segs && tmp; j++) {
> +				data = iov_clone[j].iov_base;
> +				l = iov_clone[j].iov_len;
> +
> +				if (tmp < l) {
> +					ret = ceph_copy_user_to_page_vector(&pages[k],
> +									    data,
> +									    copyed,
> +									    tmp);
> +					iov_clone[j].iov_len -= ret;
> +					iov_clone[j].iov_base += ret;
> +					break;
> +				} else if (l) {
> +					ret = ceph_copy_user_to_page_vector(&pages[k],
> +									    data,
> +									    copyed,
> +									    l);
> +					if (ret < 0)
> +						break;
> +					iov_clone[j].iov_len = 0;
> +					copyed += ret;
> +					tmp -= ret;
> +					k = calc_pages_for(0, copyed + 1) - 1;
> +				}
> +			}
> +
> +			/*
> +			 * For this case,it will call for action.i will add one
> +			 * But iov_clone[j].iov_len maybe not zero.
> +			 */
> +			if (left == len)
> +				i = j - 1;
> +		}
> +
>  		if (ret < 0) {
>  			ceph_release_page_vector(pages, num_pages);
>  			goto out;
>  		}
>  
> -		if ((file->f_flags & O_SYNC) == 0) {
> -			/* get a second commit callback */
> -			req->r_unsafe_callback = ceph_sync_write_unsafe;
> -			req->r_inode = inode;
> -			own_pages = true;
> -		}
> -	}
> -	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> -					false, own_pages);
> +		/* get a second commit callback */
> +		req->r_unsafe_callback = ceph_sync_write_unsafe;
> +		req->r_inode = inode;
>  
> -	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> -	ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
> +						false, true);
>  
> -	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> -	if (!ret)
> -		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
>  
> -	if (file->f_flags & O_DIRECT)
> -		ceph_put_page_vector(pages, num_pages, false);
> -	else if (file->f_flags & O_SYNC)
> -		ceph_release_page_vector(pages, num_pages);
> +		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +		if (!ret)
> +			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>  
>  out:
> -	ceph_osdc_put_request(req);
> -	if (ret == 0) {
> -		pos += len;
> -		written += len;
> -		left -= len;
> -		data += len;
> -		if (left)
> -			goto more;
> -
> -		ret = written;
> -		*ppos = pos;
> -		if (pos > i_size_read(inode))
> -			check_caps = ceph_inode_set_size(inode, pos);
> -		if (check_caps)
> -			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
> -					NULL);
> -	} else if (ret != -EOLDSNAPC && written > 0) {
> -		ret = written;
> +		ceph_osdc_put_request(req);
> +		if (ret == 0) {
> +			pos += len;
> +			written += len;
> +			left -= len;
> +			count -= len;
> +			if (left)
> +				goto more;
> +
> +			ret = written;
> +			if (pos > i_size_read(inode))
> +				check_caps = ceph_inode_set_size(inode, pos);
> +				if (check_caps)
> +					ceph_check_caps(ceph_inode(inode),
> +							CHECK_CAPS_AUTHONLY,
> +							NULL);
> +		} else {
> +			if (ret != -EOLDSNAPC && written > 0)
> +				ret = written;
> +			break;
> +		}
>  	}
> +
> +	if (ret > 0)
> +		iocb->ki_pos = pos;
> +	kfree(iov_clone);
>  	return ret;
>  }
>  
> @@ -843,11 +1009,13 @@ retry_snap:
>  	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
>  
>  	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
> -	    (iocb->ki_filp->f_flags & O_DIRECT) ||
> -	    (fi->flags & CEPH_F_SYNC)) {
> +	    (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
>  		mutex_unlock(&inode->i_mutex);
> -		written = ceph_sync_write(file, iov->iov_base, count,
> -					  pos, &iocb->ki_pos);
> +		if (file->f_flags & O_DIRECT)
> +			written = ceph_sync_direct_write(iocb, iov,
> +							 nr_segs, count);
> +		else
> +			written = ceph_sync_write(iocb, iov, nr_segs, count);
>  		if (written == -EOLDSNAPC) {
>  			dout("aio_write %p %llx.%llx %llu~%u"
>  				"got EOLDSNAPC, retrying\n",
> 

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Yan, Zheng Sept. 4, 2013, 1:20 p.m. UTC | #2
Hi,

Thank you for the patch.

On 09/03/2013 04:52 PM, majianpeng wrote:
> For writev/pwritev sync-operatoin, ceph only do the first iov.
> It don't think other iovs.Now implement this.
> I divided the write-sync-operation into two functions.One for
> direct-write,other for none-direct-sync-write.This is because for
> none-direct-sync-write we can merge iovs to one.But for direct-write,
> we can't merge iovs.
> 
> Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>
> ---
>  fs/ceph/file.c | 328 +++++++++++++++++++++++++++++++++++++++++++--------------
>  1 file changed, 248 insertions(+), 80 deletions(-)
> 
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 7d6a3ee..42c97b3 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -533,17 +533,19 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
>  	}
>  }
>  
> +
>  /*
> - * Synchronous write, straight from __user pointer or user pages (if
> - * O_DIRECT).
> + * Synchronous write, straight from __user pointer or user pages.
>   *
>   * If write spans object boundary, just do multiple writes.  (For a
>   * correct atomic write, we should e.g. take write locks on all
>   * objects, rollback on failure, etc.)
>   */
> -static ssize_t ceph_sync_write(struct file *file, const char __user *data,
> -			       size_t left, loff_t pos, loff_t *ppos)
> +static ssize_t
> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
> +		       unsigned long nr_segs, size_t count)
>  {
> +	struct file *file = iocb->ki_filp;
>  	struct inode *inode = file_inode(file);
>  	struct ceph_inode_info *ci = ceph_inode(inode);
>  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> @@ -557,59 +559,55 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
>  	int written = 0;
>  	int flags;
>  	int check_caps = 0;
> -	int page_align, io_align;
> -	unsigned long buf_align;
> -	int ret;
> +	int page_align;
> +	int ret, i;
>  	struct timespec mtime = CURRENT_TIME;
> -	bool own_pages = false;
> +	loff_t pos = iocb->ki_pos;
>  
>  	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
>  		return -EROFS;
>  
> -	dout("sync_write on file %p %lld~%u %s\n", file, pos,
> -	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
> +	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
> +	     (unsigned)count);
>  
> -	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
>  	if (ret < 0)
>  		return ret;
>  
>  	ret = invalidate_inode_pages2_range(inode->i_mapping,
>  					    pos >> PAGE_CACHE_SHIFT,
> -					    (pos + left) >> PAGE_CACHE_SHIFT);
> +					    (pos + count) >> PAGE_CACHE_SHIFT);
>  	if (ret < 0)
>  		dout("invalidate_inode_pages2_range returned %d\n", ret);
>  
>  	flags = CEPH_OSD_FLAG_ORDERSNAP |
>  		CEPH_OSD_FLAG_ONDISK |
>  		CEPH_OSD_FLAG_WRITE;
> -	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
> -		flags |= CEPH_OSD_FLAG_ACK;
> -	else
> -		num_ops++;	/* Also include a 'startsync' command. */
> +	num_ops++;	/* Also include a 'startsync' command. */
>  
> -	/*
> -	 * we may need to do multiple writes here if we span an object
> -	 * boundary.  this isn't atomic, unfortunately.  :(
> -	 */
> -more:
> -	io_align = pos & ~PAGE_MASK;
> -	buf_align = (unsigned long)data & ~PAGE_MASK;
> -	len = left;
> +	for (i = 0; i < nr_segs && count; i++) {

POSIX requires that the write syscall be atomic. I mean we should allocate a single OSD
request for all buffer segments that belong to the same object.

Regards
Yan, Zheng

> +		void __user *data = iov[i].iov_base;
> +		size_t left;
>  
> -	snapc = ci->i_snap_realm->cached_context;
> -	vino = ceph_vino(inode);
> -	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> -				    vino, pos, &len, num_ops,
> -				    CEPH_OSD_OP_WRITE, flags, snapc,
> -				    ci->i_truncate_seq, ci->i_truncate_size,
> -				    false);
> -	if (IS_ERR(req))
> -		return PTR_ERR(req);
> +		left = min(count, iov[i].iov_len);
> +more:
> +		page_align = (unsigned long)data & ~PAGE_MASK;
> +		len = left;
> +
> +		snapc = ci->i_snap_realm->cached_context;
> +		vino = ceph_vino(inode);
> +		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +					    vino, pos, &len, num_ops,
> +					    CEPH_OSD_OP_WRITE, flags, snapc,
> +					    ci->i_truncate_seq,
> +					    ci->i_truncate_size,
> +					    false);
> +		if (IS_ERR(req)) {
> +			ret = PTR_ERR(req);
> +			goto out;
> +		}
>  
> -	/* write from beginning of first page, regardless of io alignment */
> -	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
> -	num_pages = calc_pages_for(page_align, len);
> -	if (file->f_flags & O_DIRECT) {
> +		num_pages = calc_pages_for(page_align, len);
>  		pages = ceph_get_direct_page_vector(data, num_pages, false);
>  		if (IS_ERR(pages)) {
>  			ret = PTR_ERR(pages);
> @@ -621,61 +619,229 @@ more:
>  		 * may block.
>  		 */
>  		truncate_inode_pages_range(inode->i_mapping, pos,
> -					   (pos+len) | (PAGE_CACHE_SIZE-1));
> -	} else {
> +				   (pos+len) | (PAGE_CACHE_SIZE-1));
> +		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> +						false, false);
> +
> +		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +
> +		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +		if (!ret)
> +			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +
> +		ceph_put_page_vector(pages, num_pages, false);
> +
> +out:
> +		ceph_osdc_put_request(req);
> +		if (ret == 0) {
> +			pos += len;
> +			written += len;
> +			left -= len;
> +			count -= len;
> +			data += len;
> +			if (left)
> +				goto more;
> +
> +			ret = written;
> +			if (pos > i_size_read(inode))
> +				check_caps = ceph_inode_set_size(inode, pos);
> +				if (check_caps)
> +					ceph_check_caps(ceph_inode(inode),
> +							CHECK_CAPS_AUTHONLY,
> +							NULL);
> +		} else {
> +			if (ret != -EOLDSNAPC && written > 0)
> +				ret = written;
> +			break;
> +		}
> +	}
> +
> +	if (ret > 0)
> +		iocb->ki_pos = pos;
> +	return ret;
> +}
> +
> +
> +/*
> + * Synchronous write, straight from __user pointer or user pages.
> + *
> + * If write spans object boundary, just do multiple writes.  (For a
> + * correct atomic write, we should e.g. take write locks on all
> + * objects, rollback on failure, etc.)
> + */
> +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
> +			       unsigned long nr_segs, size_t count)
> +{
> +	struct file *file = iocb->ki_filp;
> +	struct inode *inode = file_inode(file);
> +	struct ceph_inode_info *ci = ceph_inode(inode);
> +	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
> +	struct ceph_snap_context *snapc;
> +	struct ceph_vino vino;
> +	struct ceph_osd_request *req;
> +	int num_ops = 1;
> +	struct page **pages;
> +	int num_pages;
> +	u64 len;
> +	int written = 0;
> +	int flags;
> +	int check_caps = 0;
> +	int ret, i;
> +	struct timespec mtime = CURRENT_TIME;
> +	loff_t pos = iocb->ki_pos;
> +	struct iovec *iov_clone;
> +
> +	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
> +		return -EROFS;
> +
> +	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
> +
> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
> +	if (ret < 0)
> +		return ret;
> +
> +	ret = invalidate_inode_pages2_range(inode->i_mapping,
> +					    pos >> PAGE_CACHE_SHIFT,
> +					    (pos + count) >> PAGE_CACHE_SHIFT);
> +	if (ret < 0)
> +		dout("invalidate_inode_pages2_range returned %d\n", ret);
> +
> +	flags = CEPH_OSD_FLAG_ORDERSNAP |
> +		CEPH_OSD_FLAG_ONDISK |
> +		CEPH_OSD_FLAG_WRITE |
> +		CEPH_OSD_FLAG_ACK;
> +
> +	iov_clone = kmalloc(nr_segs * sizeof(struct iovec), GFP_KERNEL);
> +	if (iov_clone == NULL)
> +		return -ENOMEM;
> +	memcpy(iov_clone, iov, nr_segs * sizeof(struct iovec));
> +
> +	for (i = 0; i < nr_segs && count; i++) {
> +		void __user *data;
> +		size_t left;
> +
> +		left = count;
> +more:
> +		len = left;
> +
> +		snapc = ci->i_snap_realm->cached_context;
> +		vino = ceph_vino(inode);
> +		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
> +					    vino, pos, &len, num_ops,
> +					    CEPH_OSD_OP_WRITE, flags, snapc,
> +					    ci->i_truncate_seq,
> +					    ci->i_truncate_size,
> +					    false);
> +		if (IS_ERR(req)) {
> +			ret = PTR_ERR(req);
> +			goto out;
> +		}
> +
> +		/*
> +		 * write from beginning of first page,
> +		 * regardless of io alignment
> +		 */
> +		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
> +
>  		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
>  		if (IS_ERR(pages)) {
>  			ret = PTR_ERR(pages);
>  			goto out;
>  		}
> -		ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
> +
> +		if (len <= iov_clone[i].iov_len) {
> +			data = iov_clone[i].iov_base;
> +			ret = ceph_copy_user_to_page_vector(pages,
> +								data, 0, len);
> +			if (ret > 0) {
> +				iov_clone[i].iov_base += ret;
> +				iov_clone[i].iov_len -= ret;
> +			}
> +		} else {
> +			int j, l, k = 0, copyed = 0;
> +			size_t tmp = len;
> +
> +			for (j = i; j < nr_segs && tmp; j++) {
> +				data = iov_clone[j].iov_base;
> +				l = iov_clone[j].iov_len;
> +
> +				if (tmp < l) {
> +					ret = ceph_copy_user_to_page_vector(&pages[k],
> +									    data,
> +									    copyed,
> +									    tmp);
> +					iov_clone[j].iov_len -= ret;
> +					iov_clone[j].iov_base += ret;
> +					break;
> +				} else if (l) {
> +					ret = ceph_copy_user_to_page_vector(&pages[k],
> +									    data,
> +									    copyed,
> +									    l);
> +					if (ret < 0)
> +						break;
> +					iov_clone[j].iov_len = 0;
> +					copyed += ret;
> +					tmp -= ret;
> +					k = calc_pages_for(0, copyed + 1) - 1;
> +				}
> +			}
> +
> +			/*
> +			 * For this case,it will call for action.i will add one
> +			 * But iov_clone[j].iov_len maybe not zero.
> +			 */
> +			if (left == len)
> +				i = j - 1;
> +		}
> +
>  		if (ret < 0) {
>  			ceph_release_page_vector(pages, num_pages);
>  			goto out;
>  		}
>  
> -		if ((file->f_flags & O_SYNC) == 0) {
> -			/* get a second commit callback */
> -			req->r_unsafe_callback = ceph_sync_write_unsafe;
> -			req->r_inode = inode;
> -			own_pages = true;
> -		}
> -	}
> -	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
> -					false, own_pages);
> +		/* get a second commit callback */
> +		req->r_unsafe_callback = ceph_sync_write_unsafe;
> +		req->r_inode = inode;
>  
> -	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> -	ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
> +		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
> +						false, true);
>  
> -	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> -	if (!ret)
> -		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
> +		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
> +		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
>  
> -	if (file->f_flags & O_DIRECT)
> -		ceph_put_page_vector(pages, num_pages, false);
> -	else if (file->f_flags & O_SYNC)
> -		ceph_release_page_vector(pages, num_pages);
> +		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
> +		if (!ret)
> +			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>  
>  out:
> -	ceph_osdc_put_request(req);
> -	if (ret == 0) {
> -		pos += len;
> -		written += len;
> -		left -= len;
> -		data += len;
> -		if (left)
> -			goto more;
> -
> -		ret = written;
> -		*ppos = pos;
> -		if (pos > i_size_read(inode))
> -			check_caps = ceph_inode_set_size(inode, pos);
> -		if (check_caps)
> -			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
> -					NULL);
> -	} else if (ret != -EOLDSNAPC && written > 0) {
> -		ret = written;
> +		ceph_osdc_put_request(req);
> +		if (ret == 0) {
> +			pos += len;
> +			written += len;
> +			left -= len;
> +			count -= len;
> +			if (left)
> +				goto more;
> +
> +			ret = written;
> +			if (pos > i_size_read(inode))
> +				check_caps = ceph_inode_set_size(inode, pos);
> +				if (check_caps)
> +					ceph_check_caps(ceph_inode(inode),
> +							CHECK_CAPS_AUTHONLY,
> +							NULL);
> +		} else {
> +			if (ret != -EOLDSNAPC && written > 0)
> +				ret = written;
> +			break;
> +		}
>  	}
> +
> +	if (ret > 0)
> +		iocb->ki_pos = pos;
> +	kfree(iov_clone);
>  	return ret;
>  }
>  
> @@ -843,11 +1009,13 @@ retry_snap:
>  	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
>  
>  	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
> -	    (iocb->ki_filp->f_flags & O_DIRECT) ||
> -	    (fi->flags & CEPH_F_SYNC)) {
> +	    (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
>  		mutex_unlock(&inode->i_mutex);
> -		written = ceph_sync_write(file, iov->iov_base, count,
> -					  pos, &iocb->ki_pos);
> +		if (file->f_flags & O_DIRECT)
> +			written = ceph_sync_direct_write(iocb, iov,
> +							 nr_segs, count);
> +		else
> +			written = ceph_sync_write(iocb, iov, nr_segs, count);
>  		if (written == -EOLDSNAPC) {
>  			dout("aio_write %p %llx.%llx %llu~%u"
>  				"got EOLDSNAPC, retrying\n",
> 

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
majianpeng Sept. 6, 2013, 12:46 a.m. UTC | #3
>Hi,

>

>Thank you for the patch.

>

>On 09/03/2013 04:52 PM, majianpeng wrote:

>> For writev/pwritev sync-operatoin, ceph only do the first iov.

>> It don't think other iovs.Now implement this.

>> I divided the write-sync-operation into two functions.One for

>> direct-write,other for none-direct-sync-write.This is because for

>> none-direct-sync-write we can merge iovs to one.But for direct-write,

>> we can't merge iovs.

>> 

>> Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>

>> ---

>>  fs/ceph/file.c | 328 +++++++++++++++++++++++++++++++++++++++++++--------------

>>  1 file changed, 248 insertions(+), 80 deletions(-)

>> 

>> diff --git a/fs/ceph/file.c b/fs/ceph/file.c

>> index 7d6a3ee..42c97b3 100644

>> --- a/fs/ceph/file.c

>> +++ b/fs/ceph/file.c

>> @@ -533,17 +533,19 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)

>>  	}

>>  }

>>  

>> +

>>  /*

>> - * Synchronous write, straight from __user pointer or user pages (if

>> - * O_DIRECT).

>> + * Synchronous write, straight from __user pointer or user pages.

>>   *

>>   * If write spans object boundary, just do multiple writes.  (For a

>>   * correct atomic write, we should e.g. take write locks on all

>>   * objects, rollback on failure, etc.)

>>   */

>> -static ssize_t ceph_sync_write(struct file *file, const char __user *data,

>> -			       size_t left, loff_t pos, loff_t *ppos)

>> +static ssize_t

>> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,

>> +		       unsigned long nr_segs, size_t count)

>>  {

>> +	struct file *file = iocb->ki_filp;

>>  	struct inode *inode = file_inode(file);

>>  	struct ceph_inode_info *ci = ceph_inode(inode);

>>  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);

>> @@ -557,59 +559,55 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,

>>  	int written = 0;

>>  	int flags;

>>  	int check_caps = 0;

>> -	int page_align, io_align;

>> -	unsigned long buf_align;

>> -	int ret;

>> +	int page_align;

>> +	int ret, i;

>>  	struct timespec mtime = CURRENT_TIME;

>> -	bool own_pages = false;

>> +	loff_t pos = iocb->ki_pos;

>>  

>>  	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)

>>  		return -EROFS;

>>  

>> -	dout("sync_write on file %p %lld~%u %s\n", file, pos,

>> -	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");

>> +	dout("sync_direct_write on file %p %lld~%u\n", file, pos,

>> +	     (unsigned)count);

>>  

>> -	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);

>> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);

>>  	if (ret < 0)

>>  		return ret;

>>  

>>  	ret = invalidate_inode_pages2_range(inode->i_mapping,

>>  					    pos >> PAGE_CACHE_SHIFT,

>> -					    (pos + left) >> PAGE_CACHE_SHIFT);

>> +					    (pos + count) >> PAGE_CACHE_SHIFT);

>>  	if (ret < 0)

>>  		dout("invalidate_inode_pages2_range returned %d\n", ret);

>>  

>>  	flags = CEPH_OSD_FLAG_ORDERSNAP |

>>  		CEPH_OSD_FLAG_ONDISK |

>>  		CEPH_OSD_FLAG_WRITE;

>> -	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)

>> -		flags |= CEPH_OSD_FLAG_ACK;

>> -	else

>> -		num_ops++;	/* Also include a 'startsync' command. */

>> +	num_ops++;	/* Also include a 'startsync' command. */

>>  

>> -	/*

>> -	 * we may need to do multiple writes here if we span an object

>> -	 * boundary.  this isn't atomic, unfortunately.  :(

>> -	 */

>> -more:

>> -	io_align = pos & ~PAGE_MASK;

>> -	buf_align = (unsigned long)data & ~PAGE_MASK;

>> -	len = left;

>> +	for (i = 0; i < nr_segs && count; i++) {

>

>POSIX requires that write syscall is atomic. I means we should allocate a single OSD request

>for all buffer segments that belong to the same object.

>

I don't think we can.
For a direct write, we use ceph_get_direct_page_vector to get the pages.
Suppose iov1 and iov2 fall within the same object: we still cannot join the
pages of iov1 and iov2 together, because a ceph page_vector only records the
offset of the first page.

Or am I missing something?
Maybe we could use a ceph pagelist instead, but that would copy the data.

Thanks!
Jianpeng Ma
>Regards

>Yan, Zheng
Yan, Zheng Sept. 6, 2013, 1:09 a.m. UTC | #4
On 09/06/2013 08:46 AM, majianpeng wrote:
>> Hi,
>>
>> Thank you for the patch.
>>
>> On 09/03/2013 04:52 PM, majianpeng wrote:
>>> For writev/pwritev sync-operatoin, ceph only do the first iov.
>>> It don't think other iovs.Now implement this.
>>> I divided the write-sync-operation into two functions.One for
>>> direct-write,other for none-direct-sync-write.This is because for
>>> none-direct-sync-write we can merge iovs to one.But for direct-write,
>>> we can't merge iovs.
>>>
>>> Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>
>>> ---
>>>  fs/ceph/file.c | 328 +++++++++++++++++++++++++++++++++++++++++++--------------
>>>  1 file changed, 248 insertions(+), 80 deletions(-)
>>>
>>> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
>>> index 7d6a3ee..42c97b3 100644
>>> --- a/fs/ceph/file.c
>>> +++ b/fs/ceph/file.c
>>> @@ -533,17 +533,19 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
>>>  	}
>>>  }
>>>  
>>> +
>>>  /*
>>> - * Synchronous write, straight from __user pointer or user pages (if
>>> - * O_DIRECT).
>>> + * Synchronous write, straight from __user pointer or user pages.
>>>   *
>>>   * If write spans object boundary, just do multiple writes.  (For a
>>>   * correct atomic write, we should e.g. take write locks on all
>>>   * objects, rollback on failure, etc.)
>>>   */
>>> -static ssize_t ceph_sync_write(struct file *file, const char __user *data,
>>> -			       size_t left, loff_t pos, loff_t *ppos)
>>> +static ssize_t
>>> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
>>> +		       unsigned long nr_segs, size_t count)
>>>  {
>>> +	struct file *file = iocb->ki_filp;
>>>  	struct inode *inode = file_inode(file);
>>>  	struct ceph_inode_info *ci = ceph_inode(inode);
>>>  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
>>> @@ -557,59 +559,55 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
>>>  	int written = 0;
>>>  	int flags;
>>>  	int check_caps = 0;
>>> -	int page_align, io_align;
>>> -	unsigned long buf_align;
>>> -	int ret;
>>> +	int page_align;
>>> +	int ret, i;
>>>  	struct timespec mtime = CURRENT_TIME;
>>> -	bool own_pages = false;
>>> +	loff_t pos = iocb->ki_pos;
>>>  
>>>  	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
>>>  		return -EROFS;
>>>  
>>> -	dout("sync_write on file %p %lld~%u %s\n", file, pos,
>>> -	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
>>> +	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
>>> +	     (unsigned)count);
>>>  
>>> -	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
>>> +	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
>>>  	if (ret < 0)
>>>  		return ret;
>>>  
>>>  	ret = invalidate_inode_pages2_range(inode->i_mapping,
>>>  					    pos >> PAGE_CACHE_SHIFT,
>>> -					    (pos + left) >> PAGE_CACHE_SHIFT);
>>> +					    (pos + count) >> PAGE_CACHE_SHIFT);
>>>  	if (ret < 0)
>>>  		dout("invalidate_inode_pages2_range returned %d\n", ret);
>>>  
>>>  	flags = CEPH_OSD_FLAG_ORDERSNAP |
>>>  		CEPH_OSD_FLAG_ONDISK |
>>>  		CEPH_OSD_FLAG_WRITE;
>>> -	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
>>> -		flags |= CEPH_OSD_FLAG_ACK;
>>> -	else
>>> -		num_ops++;	/* Also include a 'startsync' command. */
>>> +	num_ops++;	/* Also include a 'startsync' command. */
>>>  
>>> -	/*
>>> -	 * we may need to do multiple writes here if we span an object
>>> -	 * boundary.  this isn't atomic, unfortunately.  :(
>>> -	 */
>>> -more:
>>> -	io_align = pos & ~PAGE_MASK;
>>> -	buf_align = (unsigned long)data & ~PAGE_MASK;
>>> -	len = left;
>>> +	for (i = 0; i < nr_segs && count; i++) {
>>
>> POSIX requires that write syscall is atomic. I means we should allocate a single OSD request
>> for all buffer segments that belong to the same object.
>>
> I think we could not.
> For direct write, we use ceph_get_direct_page_vector to get pages.
> Given iov1 and iov2 are in the same object. But we can't make the pages of iov1/2 to join together.
> Because for ceph page_vector,it only record the offset of first page.
> 
> Or am i missing something?
> Maybe we can use ceph pagelist but it will copy data.
> 

I was wrong about the direct I/O case (ext4 does not guarantee atomicity for direct writes). But
please keep buffered writes atomic.

Regards
Yan, Zheng

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 7d6a3ee..42c97b3 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -533,17 +533,19 @@  static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 	}
 }
 
+
 /*
- * Synchronous write, straight from __user pointer or user pages (if
- * O_DIRECT).
+ * Synchronous write, straight from __user pointer or user pages.
  *
  * If write spans object boundary, just do multiple writes.  (For a
  * correct atomic write, we should e.g. take write locks on all
  * objects, rollback on failure, etc.)
  */
-static ssize_t ceph_sync_write(struct file *file, const char __user *data,
-			       size_t left, loff_t pos, loff_t *ppos)
+static ssize_t
+ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
+		       unsigned long nr_segs, size_t count)
 {
+	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
@@ -557,59 +559,55 @@  static ssize_t ceph_sync_write(struct file *file, const char __user *data,
 	int written = 0;
 	int flags;
 	int check_caps = 0;
-	int page_align, io_align;
-	unsigned long buf_align;
-	int ret;
+	int page_align;
+	int ret, i;
 	struct timespec mtime = CURRENT_TIME;
-	bool own_pages = false;
+	loff_t pos = iocb->ki_pos;
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("sync_write on file %p %lld~%u %s\n", file, pos,
-	     (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
+	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
+	     (unsigned)count);
 
-	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
+	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
 	if (ret < 0)
 		return ret;
 
 	ret = invalidate_inode_pages2_range(inode->i_mapping,
 					    pos >> PAGE_CACHE_SHIFT,
-					    (pos + left) >> PAGE_CACHE_SHIFT);
+					    (pos + count) >> PAGE_CACHE_SHIFT);
 	if (ret < 0)
 		dout("invalidate_inode_pages2_range returned %d\n", ret);
 
 	flags = CEPH_OSD_FLAG_ORDERSNAP |
 		CEPH_OSD_FLAG_ONDISK |
 		CEPH_OSD_FLAG_WRITE;
-	if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
-		flags |= CEPH_OSD_FLAG_ACK;
-	else
-		num_ops++;	/* Also include a 'startsync' command. */
+	num_ops++;	/* Also include a 'startsync' command. */
 
-	/*
-	 * we may need to do multiple writes here if we span an object
-	 * boundary.  this isn't atomic, unfortunately.  :(
-	 */
-more:
-	io_align = pos & ~PAGE_MASK;
-	buf_align = (unsigned long)data & ~PAGE_MASK;
-	len = left;
+	for (i = 0; i < nr_segs && count; i++) {
+		void __user *data = iov[i].iov_base;
+		size_t left;
 
-	snapc = ci->i_snap_realm->cached_context;
-	vino = ceph_vino(inode);
-	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-				    vino, pos, &len, num_ops,
-				    CEPH_OSD_OP_WRITE, flags, snapc,
-				    ci->i_truncate_seq, ci->i_truncate_size,
-				    false);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
+		left = min(count, iov[i].iov_len);
+more:
+		page_align = (unsigned long)data & ~PAGE_MASK;
+		len = left;
+
+		snapc = ci->i_snap_realm->cached_context;
+		vino = ceph_vino(inode);
+		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+					    vino, pos, &len, num_ops,
+					    CEPH_OSD_OP_WRITE, flags, snapc,
+					    ci->i_truncate_seq,
+					    ci->i_truncate_size,
+					    false);
+		if (IS_ERR(req)) {
+			ret = PTR_ERR(req);
+			goto out;
+		}
 
-	/* write from beginning of first page, regardless of io alignment */
-	page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
-	num_pages = calc_pages_for(page_align, len);
-	if (file->f_flags & O_DIRECT) {
+		num_pages = calc_pages_for(page_align, len);
 		pages = ceph_get_direct_page_vector(data, num_pages, false);
 		if (IS_ERR(pages)) {
 			ret = PTR_ERR(pages);
@@ -621,61 +619,229 @@  more:
 		 * may block.
 		 */
 		truncate_inode_pages_range(inode->i_mapping, pos,
-					   (pos+len) | (PAGE_CACHE_SIZE-1));
-	} else {
+				   (pos+len) | (PAGE_CACHE_SIZE-1));
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+						false, false);
+
+		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
+		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
+
+		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+		if (!ret)
+			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+		ceph_put_page_vector(pages, num_pages, false);
+
+out:
+		ceph_osdc_put_request(req);
+		if (ret == 0) {
+			pos += len;
+			written += len;
+			left -= len;
+			count -= len;
+			data += len;
+			if (left)
+				goto more;
+
+			ret = written;
+			if (pos > i_size_read(inode))
+				check_caps = ceph_inode_set_size(inode, pos);
+				if (check_caps)
+					ceph_check_caps(ceph_inode(inode),
+							CHECK_CAPS_AUTHONLY,
+							NULL);
+		} else {
+			if (ret != -EOLDSNAPC && written > 0)
+				ret = written;
+			break;
+		}
+	}
+
+	if (ret > 0)
+		iocb->ki_pos = pos;
+	return ret;
+}
+
+
+/*
+ * Synchronous write, straight from __user pointer or user pages.
+ *
+ * If write spans object boundary, just do multiple writes.  (For a
+ * correct atomic write, we should e.g. take write locks on all
+ * objects, rollback on failure, etc.)
+ */
+static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
+			       unsigned long nr_segs, size_t count)
+{
+	struct file *file = iocb->ki_filp;
+	struct inode *inode = file_inode(file);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_snap_context *snapc;
+	struct ceph_vino vino;
+	struct ceph_osd_request *req;
+	int num_ops = 1;
+	struct page **pages;
+	int num_pages;
+	u64 len;
+	int written = 0;
+	int flags;
+	int check_caps = 0;
+	int ret, i;
+	struct timespec mtime = CURRENT_TIME;
+	loff_t pos = iocb->ki_pos;
+	struct iovec *iov_clone;
+
+	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
+		return -EROFS;
+
+	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
+
+	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
+	if (ret < 0)
+		return ret;
+
+	ret = invalidate_inode_pages2_range(inode->i_mapping,
+					    pos >> PAGE_CACHE_SHIFT,
+					    (pos + count) >> PAGE_CACHE_SHIFT);
+	if (ret < 0)
+		dout("invalidate_inode_pages2_range returned %d\n", ret);
+
+	flags = CEPH_OSD_FLAG_ORDERSNAP |
+		CEPH_OSD_FLAG_ONDISK |
+		CEPH_OSD_FLAG_WRITE |
+		CEPH_OSD_FLAG_ACK;
+
+	iov_clone = kmalloc(nr_segs * sizeof(struct iovec), GFP_KERNEL);
+	if (iov_clone == NULL)
+		return -ENOMEM;
+	memcpy(iov_clone, iov, nr_segs * sizeof(struct iovec));
+
+	for (i = 0; i < nr_segs && count; i++) {
+		void __user *data;
+		size_t left;
+
+		left = count;
+more:
+		len = left;
+
+		snapc = ci->i_snap_realm->cached_context;
+		vino = ceph_vino(inode);
+		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+					    vino, pos, &len, num_ops,
+					    CEPH_OSD_OP_WRITE, flags, snapc,
+					    ci->i_truncate_seq,
+					    ci->i_truncate_size,
+					    false);
+		if (IS_ERR(req)) {
+			ret = PTR_ERR(req);
+			goto out;
+		}
+
+		/*
+		 * write from beginning of first page,
+		 * regardless of io alignment
+		 */
+		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+
 		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
 		if (IS_ERR(pages)) {
 			ret = PTR_ERR(pages);
 			goto out;
 		}
-		ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
+
+		if (len <= iov_clone[i].iov_len) {
+			data = iov_clone[i].iov_base;
+			ret = ceph_copy_user_to_page_vector(pages,
+								data, 0, len);
+			if (ret > 0) {
+				iov_clone[i].iov_base += ret;
+				iov_clone[i].iov_len -= ret;
+			}
+		} else {
+			int j, l, k = 0, copyed = 0;
+			size_t tmp = len;
+
+			for (j = i; j < nr_segs && tmp; j++) {
+				data = iov_clone[j].iov_base;
+				l = iov_clone[j].iov_len;
+
+				if (tmp < l) {
+					ret = ceph_copy_user_to_page_vector(&pages[k],
+									    data,
+									    copyed,
+									    tmp);
+					iov_clone[j].iov_len -= ret;
+					iov_clone[j].iov_base += ret;
+					break;
+				} else if (l) {
+					ret = ceph_copy_user_to_page_vector(&pages[k],
+									    data,
+									    copyed,
+									    l);
+					if (ret < 0)
+						break;
+					iov_clone[j].iov_len = 0;
+					copyed += ret;
+					tmp -= ret;
+					k = calc_pages_for(0, copyed + 1) - 1;
+				}
+			}
+
+			/*
+			 * For this case,it will call for action.i will add one
+			 * But iov_clone[j].iov_len maybe not zero.
+			 */
+			if (left == len)
+				i = j - 1;
+		}
+
 		if (ret < 0) {
 			ceph_release_page_vector(pages, num_pages);
 			goto out;
 		}
 
-		if ((file->f_flags & O_SYNC) == 0) {
-			/* get a second commit callback */
-			req->r_unsafe_callback = ceph_sync_write_unsafe;
-			req->r_inode = inode;
-			own_pages = true;
-		}
-	}
-	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
-					false, own_pages);
+		/* get a second commit callback */
+		req->r_unsafe_callback = ceph_sync_write_unsafe;
+		req->r_inode = inode;
 
-	/* BUG_ON(vino.snap != CEPH_NOSNAP); */
-	ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+						false, true);
 
-	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-	if (!ret)
-		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
+		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
-	if (file->f_flags & O_DIRECT)
-		ceph_put_page_vector(pages, num_pages, false);
-	else if (file->f_flags & O_SYNC)
-		ceph_release_page_vector(pages, num_pages);
+		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+		if (!ret)
+			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
 out:
-	ceph_osdc_put_request(req);
-	if (ret == 0) {
-		pos += len;
-		written += len;
-		left -= len;
-		data += len;
-		if (left)
-			goto more;
-
-		ret = written;
-		*ppos = pos;
-		if (pos > i_size_read(inode))
-			check_caps = ceph_inode_set_size(inode, pos);
-		if (check_caps)
-			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
-					NULL);
-	} else if (ret != -EOLDSNAPC && written > 0) {
-		ret = written;
+		ceph_osdc_put_request(req);
+		if (ret == 0) {
+			pos += len;
+			written += len;
+			left -= len;
+			count -= len;
+			if (left)
+				goto more;
+
+			ret = written;
+			if (pos > i_size_read(inode))
+				check_caps = ceph_inode_set_size(inode, pos);
+				if (check_caps)
+					ceph_check_caps(ceph_inode(inode),
+							CHECK_CAPS_AUTHONLY,
+							NULL);
+		} else {
+			if (ret != -EOLDSNAPC && written > 0)
+				ret = written;
+			break;
+		}
 	}
+
+	if (ret > 0)
+		iocb->ki_pos = pos;
+	kfree(iov_clone);
 	return ret;
 }
 
@@ -843,11 +1009,13 @@  retry_snap:
 	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (fi->flags & CEPH_F_SYNC)) {
+	    (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
 		mutex_unlock(&inode->i_mutex);
-		written = ceph_sync_write(file, iov->iov_base, count,
-					  pos, &iocb->ki_pos);
+		if (file->f_flags & O_DIRECT)
+			written = ceph_sync_direct_write(iocb, iov,
+							 nr_segs, count);
+		else
+			written = ceph_sync_write(iocb, iov, nr_segs, count);
 		if (written == -EOLDSNAPC) {
 			dout("aio_write %p %llx.%llx %llu~%u"
 				"got EOLDSNAPC, retrying\n",