Message ID | 2013091213542263913011@gmail.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com> On 09/12/2013 01:54 PM, majianpeng wrote: > For writev/pwritev sync operations, ceph only handles the first iov. > It ignores the other iovs. Now implement this. > I divided the sync-write operation into two functions: one for > direct writes, the other for non-direct sync writes. This is because for > non-direct sync writes we can merge the iovs into one, but for direct writes > we can't merge iovs. > > V4: > reconstruct the code by Yan, Zheng > V2: > -use struct iov_iter instead of cloning iovs in ceph_sync_write. > > Signed-off-by: Jianpeng Ma <majianpeng@gmail.com> > Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com> > --- > fs/ceph/file.c | 273 ++++++++++++++++++++++++++++++++++++++++----------------- > 1 file changed, 193 insertions(+), 80 deletions(-) > > diff --git a/fs/ceph/file.c b/fs/ceph/file.c > index 3de8982..5cf034e 100644 > --- a/fs/ceph/file.c > +++ b/fs/ceph/file.c > @@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) > } > } > > + > /* > - * Synchronous write, straight from __user pointer or user pages (if > - * O_DIRECT). > + * Synchronous write, straight from __user pointer or user pages. > * > * If write spans object boundary, just do multiple writes. (For a > * correct atomic write, we should e.g. take write locks on all > * objects, rollback on failure, etc.) 
> */ > -static ssize_t ceph_sync_write(struct file *file, const char __user *data, > - size_t left, loff_t pos, loff_t *ppos) > +static ssize_t > +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, > + unsigned long nr_segs, size_t count) > { > + struct file *file = iocb->ki_filp; > struct inode *inode = file_inode(file); > struct ceph_inode_info *ci = ceph_inode(inode); > struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > struct ceph_snap_context *snapc; > struct ceph_vino vino; > struct ceph_osd_request *req; > - int num_ops = 1; > struct page **pages; > int num_pages; > - u64 len; > int written = 0; > int flags; > int check_caps = 0; > - int page_align, io_align; > - unsigned long buf_align; > + int page_align; > int ret; > struct timespec mtime = CURRENT_TIME; > - bool own_pages = false; > + loff_t pos = iocb->ki_pos; > + struct iov_iter i; > > if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) > return -EROFS; > > - dout("sync_write on file %p %lld~%u %s\n", file, pos, > - (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); > + dout("sync_direct_write on file %p %lld~%u\n", file, pos, > + (unsigned)count); > > - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); > + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); > if (ret < 0) > return ret; > > ret = invalidate_inode_pages2_range(inode->i_mapping, > pos >> PAGE_CACHE_SHIFT, > - (pos + left) >> PAGE_CACHE_SHIFT); > + (pos + count) >> PAGE_CACHE_SHIFT); > if (ret < 0) > dout("invalidate_inode_pages2_range returned %d\n", ret); > > flags = CEPH_OSD_FLAG_ORDERSNAP | > CEPH_OSD_FLAG_ONDISK | > CEPH_OSD_FLAG_WRITE; > - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) > - flags |= CEPH_OSD_FLAG_ACK; > - else > - num_ops++; /* Also include a 'startsync' command. */ > > - /* > - * we may need to do multiple writes here if we span an object > - * boundary. this isn't atomic, unfortunately. 
:( > - */ > -more: > - io_align = pos & ~PAGE_MASK; > - buf_align = (unsigned long)data & ~PAGE_MASK; > - len = left; > - > - snapc = ci->i_snap_realm->cached_context; > - vino = ceph_vino(inode); > - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, > - vino, pos, &len, num_ops, > - CEPH_OSD_OP_WRITE, flags, snapc, > - ci->i_truncate_seq, ci->i_truncate_size, > - false); > - if (IS_ERR(req)) > - return PTR_ERR(req); > + iov_iter_init(&i, iov, nr_segs, count, 0); > + > + while (iov_iter_count(&i) > 0) { > + void __user *data = i.iov->iov_base + i.iov_offset; > + u64 len = i.iov->iov_len - i.iov_offset; > + > + page_align = (unsigned long)data & ~PAGE_MASK; > + > + snapc = ci->i_snap_realm->cached_context; > + vino = ceph_vino(inode); > + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, > + vino, pos, &len, > + 2,/*include a 'startsync' command*/ > + CEPH_OSD_OP_WRITE, flags, snapc, > + ci->i_truncate_seq, > + ci->i_truncate_size, > + false); > + if (IS_ERR(req)) { > + ret = PTR_ERR(req); > + goto out; > + } > > - /* write from beginning of first page, regardless of io alignment */ > - page_align = file->f_flags & O_DIRECT ? buf_align : io_align; > - num_pages = calc_pages_for(page_align, len); > - if (file->f_flags & O_DIRECT) { > + num_pages = calc_pages_for(page_align, len); > pages = ceph_get_direct_page_vector(data, num_pages, false); > if (IS_ERR(pages)) { > ret = PTR_ERR(pages); > @@ -577,60 +573,175 @@ more: > * may block. 
> */ > truncate_inode_pages_range(inode->i_mapping, pos, > - (pos+len) | (PAGE_CACHE_SIZE-1)); > - } else { > + (pos+len) | (PAGE_CACHE_SIZE-1)); > + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, > + false, false); > + > + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ > + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); > + > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); > + if (!ret) > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); > + > + ceph_put_page_vector(pages, num_pages, false); > + > +out: > + ceph_osdc_put_request(req); > + if (ret == 0) { > + pos += len; > + written += len; > + iov_iter_advance(&i, (size_t)len); > + > + if (pos > i_size_read(inode)) { > + check_caps = ceph_inode_set_size(inode, pos); > + if (check_caps) > + ceph_check_caps(ceph_inode(inode), > + CHECK_CAPS_AUTHONLY, > + NULL); > + } > + } else > + break; > + } > + > + if (ret != -EOLDSNAPC && written > 0) { > + iocb->ki_pos = pos; > + ret = written; > + } > + return ret; > +} > + > + > +/* > + * Synchronous write, straight from __user pointer or user pages. > + * > + * If write spans object boundary, just do multiple writes. (For a > + * correct atomic write, we should e.g. take write locks on all > + * objects, rollback on failure, etc.) 
> + */ > +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, > + unsigned long nr_segs, size_t count) > +{ > + struct file *file = iocb->ki_filp; > + struct inode *inode = file_inode(file); > + struct ceph_inode_info *ci = ceph_inode(inode); > + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); > + struct ceph_snap_context *snapc; > + struct ceph_vino vino; > + struct ceph_osd_request *req; > + struct page **pages; > + u64 len; > + int num_pages; > + int written = 0; > + int flags; > + int check_caps = 0; > + int ret; > + struct timespec mtime = CURRENT_TIME; > + loff_t pos = iocb->ki_pos; > + struct iov_iter i; > + > + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) > + return -EROFS; > + > + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); > + > + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); > + if (ret < 0) > + return ret; > + > + ret = invalidate_inode_pages2_range(inode->i_mapping, > + pos >> PAGE_CACHE_SHIFT, > + (pos + count) >> PAGE_CACHE_SHIFT); > + if (ret < 0) > + dout("invalidate_inode_pages2_range returned %d\n", ret); > + > + flags = CEPH_OSD_FLAG_ORDERSNAP | > + CEPH_OSD_FLAG_ONDISK | > + CEPH_OSD_FLAG_WRITE | > + CEPH_OSD_FLAG_ACK; > + > + iov_iter_init(&i, iov, nr_segs, count, 0); > + > + while ((len = iov_iter_count(&i)) > 0) { > + size_t left; > + int n; > + > + snapc = ci->i_snap_realm->cached_context; > + vino = ceph_vino(inode); > + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, > + vino, pos, &len, 1, > + CEPH_OSD_OP_WRITE, flags, snapc, > + ci->i_truncate_seq, > + ci->i_truncate_size, > + false); > + if (IS_ERR(req)) { > + ret = PTR_ERR(req); > + goto out; > + } > + > + /* > + * write from beginning of first page, > + * regardless of io alignment > + */ > + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; > + > pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); > if (IS_ERR(pages)) { > ret = PTR_ERR(pages); > goto out; > } > - ret = 
ceph_copy_user_to_page_vector(pages, data, pos, len); > + > + left = len; > + for (n = 0; n < num_pages; n++) { > + size_t plen = min(left, PAGE_SIZE); > + ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); > + if (ret != plen) { > + ret = -EFAULT; > + break; > + } > + left -= ret; > + iov_iter_advance(&i, ret); > + } > + > if (ret < 0) { > ceph_release_page_vector(pages, num_pages); > goto out; > } > > - if ((file->f_flags & O_SYNC) == 0) { > - /* get a second commit callback */ > - req->r_unsafe_callback = ceph_sync_write_unsafe; > - req->r_inode = inode; > - own_pages = true; > - } > - } > - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, > - false, own_pages); > + /* get a second commit callback */ > + req->r_unsafe_callback = ceph_sync_write_unsafe; > + req->r_inode = inode; > > - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ > - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); > + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, > + false, true); > > - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); > - if (!ret) > - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); > + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ > + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); > > - if (file->f_flags & O_DIRECT) > - ceph_put_page_vector(pages, num_pages, false); > - else if (file->f_flags & O_SYNC) > - ceph_release_page_vector(pages, num_pages); > + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); > + if (!ret) > + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); > > out: > - ceph_osdc_put_request(req); > - if (ret == 0) { > - pos += len; > - written += len; > - left -= len; > - data += len; > - if (left) > - goto more; > + ceph_osdc_put_request(req); > + if (ret == 0) { > + pos += len; > + written += len; > + > + if (pos > i_size_read(inode)) { > + check_caps = ceph_inode_set_size(inode, pos); > + if (check_caps) > + ceph_check_caps(ceph_inode(inode), > + CHECK_CAPS_AUTHONLY, > + NULL); > + 
} > + } else > + break; > + } > > + if (ret != -EOLDSNAPC && written > 0) { > ret = written; > - *ppos = pos; > - if (pos > i_size_read(inode)) > - check_caps = ceph_inode_set_size(inode, pos); > - if (check_caps) > - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, > - NULL); > - } else if (ret != -EOLDSNAPC && written > 0) { > - ret = written; > + iocb->ki_pos = pos; > } > return ret; > } > @@ -772,11 +883,13 @@ retry_snap: > inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); > > if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || > - (iocb->ki_filp->f_flags & O_DIRECT) || > - (fi->flags & CEPH_F_SYNC)) { > + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { > mutex_unlock(&inode->i_mutex); > - written = ceph_sync_write(file, iov->iov_base, count, > - pos, &iocb->ki_pos); > + if (file->f_flags & O_DIRECT) > + written = ceph_sync_direct_write(iocb, iov, > + nr_segs, count); > + else > + written = ceph_sync_write(iocb, iov, nr_segs, count); > if (written == -EOLDSNAPC) { > dout("aio_write %p %llx.%llx %llu~%u" > "got EOLDSNAPC, retrying\n", > -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html
>Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com> Thanks very much! Jianpeng Ma > >On 09/12/2013 01:54 PM, majianpeng wrote: >> For writev/pwritev sync operations, ceph only handles the first iov. >> It ignores the other iovs. Now implement this. >> I divided the sync-write operation into two functions: one for >> direct writes, the other for non-direct sync writes. This is because for >> non-direct sync writes we can merge the iovs into one, but for direct writes >> we can't merge iovs. >> >> V4: >> reconstruct the code by Yan, Zheng >> V2: >> -use struct iov_iter instead of cloning iovs in ceph_sync_write. >> >> Signed-off-by: Jianpeng Ma <majianpeng@gmail.com> >> Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com> >> --- >> fs/ceph/file.c | 273 ++++++++++++++++++++++++++++++++++++++++----------------- >> 1 file changed, 193 insertions(+), 80 deletions(-) >> >> diff --git a/fs/ceph/file.c b/fs/ceph/file.c >> index 3de8982..5cf034e 100644 >> --- a/fs/ceph/file.c >> +++ b/fs/ceph/file.c >> @@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) >> } >> } >> >> + >> /* >> - * Synchronous write, straight from __user pointer or user pages (if >> - * O_DIRECT). >> + * Synchronous write, straight from __user pointer or user pages. >> * >> * If write spans object boundary, just do multiple writes. (For a >> * correct atomic write, we should e.g. take write locks on all >> * objects, rollback on failure, etc.) 
>> */ >> -static ssize_t ceph_sync_write(struct file *file, const char __user *data, >> - size_t left, loff_t pos, loff_t *ppos) >> +static ssize_t >> +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, >> + unsigned long nr_segs, size_t count) >> { >> + struct file *file = iocb->ki_filp; >> struct inode *inode = file_inode(file); >> struct ceph_inode_info *ci = ceph_inode(inode); >> struct ceph_fs_client *fsc = ceph_inode_to_client(inode); >> struct ceph_snap_context *snapc; >> struct ceph_vino vino; >> struct ceph_osd_request *req; >> - int num_ops = 1; >> struct page **pages; >> int num_pages; >> - u64 len; >> int written = 0; >> int flags; >> int check_caps = 0; >> - int page_align, io_align; >> - unsigned long buf_align; >> + int page_align; >> int ret; >> struct timespec mtime = CURRENT_TIME; >> - bool own_pages = false; >> + loff_t pos = iocb->ki_pos; >> + struct iov_iter i; >> >> if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) >> return -EROFS; >> >> - dout("sync_write on file %p %lld~%u %s\n", file, pos, >> - (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); >> + dout("sync_direct_write on file %p %lld~%u\n", file, pos, >> + (unsigned)count); >> >> - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); >> + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); >> if (ret < 0) >> return ret; >> >> ret = invalidate_inode_pages2_range(inode->i_mapping, >> pos >> PAGE_CACHE_SHIFT, >> - (pos + left) >> PAGE_CACHE_SHIFT); >> + (pos + count) >> PAGE_CACHE_SHIFT); >> if (ret < 0) >> dout("invalidate_inode_pages2_range returned %d\n", ret); >> >> flags = CEPH_OSD_FLAG_ORDERSNAP | >> CEPH_OSD_FLAG_ONDISK | >> CEPH_OSD_FLAG_WRITE; >> - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) >> - flags |= CEPH_OSD_FLAG_ACK; >> - else >> - num_ops++; /* Also include a 'startsync' command. */ >> >> - /* >> - * we may need to do multiple writes here if we span an object >> - * boundary. 
this isn't atomic, unfortunately. :( >> - */ >> -more: >> - io_align = pos & ~PAGE_MASK; >> - buf_align = (unsigned long)data & ~PAGE_MASK; >> - len = left; >> - >> - snapc = ci->i_snap_realm->cached_context; >> - vino = ceph_vino(inode); >> - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, >> - vino, pos, &len, num_ops, >> - CEPH_OSD_OP_WRITE, flags, snapc, >> - ci->i_truncate_seq, ci->i_truncate_size, >> - false); >> - if (IS_ERR(req)) >> - return PTR_ERR(req); >> + iov_iter_init(&i, iov, nr_segs, count, 0); >> + >> + while (iov_iter_count(&i) > 0) { >> + void __user *data = i.iov->iov_base + i.iov_offset; >> + u64 len = i.iov->iov_len - i.iov_offset; >> + >> + page_align = (unsigned long)data & ~PAGE_MASK; >> + >> + snapc = ci->i_snap_realm->cached_context; >> + vino = ceph_vino(inode); >> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, >> + vino, pos, &len, >> + 2,/*include a 'startsync' command*/ >> + CEPH_OSD_OP_WRITE, flags, snapc, >> + ci->i_truncate_seq, >> + ci->i_truncate_size, >> + false); >> + if (IS_ERR(req)) { >> + ret = PTR_ERR(req); >> + goto out; >> + } >> >> - /* write from beginning of first page, regardless of io alignment */ >> - page_align = file->f_flags & O_DIRECT ? buf_align : io_align; >> - num_pages = calc_pages_for(page_align, len); >> - if (file->f_flags & O_DIRECT) { >> + num_pages = calc_pages_for(page_align, len); >> pages = ceph_get_direct_page_vector(data, num_pages, false); >> if (IS_ERR(pages)) { >> ret = PTR_ERR(pages); >> @@ -577,60 +573,175 @@ more: >> * may block. 
>> */ >> truncate_inode_pages_range(inode->i_mapping, pos, >> - (pos+len) | (PAGE_CACHE_SIZE-1)); >> - } else { >> + (pos+len) | (PAGE_CACHE_SIZE-1)); >> + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, >> + false, false); >> + >> + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ >> + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); >> + >> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); >> + if (!ret) >> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); >> + >> + ceph_put_page_vector(pages, num_pages, false); >> + >> +out: >> + ceph_osdc_put_request(req); >> + if (ret == 0) { >> + pos += len; >> + written += len; >> + iov_iter_advance(&i, (size_t)len); >> + >> + if (pos > i_size_read(inode)) { >> + check_caps = ceph_inode_set_size(inode, pos); >> + if (check_caps) >> + ceph_check_caps(ceph_inode(inode), >> + CHECK_CAPS_AUTHONLY, >> + NULL); >> + } >> + } else >> + break; >> + } >> + >> + if (ret != -EOLDSNAPC && written > 0) { >> + iocb->ki_pos = pos; >> + ret = written; >> + } >> + return ret; >> +} >> + >> + >> +/* >> + * Synchronous write, straight from __user pointer or user pages. >> + * >> + * If write spans object boundary, just do multiple writes. (For a >> + * correct atomic write, we should e.g. take write locks on all >> + * objects, rollback on failure, etc.) 
>> + */ >> +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, >> + unsigned long nr_segs, size_t count) >> +{ >> + struct file *file = iocb->ki_filp; >> + struct inode *inode = file_inode(file); >> + struct ceph_inode_info *ci = ceph_inode(inode); >> + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); >> + struct ceph_snap_context *snapc; >> + struct ceph_vino vino; >> + struct ceph_osd_request *req; >> + struct page **pages; >> + u64 len; >> + int num_pages; >> + int written = 0; >> + int flags; >> + int check_caps = 0; >> + int ret; >> + struct timespec mtime = CURRENT_TIME; >> + loff_t pos = iocb->ki_pos; >> + struct iov_iter i; >> + >> + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) >> + return -EROFS; >> + >> + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); >> + >> + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); >> + if (ret < 0) >> + return ret; >> + >> + ret = invalidate_inode_pages2_range(inode->i_mapping, >> + pos >> PAGE_CACHE_SHIFT, >> + (pos + count) >> PAGE_CACHE_SHIFT); >> + if (ret < 0) >> + dout("invalidate_inode_pages2_range returned %d\n", ret); >> + >> + flags = CEPH_OSD_FLAG_ORDERSNAP | >> + CEPH_OSD_FLAG_ONDISK | >> + CEPH_OSD_FLAG_WRITE | >> + CEPH_OSD_FLAG_ACK; >> + >> + iov_iter_init(&i, iov, nr_segs, count, 0); >> + >> + while ((len = iov_iter_count(&i)) > 0) { >> + size_t left; >> + int n; >> + >> + snapc = ci->i_snap_realm->cached_context; >> + vino = ceph_vino(inode); >> + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, >> + vino, pos, &len, 1, >> + CEPH_OSD_OP_WRITE, flags, snapc, >> + ci->i_truncate_seq, >> + ci->i_truncate_size, >> + false); >> + if (IS_ERR(req)) { >> + ret = PTR_ERR(req); >> + goto out; >> + } >> + >> + /* >> + * write from beginning of first page, >> + * regardless of io alignment >> + */ >> + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; >> + >> pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); >> if 
(IS_ERR(pages)) { >> ret = PTR_ERR(pages); >> goto out; >> } >> - ret = ceph_copy_user_to_page_vector(pages, data, pos, len); >> + >> + left = len; >> + for (n = 0; n < num_pages; n++) { >> + size_t plen = min(left, PAGE_SIZE); >> + ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); >> + if (ret != plen) { >> + ret = -EFAULT; >> + break; >> + } >> + left -= ret; >> + iov_iter_advance(&i, ret); >> + } >> + >> if (ret < 0) { >> ceph_release_page_vector(pages, num_pages); >> goto out; >> } >> >> - if ((file->f_flags & O_SYNC) == 0) { >> - /* get a second commit callback */ >> - req->r_unsafe_callback = ceph_sync_write_unsafe; >> - req->r_inode = inode; >> - own_pages = true; >> - } >> - } >> - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, >> - false, own_pages); >> + /* get a second commit callback */ >> + req->r_unsafe_callback = ceph_sync_write_unsafe; >> + req->r_inode = inode; >> >> - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ >> - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); >> + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, >> + false, true); >> >> - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); >> - if (!ret) >> - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); >> + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ >> + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); >> >> - if (file->f_flags & O_DIRECT) >> - ceph_put_page_vector(pages, num_pages, false); >> - else if (file->f_flags & O_SYNC) >> - ceph_release_page_vector(pages, num_pages); >> + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); >> + if (!ret) >> + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); >> >> out: >> - ceph_osdc_put_request(req); >> - if (ret == 0) { >> - pos += len; >> - written += len; >> - left -= len; >> - data += len; >> - if (left) >> - goto more; >> + ceph_osdc_put_request(req); >> + if (ret == 0) { >> + pos += len; >> + written += len; >> + >> + if (pos > i_size_read(inode)) { >> + 
check_caps = ceph_inode_set_size(inode, pos); >> + if (check_caps) >> + ceph_check_caps(ceph_inode(inode), >> + CHECK_CAPS_AUTHONLY, >> + NULL); >> + } >> + } else >> + break; >> + } >> >> + if (ret != -EOLDSNAPC && written > 0) { >> ret = written; >> - *ppos = pos; >> - if (pos > i_size_read(inode)) >> - check_caps = ceph_inode_set_size(inode, pos); >> - if (check_caps) >> - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, >> - NULL); >> - } else if (ret != -EOLDSNAPC && written > 0) { >> - ret = written; >> + iocb->ki_pos = pos; >> } >> return ret; >> } >> @@ -772,11 +883,13 @@ retry_snap: >> inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); >> >> if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || >> - (iocb->ki_filp->f_flags & O_DIRECT) || >> - (fi->flags & CEPH_F_SYNC)) { >> + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { >> mutex_unlock(&inode->i_mutex); >> - written = ceph_sync_write(file, iov->iov_base, count, >> - pos, &iocb->ki_pos); >> + if (file->f_flags & O_DIRECT) >> + written = ceph_sync_direct_write(iocb, iov, >> + nr_segs, count); >> + else >> + written = ceph_sync_write(iocb, iov, nr_segs, count); >> if (written == -EOLDSNAPC) { >> dout("aio_write %p %llx.%llx %llu~%u" >> "got EOLDSNAPC, retrying\n", >> >
diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3de8982..5cf034e 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } } + /* - * Synchronous write, straight from __user pointer or user pages (if - * O_DIRECT). + * Synchronous write, straight from __user pointer or user pages. * * If write spans object boundary, just do multiple writes. (For a * correct atomic write, we should e.g. take write locks on all * objects, rollback on failure, etc.) */ -static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t left, loff_t pos, loff_t *ppos) +static ssize_t +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; - int num_ops = 1; struct page **pages; int num_pages; - u64 len; int written = 0; int flags; int check_caps = 0; - int page_align, io_align; - unsigned long buf_align; + int page_align; int ret; struct timespec mtime = CURRENT_TIME; - bool own_pages = false; + loff_t pos = iocb->ki_pos; + struct iov_iter i; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_write on file %p %lld~%u %s\n", file, pos, - (unsigned)left, (file->f_flags & O_DIRECT) ? 
"O_DIRECT" : ""); + dout("sync_direct_write on file %p %lld~%u\n", file, pos, + (unsigned)count); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); if (ret < 0) return ret; ret = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_CACHE_SHIFT, - (pos + left) >> PAGE_CACHE_SHIFT); + (pos + count) >> PAGE_CACHE_SHIFT); if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE; - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) - flags |= CEPH_OSD_FLAG_ACK; - else - num_ops++; /* Also include a 'startsync' command. */ - /* - * we may need to do multiple writes here if we span an object - * boundary. this isn't atomic, unfortunately. :( - */ -more: - io_align = pos & ~PAGE_MASK; - buf_align = (unsigned long)data & ~PAGE_MASK; - len = left; - - snapc = ci->i_snap_realm->cached_context; - vino = ceph_vino(inode); - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, num_ops, - CEPH_OSD_OP_WRITE, flags, snapc, - ci->i_truncate_seq, ci->i_truncate_size, - false); - if (IS_ERR(req)) - return PTR_ERR(req); + iov_iter_init(&i, iov, nr_segs, count, 0); + + while (iov_iter_count(&i) > 0) { + void __user *data = i.iov->iov_base + i.iov_offset; + u64 len = i.iov->iov_len - i.iov_offset; + + page_align = (unsigned long)data & ~PAGE_MASK; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, + 2,/*include a 'startsync' command*/ + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } - /* write from beginning of first page, regardless of io alignment */ - page_align = file->f_flags & O_DIRECT ? 
buf_align : io_align; - num_pages = calc_pages_for(page_align, len); - if (file->f_flags & O_DIRECT) { + num_pages = calc_pages_for(page_align, len); pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -577,60 +573,175 @@ more: * may block. */ truncate_inode_pages_range(inode->i_mapping, pos, - (pos+len) | (PAGE_CACHE_SIZE-1)); - } else { + (pos+len) | (PAGE_CACHE_SIZE-1)); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, false); + + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + + ceph_put_page_vector(pages, num_pages, false); + +out: + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + iov_iter_advance(&i, (size_t)len); + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + + if (ret != -EOLDSNAPC && written > 0) { + iocb->ki_pos = pos; + ret = written; + } + return ret; +} + + +/* + * Synchronous write, straight from __user pointer or user pages. + * + * If write spans object boundary, just do multiple writes. (For a + * correct atomic write, we should e.g. take write locks on all + * objects, rollback on failure, etc.) 
+ */ +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) +{ + struct file *file = iocb->ki_filp; + struct inode *inode = file_inode(file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; + struct ceph_osd_request *req; + struct page **pages; + u64 len; + int num_pages; + int written = 0; + int flags; + int check_caps = 0; + int ret; + struct timespec mtime = CURRENT_TIME; + loff_t pos = iocb->ki_pos; + struct iov_iter i; + + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) + return -EROFS; + + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); + + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + if (ret < 0) + return ret; + + ret = invalidate_inode_pages2_range(inode->i_mapping, + pos >> PAGE_CACHE_SHIFT, + (pos + count) >> PAGE_CACHE_SHIFT); + if (ret < 0) + dout("invalidate_inode_pages2_range returned %d\n", ret); + + flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK; + + iov_iter_init(&i, iov, nr_segs, count, 0); + + while ((len = iov_iter_count(&i)) > 0) { + size_t left; + int n; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, 1, + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } + + /* + * write from beginning of first page, + * regardless of io alignment + */ + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; } - ret = ceph_copy_user_to_page_vector(pages, data, pos, len); + + left = len; + for (n = 0; n < num_pages; n++) { + size_t plen = min(left, PAGE_SIZE); + 
ret = iov_iter_copy_from_user(pages[n], &i, 0, plen); + if (ret != plen) { + ret = -EFAULT; + break; + } + left -= ret; + iov_iter_advance(&i, ret); + } + if (ret < 0) { ceph_release_page_vector(pages, num_pages); goto out; } - if ((file->f_flags & O_SYNC) == 0) { - /* get a second commit callback */ - req->r_unsafe_callback = ceph_sync_write_unsafe; - req->r_inode = inode; - own_pages = true; - } - } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, - false, own_pages); + /* get a second commit callback */ + req->r_unsafe_callback = ceph_sync_write_unsafe; + req->r_inode = inode; - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, + false, true); - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); - if (!ret) - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, false); - else if (file->f_flags & O_SYNC) - ceph_release_page_vector(pages, num_pages); + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); out: - ceph_osdc_put_request(req); - if (ret == 0) { - pos += len; - written += len; - left -= len; - data += len; - if (left) - goto more; + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + + if (pos > i_size_read(inode)) { + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } + } else + break; + } + if (ret != -EOLDSNAPC && written > 0) { ret = written; - *ppos = pos; - if (pos > i_size_read(inode)) - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, - NULL); - } else if (ret != 
-EOLDSNAPC && written > 0) { - ret = written; + iocb->ki_pos = pos; } return ret; } @@ -772,11 +883,13 @@ retry_snap: inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || - (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) { + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); - written = ceph_sync_write(file, iov->iov_base, count, - pos, &iocb->ki_pos); + if (file->f_flags & O_DIRECT) + written = ceph_sync_direct_write(iocb, iov, + nr_segs, count); + else + written = ceph_sync_write(iocb, iov, nr_segs, count); if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n",