From patchwork Tue Sep 10 02:04:32 2013 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: majianpeng X-Patchwork-Id: 2863681 Return-Path: X-Original-To: patchwork-ceph-devel@patchwork.kernel.org Delivered-To: patchwork-parsemail@patchwork1.web.kernel.org Received: from mail.kernel.org (mail.kernel.org [198.145.19.201]) by patchwork1.web.kernel.org (Postfix) with ESMTP id D1B4F9F495 for ; Tue, 10 Sep 2013 02:04:58 +0000 (UTC) Received: from mail.kernel.org (localhost [127.0.0.1]) by mail.kernel.org (Postfix) with ESMTP id 961DB20263 for ; Tue, 10 Sep 2013 02:04:57 +0000 (UTC) Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.kernel.org (Postfix) with ESMTP id 6A3522025D for ; Tue, 10 Sep 2013 02:04:56 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1751955Ab3IJCEz (ORCPT ); Mon, 9 Sep 2013 22:04:55 -0400 Received: from mail-pa0-f42.google.com ([209.85.220.42]:57030 "EHLO mail-pa0-f42.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1751645Ab3IJCEz (ORCPT ); Mon, 9 Sep 2013 22:04:55 -0400 Received: by mail-pa0-f42.google.com with SMTP id lj1so7088945pab.1 for ; Mon, 09 Sep 2013 19:04:54 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=date:from:to:cc:reply-to:subject:mime-version:message-id :content-type:content-transfer-encoding; bh=Vf0DCTefFLqudIU/fq8F+y5vJ2/8qPnVY65yZY4nar0=; b=swaMKHKmHojeFROkXVYeysf9P0KGJexewZWbVXqbhFMDR1Wbz0OjFgMkvhVI7QDgI/ dkORGRy2DiyFcuwVKqseMT0GYNiz+Bx2RohhOJjVMjGJDWFliz/vMuPkjMH7ohbQceOB FXnDmKCRZE3o4fzSROy84TEEDz8PhiP6HRPoBZo42LgKdDpVRslmxRvtZ3lDLEJzljUi x5IUFOZmsweLIVdjK7IqGTPnY6tuhTnij737jv6w/+3IN5O/ZWxaU5X0nxWbjj/8s72d p17PrdfbUiuCp18MFhGiu5RGCO6K2CM1ONEkNEdRSfQWMuN+zWMrzu37AS/7pp+OKGRc HneA== X-Received: by 10.66.250.138 with SMTP id zc10mr23751984pac.72.1378778694468; Mon, 09 Sep 2013 19:04:54 -0700 (PDT) Received: from majianpeng ([218.242.10.182]) by mx.google.com with ESMTPSA id ef10sm20912091pac.1.1969.12.31.16.00.00 (version=TLSv1 cipher=RC4-SHA bits=128/128); Mon, 09 Sep 2013 19:04:53 -0700 (PDT) Date: Tue, 10 Sep 2013 10:04:32 +0800 From: majianpeng To: sage Cc: =?us-ascii?B?WWFuLCBaaGVuZw==?= , ceph-devel Reply-To: majianpeng Subject: [PATCH V3 2/2] ceph: Implement writev/pwritev for sync operation. X-Priority: 3 X-GUID: 2078BD57-DCE2-4170-9301-E63704138073 X-Has-Attach: no X-Mailer: Foxmail 7.0.1.93[cn] Mime-Version: 1.0 Message-ID: <201309101004298139785@gmail.com> Sender: ceph-devel-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: ceph-devel@vger.kernel.org X-Spam-Status: No, score=-6.0 required=5.0 tests=BAYES_00, DKIM_ADSP_CUSTOM_MED, DKIM_SIGNED, FREEMAIL_FROM, MIME_BASE64_TEXT, RCVD_IN_DNSWL_HI, RP_MATCHES_RCVD, T_DKIM_INVALID,UNPARSEABLE_RELAY autolearn=ham version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on mail.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP For writev/pwritev sync-operatoin, ceph only do the first iov. It don't think other iovs.Now implement this. I divided the write-sync-operation into two functions.One for direct-write,other for none-direct-sync-write.This is because for none-direct-sync-write we can merge iovs to one.But for direct-write, we can't merge iovs. V2: -using struct iov_iter replace clone iovs in ceph_sync_write. Signed-off-by: Jianpeng Ma Reviewed-by: Yan, Zheng --- fs/ceph/file.c | 314 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 234 insertions(+), 80 deletions(-) -- 1.8.1.2 diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 1c28c52..8dcde64 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -547,17 +547,19 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } } + /* - * Synchronous write, straight from __user pointer or user pages (if - * O_DIRECT). + * Synchronous write, straight from __user pointer or user pages. * * If write spans object boundary, just do multiple writes. (For a * correct atomic write, we should e.g. take write locks on all * objects, rollback on failure, etc.) */ -static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t left, loff_t pos, loff_t *ppos) +static ssize_t +ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) { + struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); @@ -571,59 +573,55 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, int written = 0; int flags; int check_caps = 0; - int page_align, io_align; - unsigned long buf_align; - int ret; + int page_align; + int ret, i; struct timespec mtime = CURRENT_TIME; - bool own_pages = false; + loff_t pos = iocb->ki_pos; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_write on file %p %lld~%u %s\n", file, pos, - (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); + dout("sync_direct_write on file %p %lld~%u\n", file, pos, + (unsigned)count); - ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left); + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); if (ret < 0) return ret; ret = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_CACHE_SHIFT, - (pos + left) >> PAGE_CACHE_SHIFT); + (pos + count) >> PAGE_CACHE_SHIFT); if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE; - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) - flags |= CEPH_OSD_FLAG_ACK; - else - num_ops++; /* Also include a 'startsync' command. */ + num_ops++; /* Also include a 'startsync' command. */ - /* - * we may need to do multiple writes here if we span an object - * boundary. this isn't atomic, unfortunately. :( - */ -more: - io_align = pos & ~PAGE_MASK; - buf_align = (unsigned long)data & ~PAGE_MASK; - len = left; + for (i = 0; i < nr_segs && count; i++) { + void __user *data = iov[i].iov_base; + size_t left; - snapc = ci->i_snap_realm->cached_context; - vino = ceph_vino(inode); - req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, num_ops, - CEPH_OSD_OP_WRITE, flags, snapc, - ci->i_truncate_seq, ci->i_truncate_size, - false); - if (IS_ERR(req)) - return PTR_ERR(req); + left = min(count, iov[i].iov_len); +more: + page_align = (unsigned long)data & ~PAGE_MASK; + len = left; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, num_ops, + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } - /* write from beginning of first page, regardless of io alignment */ - page_align = file->f_flags & O_DIRECT ? buf_align : io_align; - num_pages = calc_pages_for(page_align, len); - if (file->f_flags & O_DIRECT) { + num_pages = calc_pages_for(page_align, len); pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -635,61 +633,215 @@ more: * may block. */ truncate_inode_pages_range(inode->i_mapping, pos, - (pos+len) | (PAGE_CACHE_SIZE-1)); - } else { + (pos+len) | (PAGE_CACHE_SIZE-1)); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, false); + + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + + ceph_put_page_vector(pages, num_pages, false); + +out: + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + left -= len; + count -= len; + data += len; + if (left) + goto more; + + ret = written; + if (pos > i_size_read(inode)) + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } else { + if (ret != -EOLDSNAPC && written > 0) + ret = written; + break; + } + } + + if (ret > 0) + iocb->ki_pos = pos; + return ret; +} + + +/* + * Synchronous write, straight from __user pointer or user pages. + * + * If write spans object boundary, just do multiple writes. (For a + * correct atomic write, we should e.g. take write locks on all + * objects, rollback on failure, etc.) + */ +static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov, + unsigned long nr_segs, size_t count) +{ + struct file *file = iocb->ki_filp; + struct inode *inode = file_inode(file); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; + struct ceph_osd_request *req; + int num_ops = 1; + struct page **pages; + int num_pages; + u64 len; + int written = 0; + int flags; + int check_caps = 0; + int ret; + struct timespec mtime = CURRENT_TIME; + loff_t pos = iocb->ki_pos; + struct iov_iter i; + + if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) + return -EROFS; + + dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count); + + ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); + if (ret < 0) + return ret; + + ret = invalidate_inode_pages2_range(inode->i_mapping, + pos >> PAGE_CACHE_SHIFT, + (pos + count) >> PAGE_CACHE_SHIFT); + if (ret < 0) + dout("invalidate_inode_pages2_range returned %d\n", ret); + + flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK; + + iov_iter_init(&i, iov, nr_segs, count, 0); + + while (i.count) { + void __user *data; + size_t left; + + left = i.count; +more: + len = left; + + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, pos, &len, num_ops, + CEPH_OSD_OP_WRITE, flags, snapc, + ci->i_truncate_seq, + ci->i_truncate_size, + false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out; + } + + /* + * write from beginning of first page, + * regardless of io alignment + */ + num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; + pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; } - ret = ceph_copy_user_to_page_vector(pages, data, pos, len); + if (len <= i.iov[0].iov_len - i.iov_offset) { + data = i.iov[0].iov_base + i.iov_offset; + ret = ceph_copy_user_to_page_vector(pages, + data, 0, len); + + if (ret > 0) + iov_iter_advance(&i, ret); + } else { + int l, k = 0, copyed = 0; + size_t tmp = len; + + while (tmp) { + data = i.iov[0].iov_base + i.iov_offset; + l = i.iov[0].iov_len - i.iov_offset; + + if (tmp < l) { + ret = ceph_copy_user_to_page_vector(&pages[k], + data, + copyed, + tmp); + if (ret > 0) + iov_iter_advance(&i, ret); + break; + } else if (l) { + ret = ceph_copy_user_to_page_vector(&pages[k], + data, + copyed, + l); + if (ret < 0) + break; + iov_iter_advance(&i, ret); + copyed += ret; + tmp -= ret; + k = calc_pages_for(0, copyed + 1) - 1; + } + } + } + if (ret < 0) { ceph_release_page_vector(pages, num_pages); goto out; } - if ((file->f_flags & O_SYNC) == 0) { - /* get a second commit callback */ - req->r_unsafe_callback = ceph_sync_write_unsafe; - req->r_inode = inode; - own_pages = true; - } - } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, - false, own_pages); + /* get a second commit callback */ + req->r_unsafe_callback = ceph_sync_write_unsafe; + req->r_inode = inode; - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, + false, true); - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); - if (!ret) - ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - if (file->f_flags & O_DIRECT) - ceph_put_page_vector(pages, num_pages, false); - else if (file->f_flags & O_SYNC) - ceph_release_page_vector(pages, num_pages); + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(&fsc->client->osdc, req); out: - ceph_osdc_put_request(req); - if (ret == 0) { - pos += len; - written += len; - left -= len; - data += len; - if (left) - goto more; - - ret = written; - *ppos = pos; - if (pos > i_size_read(inode)) - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) - ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, - NULL); - } else if (ret != -EOLDSNAPC && written > 0) { - ret = written; + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + left -= len; + if (left) + goto more; + + ret = written; + if (pos > i_size_read(inode)) + check_caps = ceph_inode_set_size(inode, pos); + if (check_caps) + ceph_check_caps(ceph_inode(inode), + CHECK_CAPS_AUTHONLY, + NULL); + } else { + if (ret != -EOLDSNAPC && written > 0) + ret = written; + break; + } } + + if (ret > 0) + iocb->ki_pos = pos; return ret; } @@ -844,11 +996,13 @@ retry_snap: inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || - (iocb->ki_filp->f_flags & O_DIRECT) || - (fi->flags & CEPH_F_SYNC)) { + (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) { mutex_unlock(&inode->i_mutex); - written = ceph_sync_write(file, iov->iov_base, count, - pos, &iocb->ki_pos); + if (file->f_flags & O_DIRECT) + written = ceph_sync_direct_write(iocb, iov, + nr_segs, count); + else + written = ceph_sync_write(iocb, iov, nr_segs, count); if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n",