From patchwork Wed Sep 25 09:07:23 2019
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 11160211
Received: from mail.kernel.org (pdx-korg-mail-1.web.codeaurora.org [172.30.200.123]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 6110A13BD for ; Wed, 25 Sep 2019 09:08:46 +0000 (UTC)
Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.kernel.org (Postfix) with ESMTP id 3E20B20673 for ; Wed, 25 Sep 2019 09:08:46 +0000 (UTC)
Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1729091AbfIYJIm (ORCPT ); Wed, 25 Sep 2019 05:08:42 -0400
Received: from m97138.mail.qiye.163.com ([220.181.97.138]:21145 "EHLO m97138.mail.qiye.163.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1726882AbfIYJIV (ORCPT ); Wed, 25 Sep 2019 05:08:21 -0400
Received: from atest-guest.localdomain (unknown [218.94.118.90]) by smtp9 (Coremail) with SMTP id u+CowAD3dl9YLotdHDhSAg--.166S3; Wed, 25 Sep 2019 17:07:37 +0800 (CST)
From: Dongsheng Yang
To: idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 01/12] libceph: introduce ceph_extract_encoded_string_kvmalloc
Date: Wed, 25 Sep 2019 09:07:23 +0000
Message-Id: <1569402454-4736-2-git-send-email-dongsheng.yang@easystack.cn>
X-Mailer: git-send-email 1.8.3.1
In-Reply-To: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
References: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
Sender: ceph-devel-owner@vger.kernel.org
Precedence: bulk
X-Mailing-List: ceph-devel@vger.kernel.org

The string we are going to extract can be large. In the journaling case in
particular, with the default journal object size of 16M, a single object can
contain a 16M encoded string. Introduce a variant of
ceph_extract_encoded_string() that allocates the buffer with ceph_kvmalloc(),
which can fall back to vmalloc for large allocations.

Signed-off-by: Dongsheng Yang
---
 include/linux/ceph/decode.h | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index 450384f..555879f 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -104,8 +104,11 @@ static inline bool ceph_has_room(void **p, void *end, size_t n)
  *   beyond the "end" pointer provided (-ERANGE)
  * - memory could not be allocated for the result (-ENOMEM)
  */
-static inline char *ceph_extract_encoded_string(void **p, void *end,
-						size_t *lenp, gfp_t gfp)
+typedef void * (mem_alloc_t)(size_t size, gfp_t flags);
+extern void *ceph_kvmalloc(size_t size, gfp_t flags);
+static inline char *extract_encoded_string(void **p, void *end,
+					   mem_alloc_t alloc_fn,
+					   size_t *lenp, gfp_t gfp)
 {
 	u32 len;
 	void *sp = *p;
@@ -115,7 +118,7 @@ static inline char *ceph_extract_encoded_string(void **p, void *end,
 	if (!ceph_has_room(&sp, end, len))
 		goto bad;
 
-	buf = kmalloc(len + 1, gfp);
+	buf = alloc_fn(len + 1, gfp);
 	if (!buf)
 		return ERR_PTR(-ENOMEM);
 
@@ -133,6 +136,18 @@ static inline char *ceph_extract_encoded_string(void **p, void *end,
 	return ERR_PTR(-ERANGE);
 }
 
+static inline char *ceph_extract_encoded_string(void **p, void *end,
+						size_t *lenp, gfp_t gfp)
+{
+	return extract_encoded_string(p, end, kmalloc, lenp, gfp);
+}
+
+static inline char *ceph_extract_encoded_string_kvmalloc(void **p, void *end,
+							 size_t *lenp, gfp_t gfp)
+{
+	return extract_encoded_string(p, end, ceph_kvmalloc, lenp, gfp);
+}
+
 /*
  * skip helpers
  */

From patchwork Wed Sep 25 09:07:24 2019
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 11160209
From: Dongsheng Yang
To: idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 02/12] libceph: introduce a new parameter of workqueue in ceph_osdc_watch()
Date: Wed, 25 Sep 2019 09:07:24 +0000
Message-Id: <1569402454-4736-3-git-send-email-dongsheng.yang@easystack.cn>

Currently, if the rbd device and journaling share an osdc, they also share the
osdc's notify_wq to run their watch callbacks. When we close the journal while
holding the rbd device mutex, we need to flush the notify_wq; but we must not
flush the rbd device's own watch callbacks there, because some of them may need
to take the rbd mutex. To solve this, allow users to supply and manage their
own notify workqueue when watching.

Signed-off-by: Dongsheng Yang
---
 drivers/block/rbd.c             | 2 +-
 include/linux/ceph/osd_client.h | 2 ++
 net/ceph/osd_client.c           | 8 +++++++-
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index ce9ea83..f1fc28d 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -4569,7 +4569,7 @@ static int __rbd_register_watch(struct rbd_device *rbd_dev)
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
-				 &rbd_dev->header_oloc, rbd_watch_cb,
+				 &rbd_dev->header_oloc, NULL, rbd_watch_cb,
 				 rbd_watch_errcb, rbd_dev);
 	if (IS_ERR(handle))
 		return PTR_ERR(handle);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index ad7fe5d..9a4533a 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -282,6 +282,7 @@ struct ceph_osd_linger_request {
 	rados_watcherrcb_t errcb;
 	void *data;
 
+	struct workqueue_struct *wq;
 	struct page ***preply_pages;
 	size_t *preply_len;
 };
@@ -539,6 +540,7 @@ struct ceph_osd_linger_request *
 ceph_osdc_watch(struct ceph_osd_client *osdc,
 		struct ceph_object_id *oid,
 		struct ceph_object_locator *oloc,
+		struct workqueue_struct *wq,
 		rados_watchcb2_t wcb,
 		rados_watcherrcb_t errcb,
 		void *data);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 0b2df09..62d2f54 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2842,7 +2842,10 @@ static void lwork_queue(struct linger_work *lwork)
 	lwork->queued_stamp = jiffies;
 	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
-	queue_work(osdc->notify_wq, &lwork->work);
+	if (lreq->wq)
+		queue_work(lreq->wq, &lwork->work);
+	else
+		queue_work(osdc->notify_wq, &lwork->work);
 }
 
 static void do_watch_notify(struct work_struct *w)
@@ -4595,6 +4598,7 @@ struct ceph_osd_linger_request *
 ceph_osdc_watch(struct ceph_osd_client *osdc,
 		struct ceph_object_id *oid,
 		struct ceph_object_locator *oloc,
+		struct workqueue_struct *wq,
 		rados_watchcb2_t wcb,
 		rados_watcherrcb_t errcb,
 		void *data)
@@ -4610,6 +4614,8 @@ struct ceph_osd_linger_request *
 	lreq->wcb = wcb;
 	lreq->errcb = errcb;
 	lreq->data = data;
+	if (wq)
+		lreq->wq = wq;
 	lreq->watch_valid_thru = jiffies;
 
 	ceph_oid_copy(&lreq->t.base_oid, oid);

From patchwork Wed Sep 25 09:07:25 2019
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 11160213
From: Dongsheng Yang
To: idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 03/12] libceph: support op append
Date: Wed, 25 Sep 2019 09:07:25 +0000
Message-Id: <1569402454-4736-4-git-send-email-dongsheng.yang@easystack.cn>
We need to be able to send an append operation in order to support journaling
in the kernel client.

Signed-off-by: Dongsheng Yang
---
 net/ceph/osd_client.c | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 62d2f54..336e1c3 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -378,6 +378,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
+	case CEPH_OSD_OP_APPEND:
 		ceph_osd_data_release(&op->extent.osd_data);
 		break;
 	case CEPH_OSD_OP_CALL:
@@ -694,6 +695,7 @@ static void get_num_data_items(struct ceph_osd_request *req,
 		/* request */
 		case CEPH_OSD_OP_WRITE:
 		case CEPH_OSD_OP_WRITEFULL:
+		case CEPH_OSD_OP_APPEND:
 		case CEPH_OSD_OP_SETXATTR:
 		case CEPH_OSD_OP_CMPXATTR:
 		case CEPH_OSD_OP_NOTIFY_ACK:
@@ -779,13 +781,14 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
-	       opcode != CEPH_OSD_OP_TRUNCATE);
+	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_APPEND);
 
 	op->extent.offset = offset;
 	op->extent.length = length;
 	op->extent.truncate_size = truncate_size;
 	op->extent.truncate_seq = truncate_seq;
-	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
+	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL ||
+	    opcode == CEPH_OSD_OP_APPEND)
 		payload_len += length;
 
 	op->indata_len = payload_len;
@@ -807,7 +810,8 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 	BUG_ON(length > previous);
 
 	op->extent.length = length;
-	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL ||
+	    op->op == CEPH_OSD_OP_APPEND)
 		op->indata_len -= previous - length;
 }
 EXPORT_SYMBOL(osd_req_op_extent_update);
@@ -829,7 +833,8 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 	op->extent.offset += offset_inc;
 	op->extent.length -= offset_inc;
-	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL ||
+	    op->op == CEPH_OSD_OP_APPEND)
 		op->indata_len -= offset_inc;
 }
 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
@@ -969,6 +974,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
+	case CEPH_OSD_OP_APPEND:
 	case CEPH_OSD_OP_ZERO:
 	case CEPH_OSD_OP_TRUNCATE:
 		dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -1062,7 +1068,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
-	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
+	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
+	       opcode != CEPH_OSD_OP_APPEND);
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 				      GFP_NOFS);
@@ -1936,6 +1943,7 @@ static void setup_request_data(struct ceph_osd_request *req)
 		/* request */
 		case CEPH_OSD_OP_WRITE:
 		case CEPH_OSD_OP_WRITEFULL:
+		case CEPH_OSD_OP_APPEND:
 			WARN_ON(op->indata_len != op->extent.length);
 			ceph_osdc_msg_data_add(request_msg,
 					       &op->extent.osd_data);

From patchwork Wed Sep 25 09:07:26 2019
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 11160231
From: Dongsheng Yang
To: idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 04/12] libceph: add prefix and suffix in ceph_osd_req_op.extent
Date: Wed, 25 Sep 2019 09:07:26 +0000
Message-Id: <1569402454-4736-5-git-send-email-dongsheng.yang@easystack.cn>
To support rbd journaling, the append op needs a prefix and a suffix in
ceph_osd_req_op.extent.

Signed-off-by: Dongsheng Yang
---
 include/linux/ceph/osd_client.h | 19 +++++++++++++++++++
 net/ceph/osd_client.c           | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 9a4533a..1a3f8e1 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -97,7 +97,15 @@ struct ceph_osd_req_op {
 			u64 offset, length;
 			u64 truncate_size;
 			u32 truncate_seq;
+			/*
+			 * In the common case an extent op only needs
+			 * one ceph_osd_data, extent.osd_data. But for
+			 * journaling, the append op also needs a
+			 * prefix and a suffix.
+			 */
+			struct ceph_osd_data prefix;
 			struct ceph_osd_data osd_data;
+			struct ceph_osd_data suffix;
 		} extent;
 		struct {
 			u32 name_len;
@@ -442,6 +450,17 @@ void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
 					 unsigned int which,
 					 struct ceph_bvec_iter *bvec_pos);
 
+extern void osd_req_op_extent_prefix_pages(struct ceph_osd_request *,
+					   unsigned int which,
+					   struct page **pages, u64 length,
+					   u32 alignment, bool pages_from_pool,
+					   bool own_pages);
+extern void osd_req_op_extent_suffix_pages(struct ceph_osd_request *,
+					   unsigned int which,
+					   struct page **pages, u64 length,
+					   u32 alignment, bool pages_from_pool,
+					   bool own_pages);
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
 					unsigned int which,
 					struct ceph_pagelist *pagelist);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 336e1c3..296edd7 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -264,6 +264,32 @@ void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
 
+void osd_req_op_extent_prefix_pages(struct ceph_osd_request *osd_req,
+				    unsigned int which, struct page **pages,
+				    u64 length, u32 alignment,
+				    bool pages_from_pool, bool own_pages)
+{
+	struct ceph_osd_data *prefix;
+
+	prefix = osd_req_op_data(osd_req, which, extent, prefix);
+	ceph_osd_data_pages_init(prefix, pages, length, alignment,
+				 pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_extent_prefix_pages);
+
+void osd_req_op_extent_suffix_pages(struct ceph_osd_request *osd_req,
+				    unsigned int which, struct page **pages,
+				    u64 length, u32 alignment,
+				    bool pages_from_pool, bool own_pages)
+{
+	struct ceph_osd_data *suffix;
+
+	suffix = osd_req_op_data(osd_req, which, extent, suffix);
+	ceph_osd_data_pages_init(suffix, pages, length, alignment,
+				 pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_extent_suffix_pages);
+
 static void osd_req_op_cls_request_info_pagelist(
 			struct ceph_osd_request *osd_req,
 			unsigned int which, struct ceph_pagelist *pagelist)
@@ -379,7 +405,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_WRITEFULL:
 	case CEPH_OSD_OP_APPEND:
+		ceph_osd_data_release(&op->extent.prefix);
 		ceph_osd_data_release(&op->extent.osd_data);
+		ceph_osd_data_release(&op->extent.suffix);
 		break;
 	case CEPH_OSD_OP_CALL:
 		ceph_osd_data_release(&op->cls.request_info);
@@ -696,6 +724,8 @@ static void get_num_data_items(struct ceph_osd_request *req,
 		case CEPH_OSD_OP_WRITE:
 		case CEPH_OSD_OP_WRITEFULL:
 		case CEPH_OSD_OP_APPEND:
+			*num_request_data_items += 3;
+			break;
 		case CEPH_OSD_OP_SETXATTR:
 		case CEPH_OSD_OP_CMPXATTR:
 		case CEPH_OSD_OP_NOTIFY_ACK:
@@ -1945,8 +1975,13 @@ static void setup_request_data(struct ceph_osd_request *req)
 		case CEPH_OSD_OP_WRITEFULL:
 		case CEPH_OSD_OP_APPEND:
 			WARN_ON(op->indata_len != op->extent.length);
+			/* extent.prefix and extent.suffix can be NONE */
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->extent.prefix);
 			ceph_osdc_msg_data_add(request_msg,
 					       &op->extent.osd_data);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->extent.suffix);
 			break;
 		case CEPH_OSD_OP_SETXATTR:
 		case CEPH_OSD_OP_CMPXATTR:

From patchwork Wed Sep 25 09:07:27 2019
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 11160215
From: Dongsheng Yang
To: idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 05/12] libceph: introduce cls_journal_client
Date: Wed, 25 Sep 2019 09:07:27 +0000
Message-Id: <1569402454-4736-6-git-send-email-dongsheng.yang@easystack.cn>
This is a cls client module for the journaler.

Signed-off-by: Dongsheng Yang
---
 include/linux/ceph/cls_journal_client.h |  84 +++++
 net/ceph/cls_journal_client.c           | 527 ++++++++++++++++++++++++++++++++
 2 files changed, 611 insertions(+)
 create mode 100644 include/linux/ceph/cls_journal_client.h
 create mode 100644 net/ceph/cls_journal_client.c

diff --git a/include/linux/ceph/cls_journal_client.h b/include/linux/ceph/cls_journal_client.h
new file mode 100644
index 0000000..05e0c42
--- /dev/null
+++ b/include/linux/ceph/cls_journal_client.h
@@ -0,0 +1,84 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
+#define _LINUX_CEPH_CLS_JOURNAL_CLIENT_H
+
+#include
+
+struct ceph_journaler;
+struct ceph_journaler_client;
+
+struct ceph_journaler_object_pos {
+	struct list_head node;
+	u64 object_num;
+	u64 tag_tid;
+	u64 entry_tid;
+	/*
+	 * ->in_using means this object_pos is initialized.
+	 * A stub for it is created in the init step, to
+	 * allocate memory as early as possible.
+	 */
+	bool in_using;
+};
+
+struct ceph_journaler_client {
+	struct list_head node;
+	size_t id_len;
+	char *id;
+	struct list_head object_positions;
+	struct ceph_journaler_object_pos *object_positions_array;
+};
+
+struct ceph_journaler_tag {
+	u64 tid;
+	u64 tag_class;
+};
+
+void destroy_client(struct ceph_journaler_client *client);
+
+int ceph_cls_journal_get_immutable_metas(struct ceph_osd_client *osdc,
+					 struct ceph_object_id *oid,
+					 struct ceph_object_locator *oloc,
+					 u8 *order, u8 *splay_width,
+					 int64_t *pool_id);
+
+int ceph_cls_journal_get_mutable_metas(struct ceph_osd_client *osdc,
+				       struct ceph_object_id *oid,
+				       struct ceph_object_locator *oloc,
+				       u64 *minimum_set, u64 *active_set);
+
+int ceph_cls_journal_client_list(struct ceph_osd_client *osdc,
+				 struct ceph_object_id *oid,
+				 struct ceph_object_locator *oloc,
+				 struct list_head *clients, u8 splay_width);
+
+int ceph_cls_journal_get_next_tag_tid(struct ceph_osd_client *osdc,
+				      struct ceph_object_id *oid,
+				      struct ceph_object_locator *oloc,
+				      u64 *tag_tid);
+
+int ceph_cls_journal_get_tag(struct ceph_osd_client *osdc,
+			     struct ceph_object_id *oid,
+			     struct ceph_object_locator *oloc, u64 tag_tid,
+			     struct ceph_journaler_tag *tag);
+
+int ceph_cls_journal_tag_create(struct ceph_osd_client *osdc,
+				struct ceph_object_id *oid,
+				struct ceph_object_locator *oloc, u64 tag_tid,
+				u64 tag_class, void *buf, u32 buf_len);
+
+int ceph_cls_journal_client_committed(struct ceph_osd_client *osdc,
+				      struct ceph_object_id *oid,
+				      struct ceph_object_locator *oloc,
+				      struct ceph_journaler_client *client,
+				      struct list_head *object_positions);
+
+int ceph_cls_journal_set_active_set(struct ceph_osd_client *osdc,
+				    struct ceph_object_id *oid,
+				    struct ceph_object_locator *oloc,
+				    u64 active_set);
+
+int ceph_cls_journal_set_minimum_set(struct ceph_osd_client *osdc,
+				     struct ceph_object_id *oid,
+				     struct ceph_object_locator *oloc,
+				     u64 minimum_set);
+#endif
b/net/ceph/cls_journal_client.c new file mode 100644 index 0000000..6c8ce48 --- /dev/null +++ b/net/ceph/cls_journal_client.c @@ -0,0 +1,527 @@ +// SPDX-License-Identifier: GPL-2.0 +#include +#include +#include + +#include + +/* max return of client_list to fit in 4KB */ +#define JOURNAL_CLIENT_MAX_RETURN 64 + +/* TODO get all metas in one single request */ +int ceph_cls_journal_get_immutable_metas(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u8 *order, u8 *splay_width, + int64_t *pool_id) +{ + struct page *reply_page; + size_t reply_len = sizeof(*order); + void *p; + int ret; + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + + /* get order */ + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_order", + CEPH_OSD_FLAG_READ, NULL, 0, &reply_page, + &reply_len); + if (ret) + goto out; + p = page_address(reply_page); + ceph_decode_8_safe(&p, p + reply_len, *order, bad); + + /* get splay_width */ + reply_len = sizeof(*splay_width); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_splay_width", + CEPH_OSD_FLAG_READ, NULL, 0, &reply_page, + &reply_len); + if (ret) + goto out; + p = page_address(reply_page); + ceph_decode_8_safe(&p, p + reply_len, *splay_width, bad); + + /* get pool_id */ + reply_len = sizeof(*pool_id); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_pool_id", + CEPH_OSD_FLAG_READ, NULL, 0, &reply_page, + &reply_len); + if (ret) + goto out; + p = page_address(reply_page); + ceph_decode_64_safe(&p, p + reply_len, *pool_id, bad); +out: + __free_page(reply_page); + return ret; +bad: + ret = -EINVAL; + goto out; +} +EXPORT_SYMBOL(ceph_cls_journal_get_immutable_metas); + +/* TODO get all metas in one single request */ +int ceph_cls_journal_get_mutable_metas(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 *minimum_set, u64 *active_set) +{ + struct page *reply_page; + size_t reply_len = sizeof(*minimum_set); + 
void *p; + int ret; + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + + /* get minimum_set */ + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_minimum_set", + CEPH_OSD_FLAG_READ, NULL, 0, &reply_page, + &reply_len); + if (ret) + goto out; + p = page_address(reply_page); + ceph_decode_64_safe(&p, p + reply_len, *minimum_set, bad); + + /* get active_set */ + reply_len = sizeof(active_set); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_active_set", + CEPH_OSD_FLAG_READ, NULL, 0, &reply_page, + &reply_len); + if (ret) + goto out; + p = page_address(reply_page); + ceph_decode_64_safe(&p, p + reply_len, *active_set, bad); +out: + __free_page(reply_page); + return ret; +bad: + ret = -EINVAL; + goto out; +} +EXPORT_SYMBOL(ceph_cls_journal_get_mutable_metas); + +static int decode_object_position(void **p, void *end, + struct ceph_journaler_object_pos *pos) +{ + u8 struct_v; + u32 struct_len; + int ret; + + ret = ceph_start_decoding(p, end, 1, "cls_journal_object_position", + &struct_v, &struct_len); + if (ret) + return ret; + + ceph_decode_64_safe(p, end, pos->object_num, bad); + ceph_decode_64_safe(p, end, pos->tag_tid, bad); + ceph_decode_64_safe(p, end, pos->entry_tid, bad); + return ret; +bad: + return -EINVAL; +} + +void destroy_client(struct ceph_journaler_client *client) +{ + kfree(client->object_positions_array); + kfree(client->id); + + kfree(client); +} + +struct ceph_journaler_client *create_client(u8 splay_width) +{ + struct ceph_journaler_client *client; + struct ceph_journaler_object_pos *pos; + int i; + + client = kzalloc(sizeof(*client), GFP_NOIO); + if (!client) + return NULL; + + client->object_positions_array = + kcalloc(splay_width, sizeof(*pos), GFP_NOIO); + if (!client->object_positions_array) + goto free_client; + + INIT_LIST_HEAD(&client->object_positions); + for (i = 0; i < splay_width; i++) { + pos = &client->object_positions_array[i]; + INIT_LIST_HEAD(&pos->node); + list_add_tail(&pos->node, 
&client->object_positions); + } + INIT_LIST_HEAD(&client->node); + client->id = NULL; + + return client; +free_client: + kfree(client); + return NULL; +} + +static int decode_client(void **p, void *end, + struct ceph_journaler_client *client) +{ + u8 struct_v; + u32 struct_len; + struct ceph_journaler_object_pos *pos; + int ret, num, i = 0; + + ret = ceph_start_decoding(p, end, 1, "cls_journal_get_client_reply", + &struct_v, &struct_len); + if (ret) + return ret; + + client->id = + ceph_extract_encoded_string(p, end, &client->id_len, GFP_NOIO); + if (IS_ERR(client->id)) { + ret = PTR_ERR(client->id); + client->id = NULL; + goto err; + } + + /* skip client->data */ + ceph_decode_skip_string(p, end, bad); + ret = ceph_start_decoding(p, end, 1, + "cls_joural_client_object_set_position", + &struct_v, &struct_len); + if (ret) + goto free_id; + num = ceph_decode_32(p); + list_for_each_entry(pos, &client->object_positions, node) { + if (i < num) { + /* we will use this position stub */ + pos->in_using = true; + ret = decode_object_position(p, end, pos); + if (ret) + goto free_id; + } else { + /* This stub is not used anymore */ + pos->in_using = false; + } + i++; + } + /* skip the state_raw */ + ceph_decode_skip_8(p, end, bad); + return 0; +bad: + ret = -EINVAL; +free_id: + kfree(client->id); +err: + return ret; +} + +static int decode_clients(void **p, void *end, struct list_head *clients, + u8 splay_width) +{ + struct ceph_journaler_client *client, *next; + u32 client_num; + int i = 0; + int ret; + + client_num = ceph_decode_32(p); + if (client_num >= JOURNAL_CLIENT_MAX_RETURN) { + /* + * JOURNAL_CLIENT_MAX_RETURN seems large enough currently. + * TODO: call client_list again for more clients. + */ + return -ERANGE; + } + + /* Reuse the clients already exist in list. */ + list_for_each_entry_safe(client, next, clients, node) { + /* Some clients unregistered. 
*/ + if (i < client_num) { + kfree(client->id); + ret = decode_client(p, end, client); + if (ret) + return ret; + } else { + list_del(&client->node); + destroy_client(client); + } + i++; + } + /* Some more clients registered. */ + for (; i < client_num; i++) { + client = create_client(splay_width); + if (!client) + return -ENOMEM; + ret = decode_client(p, end, client); + if (ret) { + destroy_client(client); + return ret; + } + list_add_tail(&client->node, clients); + } + return 0; +} + +int ceph_cls_journal_client_list(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct list_head *clients, u8 splay_width) +{ + struct page *reply_page; + struct page *req_page; + size_t reply_len = PAGE_SIZE; + int buf_size; + void *p, *end; + int ret; + + buf_size = sizeof(__le32) + sizeof(u64); + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) { + ret = -ENOMEM; + goto free_reply_page; + } + + p = page_address(req_page); + end = p + buf_size; + + /* encode "" */ + ceph_encode_32(&p, 0); + ceph_encode_64(&p, JOURNAL_CLIENT_MAX_RETURN); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_list", + CEPH_OSD_FLAG_READ, req_page, buf_size, + &reply_page, &reply_len); + + if (!ret) { + p = page_address(reply_page); + end = p + reply_len; + + ret = decode_clients(&p, end, clients, splay_width); + } + + __free_page(req_page); +free_reply_page: + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journal_client_list); + +int ceph_cls_journal_get_next_tag_tid(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 *tag_tid) +{ + struct page *reply_page; + size_t reply_len = PAGE_SIZE; + void *p; + int ret; + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_next_tag_tid", + CEPH_OSD_FLAG_READ, NULL, 0, &reply_page, + 
&reply_len); + + if (!ret) { + p = page_address(reply_page); + ceph_decode_64_safe(&p, p + reply_len, *tag_tid, bad); + } +out: + __free_page(reply_page); + return ret; +bad: + ret = -EINVAL; + goto out; +} +EXPORT_SYMBOL(ceph_cls_journal_get_next_tag_tid); + +int ceph_cls_journal_tag_create(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, u64 tag_tid, + u64 tag_class, void *buf, u32 buf_len) +{ + struct page *req_page; + int buf_size; + void *p, *end; + int ret; + + buf_size = buf_len + sizeof(__le32) + sizeof(u64) + sizeof(u64); + if (buf_size > PAGE_SIZE) + return -E2BIG; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + end = p + buf_size; + + ceph_encode_64(&p, tag_tid); + ceph_encode_64(&p, tag_class); + ceph_encode_string(&p, end, buf, buf_len); + + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "tag_create", + CEPH_OSD_FLAG_WRITE, req_page, buf_size, NULL, + NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journal_tag_create); + +static int decode_tag(void **p, void *end, struct ceph_journaler_tag *tag) +{ + u8 struct_v; + u32 struct_len; + int ret; + + ret = ceph_start_decoding(p, end, 1, "cls_journal_tag", &struct_v, + &struct_len); + if (ret) + return ret; + + tag->tid = ceph_decode_64(p); + tag->tag_class = ceph_decode_64(p); + + ceph_decode_skip_string(p, end, bad); + return 0; +bad: + return -EINVAL; +} + +int ceph_cls_journal_get_tag(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, u64 tag_tid, + struct ceph_journaler_tag *tag) +{ + struct page *reply_page; + struct page *req_page; + size_t reply_len = PAGE_SIZE; + int buf_size; + void *p, *end; + int ret; + + buf_size = sizeof(tag_tid); + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) + return -ENOMEM; + req_page = alloc_page(GFP_NOIO); + if (!req_page) { + ret = -ENOMEM; + goto free_reply_page; + } + + p = 
page_address(req_page); + end = p + buf_size; + ceph_encode_64(&p, tag_tid); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "get_tag", + CEPH_OSD_FLAG_READ, req_page, buf_size, + &reply_page, &reply_len); + if (!ret) { + p = page_address(reply_page); + end = p + reply_len; + + ret = decode_tag(&p, end, tag); + } + __free_page(req_page); +free_reply_page: + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journal_get_tag); + +int ceph_cls_journal_client_committed(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct ceph_journaler_client *client, + struct list_head *object_positions) +{ + struct ceph_journaler_object_pos *position; + int object_position_len = CEPH_ENCODING_START_BLK_LEN + 8 + 8 + 8; + struct page *req_page; + int buf_size; + void *p, *end; + int pos_num = 0; + int ret; + + buf_size = 4 + client->id_len + CEPH_ENCODING_START_BLK_LEN + 4; + list_for_each_entry(position, object_positions, node) { + buf_size += object_position_len; + pos_num++; + } + if (buf_size > PAGE_SIZE) + return -E2BIG; + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + p = page_address(req_page); + end = p + buf_size; + ceph_encode_string(&p, end, client->id, client->id_len); + ceph_start_encoding(&p, 1, 1, + buf_size - client->id_len - + CEPH_ENCODING_START_BLK_LEN - 4); + ceph_encode_32(&p, pos_num); + + list_for_each_entry(position, object_positions, node) { + ceph_start_encoding(&p, 1, 1, 24); + ceph_encode_64(&p, position->object_num); + ceph_encode_64(&p, position->tag_tid); + ceph_encode_64(&p, position->entry_tid); + } + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "client_commit", + CEPH_OSD_FLAG_WRITE, req_page, buf_size, NULL, + NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journal_client_committed); + +int ceph_cls_journal_set_minimum_set(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 
minimum_set) +{ + struct page *req_page; + void *p; + int ret; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + ceph_encode_64(&p, minimum_set); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_minimum_set", + CEPH_OSD_FLAG_WRITE, req_page, 8, NULL, NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journal_set_minimum_set); + +int ceph_cls_journal_set_active_set(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 active_set) +{ + struct page *req_page; + void *p; + int ret; + + req_page = alloc_page(GFP_NOIO); + if (!req_page) + return -ENOMEM; + + p = page_address(req_page); + ceph_encode_64(&p, active_set); + ret = ceph_osdc_call(osdc, oid, oloc, "journal", "set_active_set", + CEPH_OSD_FLAG_WRITE, req_page, 8, NULL, NULL); + + __free_page(req_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_journal_set_active_set); From patchwork Wed Sep 25 09:07:28 2019 X-Patchwork-Submitter: Dongsheng Yang X-Patchwork-Id: 11160233 From: Dongsheng Yang To: idryomov@gmail.com, jdillama@redhat.com Cc: ceph-devel@vger.kernel.org, Dongsheng Yang Subject: [PATCH v4 06/12] libceph: introduce generic journaler module Date: Wed, 25 Sep 2019 09:07:28 +0000 Message-Id: <1569402454-4736-7-git-send-email-dongsheng.yang@easystack.cn> In-Reply-To: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn> References: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn> This is a generic journaling module for the ceph client in the linux kernel. It provides three kinds of APIs for clients: (1) open and close: 1.1 ceph_journaler_create, ceph_journaler_destroy 1.2 ceph_journaler_open, ceph_journaler_close (2) replay: ceph_journaler_start_replay(), which should be called after ceph_journaler_open(); it checks the journal and replays any uncommitted entries found in it. (3) appending: 3.1 ceph_journaler_allocate_tag() is used to get a unique tag_tid for this client. Every journal event from this client is tagged with this tag_tid. 3.2 ceph_journaler_append() is used to append a journal entry to the journal. 3.3 ceph_journaler_client_committed() is called when the data has been committed to the data object; it tells the journal that the related journal entries are now safe. 
Signed-off-by: Dongsheng Yang --- include/linux/ceph/journaler.h | 182 ++++ net/ceph/Makefile | 3 +- net/ceph/journaler.c | 2205 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 2389 insertions(+), 1 deletion(-) create mode 100644 include/linux/ceph/journaler.h create mode 100644 net/ceph/journaler.c diff --git a/include/linux/ceph/journaler.h b/include/linux/ceph/journaler.h new file mode 100644 index 0000000..b9d482d --- /dev/null +++ b/include/linux/ceph/journaler.h @@ -0,0 +1,182 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +#ifndef _FS_CEPH_JOURNAL_H +#define _FS_CEPH_JOURNAL_H + +#include +#include + +struct ceph_osd_client; + +#define JOURNAL_HEADER_PREFIX "journal." +#define JOURNAL_OBJECT_PREFIX "journal_data." +#define JOURNALER_EVENT_FIXED_SIZE 33 +#define PREAMBLE 0x3141592653589793 + +struct ceph_journaler_ctx; +typedef void (*ceph_journaler_callback_t)(struct ceph_journaler_ctx *); + +/* + * A ceph_journaler_ctx should be allocated for each journaler append + * op, and the caller needs to set ->callback, which will be called + * when the append of this journal event finishes. + */ +struct ceph_journaler_ctx { + struct list_head node; + struct ceph_bio_iter bio_iter; + size_t bio_len; + + struct page *prefix_page; + unsigned int prefix_offset; + unsigned int prefix_len; + + struct page *suffix_page; + unsigned int suffix_offset; + unsigned int suffix_len; + + int result; + u64 commit_tid; + void *priv; + ceph_journaler_callback_t callback; +}; + +/* tag_tid is used to identify the client. */ +struct ceph_journaler_entry { + struct list_head node; + u64 tag_tid; + u64 entry_tid; + ssize_t data_len; + char *data; +}; + +/* + * ->safe = true means this append op is already written to the osd servers + * ->consistent = true means the previous append op has already finished + * (safe && consistent) means this append has finished, and we can call + * the callback of the upper caller. 
+ * + * ->wait is the next append, which depends on this append; when this + * append finishes, it tells ->wait to become consistent. + */ +struct ceph_journaler_future { + u64 tag_tid; + u64 entry_tid; + u64 commit_tid; + + spinlock_t lock; + bool safe; + bool consistent; + int result; + + struct ceph_journaler_ctx *ctx; + struct journaler_append_ctx *wait; +}; + +/* Each journal object has a recorder used to append events to it. */ +struct object_recorder { + spinlock_t lock; + u8 splay_offset; + u64 inflight_append; + + struct list_head append_list; + struct list_head overflow_list; +}; + +/* Each journal object has a replayer used for replay when opening the journal. */ +struct object_replayer { + u64 object_num; + struct ceph_journaler_object_pos *pos; + struct list_head entry_list; +}; + +struct ceph_journaler { + struct ceph_osd_client *osdc; + struct ceph_object_id header_oid; + struct ceph_object_locator header_oloc; + struct ceph_object_locator data_oloc; + char *object_oid_prefix; + char *client_id; + + /* + * TODO: put these bools into ->flags + * don't need to do another advance if we are already advancing + */ + bool advancing; + /* don't advance when we are flushing */ + bool flushing; + bool overflowed; + bool commit_scheduled; + u8 order; + u8 splay_width; + u8 splay_offset; + u64 active_tag_tid; + u64 prune_tag_tid; + u64 commit_tid; + u64 minimum_set; + u64 active_set; + + struct ceph_journaler_future *prev_future; + struct ceph_journaler_client *client; + struct object_recorder *obj_recorders; + struct object_replayer *obj_replayers; + + struct ceph_journaler_object_pos *obj_pos_pending_array; + struct list_head obj_pos_pending; + struct ceph_journaler_object_pos *obj_pos_committing_array; + struct list_head obj_pos_committing; + + struct mutex meta_lock; + struct mutex commit_lock; + spinlock_t finish_lock; + struct list_head finish_list; + struct list_head clients; + struct list_head clients_cache; + struct list_head entry_tids; + struct rb_root commit_entries; 
+ + struct workqueue_struct *task_wq; + struct workqueue_struct *notify_wq; + struct work_struct flush_work; + struct delayed_work commit_work; + struct work_struct overflow_work; + struct work_struct finish_work; + struct work_struct notify_update_work; + + void *fetch_buf; + int (*handle_entry)(void *entry_handler, + struct ceph_journaler_entry *entry, u64 commit_tid); + void *entry_handler; + struct ceph_osd_linger_request *watch_handle; +}; + +/* generic functions */ +struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc, + struct ceph_object_locator *_oloc, + const char *journal_id, + const char *client_id); +void ceph_journaler_destroy(struct ceph_journaler *journal); + +int ceph_journaler_open(struct ceph_journaler *journal); +void ceph_journaler_close(struct ceph_journaler *journal); + +int ceph_journaler_get_cached_client(struct ceph_journaler *journaler, + char *client_id, + struct ceph_journaler_client **client_result); +/* replaying */ +int ceph_journaler_start_replay(struct ceph_journaler *journaler); + +/* recording */ +static inline u64 +ceph_journaler_get_max_append_size(struct ceph_journaler *journaler) +{ + return (1 << journaler->order) - JOURNALER_EVENT_FIXED_SIZE; +} +struct ceph_journaler_ctx *ceph_journaler_ctx_alloc(void); +void ceph_journaler_ctx_free(struct ceph_journaler_ctx *journaler_ctx); +int ceph_journaler_append(struct ceph_journaler *journaler, u64 tag_tid, + struct ceph_journaler_ctx *ctx); +void ceph_journaler_client_committed(struct ceph_journaler *journaler, + u64 commit_tid); +int ceph_journaler_allocate_tag(struct ceph_journaler *journaler, u64 tag_class, + void *buf, u32 buf_len, + struct ceph_journaler_tag *tag); +#endif diff --git a/net/ceph/Makefile b/net/ceph/Makefile index 59d0ba2..2e672dd 100644 --- a/net/ceph/Makefile +++ b/net/ceph/Makefile @@ -14,4 +14,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \ crypto.o armor.o \ auth_x.o \ ceph_fs.o ceph_strings.o ceph_hash.o \ - 
pagevec.o snapshot.o string_table.o + pagevec.o snapshot.o string_table.o \ + journaler.o cls_journal_client.o diff --git a/net/ceph/journaler.c b/net/ceph/journaler.c new file mode 100644 index 0000000..d6777cb --- /dev/null +++ b/net/ceph/journaler.c @@ -0,0 +1,2205 @@ +// SPDX-License-Identifier: GPL-2.0 + +#include +#include +#include +#include +#include +#include + +#include +#include + +#define JOURNALER_COMMIT_INTERVAL msecs_to_jiffies(5000) +#define JOURNALER_NOTIFY_TIMEOUT 5 /* seconds */ + +/* + * Append got through the following state machine: + * + * JOURNALER_APPEND_START + * | + * v + * ..>. . . JOURNALER_APPEND_SEND . . . . + * . | . + * JOURNALER_APPEND_OVERFLOW | . + * | . + * ^ v v + * .. . . . JOURNALER_APPEND_FLUSH . . >. . + * | (error) + * v . + * JOURNALER_APPEND_SAFE . . < . v + * | + * v + * JOURNALER_APPEND_FINISH + * + * Append starts in JOURNALER_APPEND_START and ends in JOURNALER_APPEND_FINISH + */ +enum journaler_append_state { + JOURNALER_APPEND_START = 1, + JOURNALER_APPEND_SEND, + JOURNALER_APPEND_FLUSH, + JOURNALER_APPEND_OVERFLOW, + JOURNALER_APPEND_SAFE, + JOURNALER_APPEND_FINISH, +}; + +/* + * journaler_append_ctx is an internal structure to represent an append op. 
+ */ +struct journaler_append_ctx { + struct list_head node; + struct ceph_journaler *journaler; + + u8 splay_offset; + u64 object_num; + struct page *req_page; + + struct ceph_journaler_future future; + struct ceph_journaler_entry entry; + struct ceph_journaler_ctx journaler_ctx; + + enum journaler_append_state state; +}; + +struct commit_entry { + struct rb_node r_node; + u64 commit_tid; + u64 object_num; + u64 tag_tid; + u64 entry_tid; + bool committed; +}; + +struct entry_tid { + struct list_head node; + u64 tag_tid; + u64 entry_tid; +}; + +static struct kmem_cache *journaler_commit_entry_cache; +static struct kmem_cache *journaler_append_ctx_cache; + +/* trimming */ +static int ceph_journaler_obj_remove_sync(struct ceph_journaler *journaler, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc) + +{ + struct ceph_osd_client *osdc = journaler->osdc; + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE; + + osd_req_op_init(req, 0, CEPH_OSD_OP_DELETE, 0); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_req; + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + +out_req: + ceph_osdc_put_request(req); + return ret; +} + +static int remove_set(struct ceph_journaler *journaler, u64 object_set) +{ + u64 object_num; + u8 splay_offset; + struct ceph_object_id object_oid; + int ret; + + ceph_oid_init(&object_oid); + for (splay_offset = 0; splay_offset < journaler->splay_width; + splay_offset++) { + object_num = splay_offset + (object_set * journaler->splay_width); + if (!ceph_oid_empty(&object_oid)) { + ceph_oid_destroy(&object_oid); + ceph_oid_init(&object_oid); + } + ret = ceph_oid_aprintf(&object_oid, GFP_NOIO, "%s%llu", + journaler->object_oid_prefix, + object_num); + if (ret) { + 
pr_err("%s: aprintf error : %d", __func__, ret); + goto out; + } + ret = ceph_journaler_obj_remove_sync(journaler, &object_oid, + &journaler->data_oloc); + if (ret < 0 && ret != -ENOENT) { + pr_err("%s: failed to remove object: %llu", __func__, + object_num); + goto out; + } + } + ret = 0; +out: + ceph_oid_destroy(&object_oid); + return ret; +} + +static int set_minimum_set(struct ceph_journaler *journaler, u64 minimum_set) +{ + int ret; + + ret = ceph_cls_journal_set_minimum_set(journaler->osdc, + &journaler->header_oid, + &journaler->header_oloc, + minimum_set); + if (ret < 0) { + pr_err("%s: failed to set_minimum_set: %d", __func__, ret); + return ret; + } + + queue_work(journaler->task_wq, &journaler->notify_update_work); + + return ret; +} + +DEFINE_RB_INSDEL_FUNCS(commit_entry, struct commit_entry, commit_tid, r_node) + +/* replaying */ +static int ceph_journaler_obj_read_sync(struct ceph_journaler *journaler, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + void *buf, u32 read_off, u64 buf_len) + +{ + struct ceph_osd_client *osdc = journaler->osdc; + struct ceph_osd_request *req; + struct page **pages; + int num_pages = calc_pages_for(0, buf_len); + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; + + pages = ceph_alloc_page_vector(num_pages, GFP_NOIO); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto out_req; + } + + osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, + read_off, buf_len, 0, 0); + osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, + true); + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_req; + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + if (ret >= 0) + ceph_copy_from_page_vector(pages, buf, 0, ret); + +out_req: + ceph_osdc_put_request(req); + return ret; +} + 
+static bool entry_is_readable(struct ceph_journaler *journaler, void *buf, + void *end) +{ + /* preamble, version, entry tid, tag id */ + const u32 HEADER_FIXED_SIZE = 25; + u32 remaining = end - buf; + u64 preamble; + u32 data_size; + void *origin_buf = buf; + u32 crc, crc_encoded; + + if (remaining < HEADER_FIXED_SIZE) + return false; + + /* preamble */ + preamble = ceph_decode_64(&buf); + if (PREAMBLE != preamble) + return false; + + buf += (HEADER_FIXED_SIZE - sizeof(preamble)); + remaining = end - buf; + if (remaining < sizeof(u32)) + return false; + /* data_size */ + data_size = ceph_decode_32(&buf); + remaining = end - buf; + if (remaining < data_size) + return false; + buf += data_size; + remaining = end - buf; + if (remaining < sizeof(u32)) + return false; + /* crc */ + crc = crc32c(0, origin_buf, buf - origin_buf); + crc_encoded = ceph_decode_32(&buf); + if (crc != crc_encoded) { + pr_err("%s: crc corrupted.", __func__); + return false; + } + return true; +} + +static int playback_entry(struct ceph_journaler *journaler, + struct ceph_journaler_entry *entry, u64 commit_tid) +{ + BUG_ON(!journaler->handle_entry || !journaler->entry_handler); + + return journaler->handle_entry(journaler->entry_handler, entry, + commit_tid); +} + +/* + * get_last_entry_tid() is called in replaying step, and no appending + * is allowed at that time. So there is no race for journaler->entry_tids + * with get_new_entry_tid(). + * + * And the replaying is single-threaded, that means there is no race + * with get_last_entry_tid() and reserve_entry_tid(). + */ +static bool get_last_entry_tid(struct ceph_journaler *journaler, u64 tag_tid, + u64 *entry_tid) +{ + struct entry_tid *pos; + + list_for_each_entry(pos, &journaler->entry_tids, node) { + if (pos->tag_tid == tag_tid) { + *entry_tid = pos->entry_tid; + return true; + } + } + return false; +} + +/* + * There would not be too many entry_tids here, we need + * only one entry_tid for all entries with same tag_tid. 
+ */ +static struct entry_tid *entry_tid_alloc(struct ceph_journaler *journaler, + u64 tag_tid) +{ + struct entry_tid *entry_tid; + + entry_tid = kzalloc(sizeof(*entry_tid), GFP_NOIO); + if (!entry_tid) { + pr_err("%s: failed to allocate new entry.", __func__); + return NULL; + } + + entry_tid->tag_tid = tag_tid; + entry_tid->entry_tid = 0; + INIT_LIST_HEAD(&entry_tid->node); + + list_add_tail(&entry_tid->node, &journaler->entry_tids); + return entry_tid; +} + +/* + * reserve_entry_tid() is called in replaying. About the race + * for journaler->entry_tids, see comment for get_last_entry_tid() + */ +static int reserve_entry_tid(struct ceph_journaler *journaler, u64 tag_tid, + u64 entry_tid) +{ + struct entry_tid *pos; + + list_for_each_entry(pos, &journaler->entry_tids, node) { + if (pos->tag_tid == tag_tid) { + if (pos->entry_tid < entry_tid) + pos->entry_tid = entry_tid; + return 0; + } + } + + pos = entry_tid_alloc(journaler, tag_tid); + if (!pos) { + pr_err("%s: failed to allocate new entry.", __func__); + return -ENOMEM; + } + pos->entry_tid = entry_tid; + + return 0; +} + +static void journaler_entry_free(struct ceph_journaler_entry *entry) +{ + if (entry->data) + kvfree(entry->data); + kfree(entry); +} + +static struct ceph_journaler_entry *journaler_entry_decode(void **p, void *end) +{ + struct ceph_journaler_entry *entry; + u64 preamble; + u8 version; + u32 crc, crc_encoded; + void *start = *p; + + preamble = ceph_decode_64(p); + if (PREAMBLE != preamble) + return NULL; + + version = ceph_decode_8(p); + if (version != 1) + return NULL; + + entry = kzalloc(sizeof(*entry), GFP_NOIO); + if (!entry) + goto err; + INIT_LIST_HEAD(&entry->node); + entry->entry_tid = ceph_decode_64(p); + entry->tag_tid = ceph_decode_64(p); + /* use kvmalloc to extract the data */ + entry->data = ceph_extract_encoded_string_kvmalloc( + p, end, &entry->data_len, GFP_NOIO); + if (IS_ERR(entry->data)) { + entry->data = NULL; + goto free_entry; + } + + crc = crc32c(0, start, *p - 
start); + crc_encoded = ceph_decode_32(p); + if (crc != crc_encoded) + goto free_entry; + return entry; + +free_entry: + journaler_entry_free(entry); +err: + return NULL; +} + +static int fetch(struct ceph_journaler *journaler, u64 object_num) +{ + struct ceph_object_id object_oid; + void *read_buf, *end; + u64 read_len = 2 << journaler->order; + struct ceph_journaler_object_pos *pos; + struct object_replayer *obj_replayer; + int ret; + + /* + * get the replayer for this splay and set the ->object_num + * of it. + */ + obj_replayer = &journaler->obj_replayers[object_num % journaler->splay_width]; + obj_replayer->object_num = object_num; + /* find the commit position for this object_num. */ + list_for_each_entry(pos, &journaler->client->object_positions, node) { + if (pos->in_using && pos->object_num == object_num) { + /* + * Tell the replayer there is a commit position + * in this object. So delete the entries before + * this position, they are already committed. + */ + obj_replayer->pos = pos; + break; + } + } + /* read the object data */ + ceph_oid_init(&object_oid); + ret = ceph_oid_aprintf(&object_oid, GFP_NOIO, "%s%llu", + journaler->object_oid_prefix, object_num); + if (ret) { + pr_err("%s: failed to initialize object_id : %d", + __func__, ret); + return ret; + } + + read_buf = journaler->fetch_buf; + ret = ceph_journaler_obj_read_sync(journaler, &object_oid, + &journaler->data_oloc, read_buf, 0, + read_len); + if (ret == -ENOENT) { + dout("%s: no such object, %s: %d", __func__, + object_oid.name, ret); + goto err_free_object_oid; + } else if (ret < 0) { + pr_err("%s: failed to read object: %s: %d", + __func__, object_oid.name, ret); + goto err_free_object_oid; + } else if (ret == 0) { + pr_err("%s: no data read out from object: %s: %d", + __func__, object_oid.name, ret); + goto err_free_object_oid; + } + /* decode the entries in this object */ + end = read_buf + ret; + while (read_buf < end) { + struct ceph_journaler_entry *entry; + + if 
(!entry_is_readable(journaler, read_buf, end)) { + pr_err("%s: entry is not readable.", __func__); + ret = -EIO; + goto err_free_object_oid; + } + entry = journaler_entry_decode(&read_buf, end); + if (!entry) + goto err_free_object_oid; + if (entry->tag_tid < journaler->active_tag_tid) + journaler_entry_free(entry); + else + list_add_tail(&entry->node, &obj_replayer->entry_list); + } + ret = 0; + +err_free_object_oid: + ceph_oid_destroy(&object_oid); + return ret; +} + +static int add_commit_entry(struct ceph_journaler *journaler, u64 commit_tid, + u64 object_num, u64 tag_tid, u64 entry_tid) +{ + struct commit_entry *commit_entry; + + commit_entry = kmem_cache_zalloc(journaler_commit_entry_cache, GFP_NOIO); + if (!commit_entry) + return -ENOMEM; + + RB_CLEAR_NODE(&commit_entry->r_node); + + commit_entry->commit_tid = commit_tid; + commit_entry->object_num = object_num; + commit_entry->tag_tid = tag_tid; + commit_entry->entry_tid = entry_tid; + + mutex_lock(&journaler->commit_lock); + insert_commit_entry(&journaler->commit_entries, commit_entry); + mutex_unlock(&journaler->commit_lock); + + return 0; +} + +/* journaler->meta_lock held */ +static u64 __allocate_commit_tid(struct ceph_journaler *journaler) +{ + lockdep_assert_held(&journaler->meta_lock); + return ++journaler->commit_tid; +} + +static u64 allocate_commit_tid(struct ceph_journaler *journaler) +{ + u64 commit_tid; + + mutex_lock(&journaler->meta_lock); + commit_tid = __allocate_commit_tid(journaler); + mutex_unlock(&journaler->meta_lock); + + return commit_tid; +} + +static void prune_tag(struct ceph_journaler *journaler, u64 tag_tid) +{ + struct ceph_journaler_entry *entry, *next; + struct object_replayer *obj_replayer; + int i; + + if (journaler->prune_tag_tid < tag_tid) + journaler->prune_tag_tid = tag_tid; + + for (i = 0; i < journaler->splay_width; i++) { + obj_replayer = &journaler->obj_replayers[i]; + list_for_each_entry_safe(entry, next, + &obj_replayer->entry_list, node) { + if (entry->tag_tid 
== tag_tid) { + list_del(&entry->node); + journaler_entry_free(entry); + } + } + } +} + +static int get_first_entry(struct ceph_journaler *journaler, + struct ceph_journaler_entry **entry, u64 *commit_tid) +{ + struct object_replayer *obj_replayer; + struct ceph_journaler_entry *tmp_entry; + u64 last_entry_tid; + bool expect_first_entry = false; + int ret; + +next: + /* find the current replayer. */ + obj_replayer = &journaler->obj_replayers[journaler->splay_offset]; + if (list_empty(&obj_replayer->entry_list)) { + prune_tag(journaler, journaler->active_tag_tid); + if (journaler->splay_offset == 0) { + ret = -ENOENT; + goto out; + } else { + journaler->splay_offset = 0; + goto next; + } + } + + /* get the first entry in current replayer */ + tmp_entry = list_first_entry(&obj_replayer->entry_list, + struct ceph_journaler_entry, node); + if (expect_first_entry && tmp_entry->entry_tid != 0) { + pr_err("expected the first entry of the next tag, but entry_tid is: %llu.", + tmp_entry->entry_tid); + return -ENOMSG; + } + + if (!journaler->active_tag_tid) { + /* + * There is no active tag tid. This would happen when + * there is no commit in this journal. But + * we have some uncommitted entries. So set the current + * tag_tid to be the active_tag_tid. + */ + journaler->active_tag_tid = tmp_entry->tag_tid; + } else if (tmp_entry->tag_tid < journaler->active_tag_tid || + tmp_entry->tag_tid <= journaler->prune_tag_tid) { + pr_err("detected stale entry: object_num=%llu, tag_tid=%llu, entry_tid=%llu.", + obj_replayer->object_num, tmp_entry->tag_tid, + tmp_entry->entry_tid); + prune_tag(journaler, tmp_entry->tag_tid); + goto next; + } else if (tmp_entry->tag_tid > journaler->active_tag_tid) { + /* + * found a new tag_tid, which means a new client is starting + * to append journal events. 
lets prune the old tag + */ + prune_tag(journaler, journaler->active_tag_tid); + if (tmp_entry->entry_tid == 0) { + /* + * this is the first entry of new tag client, + * advance the active_tag_tid to the new tag_tid. + */ + journaler->active_tag_tid = tmp_entry->tag_tid; + } else { + /* + * each client is appending events from the first + * object (splay_offset: 0). If we found a new tag + * but this is not the first entry (entry_tid: 0), + * let's jump the splay_offset to 0 to get the + * first entry from the new tag client. + */ + journaler->splay_offset = 0; + + /* + * When we jump splay_offset to 0, we expect to get + * the first entry for a new tag. + */ + expect_first_entry = true; + goto next; + } + } + + /* Pop this entry from journal */ + list_del(&tmp_entry->node); + + /* advance the splay_offset */ + journaler->splay_offset = (journaler->splay_offset + 1) % journaler->splay_width; + + /* + * check entry_tid to make sure this entry_tid is after last_entry_tid + * for the same tag. + */ + ret = get_last_entry_tid(journaler, tmp_entry->tag_tid, + &last_entry_tid); + if (ret && tmp_entry->entry_tid != last_entry_tid + 1) { + pr_err("missing prior journal entry, last_entry_tid: %llu", + last_entry_tid); + ret = -ENOMSG; + goto free_entry; + } + + ret = reserve_entry_tid(journaler, tmp_entry->tag_tid, + tmp_entry->entry_tid); + if (ret) + goto free_entry; + + /* allocate commit_tid for this entry */ + *commit_tid = allocate_commit_tid(journaler); + ret = add_commit_entry(journaler, *commit_tid, obj_replayer->object_num, + tmp_entry->tag_tid, tmp_entry->entry_tid); + if (ret) + goto free_entry; + + /* fetch next object if this object is done. 
*/ + if (list_empty(&obj_replayer->entry_list)) { + ret = fetch(journaler, + obj_replayer->object_num + journaler->splay_width); + if (ret && ret != -ENOENT) + goto free_entry; + } + + *entry = tmp_entry; + return 0; + +free_entry: + journaler_entry_free(tmp_entry); +out: + return ret; +} + +static int process_replay(struct ceph_journaler *journaler) +{ + struct ceph_journaler_entry *entry = NULL; + u64 commit_tid = 0; + int r; + +next: + /* + * get the first entry from the journal, walking across the + * different journal objects. + */ + r = get_first_entry(journaler, &entry, &commit_tid); + if (r) { + if (r == -ENOENT) + r = 0; + return r; + } + + BUG_ON(entry == NULL || commit_tid == 0); + r = playback_entry(journaler, entry, commit_tid); + journaler_entry_free(entry); + if (r) + return r; + goto next; +} + +/* reserve entry tid and delete entries before commit position */ +static int preprocess_replay(struct ceph_journaler *journaler) +{ + struct ceph_journaler_entry *entry, *next; + bool found_commit = false; + struct object_replayer *obj_replayer; + int i, ret; + + for (i = 0; i < journaler->splay_width; i++) { + obj_replayer = &journaler->obj_replayers[i]; + /* + * If obj_replayer->pos is NULL, that means + * there is no commit position in this object. + */ + if (!obj_replayer->pos) + continue; + found_commit = false; + list_for_each_entry_safe(entry, next, + &obj_replayer->entry_list, node) { + if (entry->tag_tid == obj_replayer->pos->tag_tid && + entry->entry_tid == obj_replayer->pos->entry_tid) + found_commit = true; + + /* This entry is at or before the commit position, skip it in replaying. */ + ret = reserve_entry_tid(journaler, entry->tag_tid, + entry->entry_tid); + if (ret) + return ret; + list_del(&entry->node); + journaler_entry_free(entry); + + if (found_commit) + break; + } + } + return 0; +} + +/* + * Why do we need to replay? + * + * Because we append to the journal first, before writing the data objects. 
So if + * a crash happened during the last write, there may be some entries in the journal + * whose data is safe but not yet committed to the data + * objects. So when we open the journal again, we need to check the journal + * and play back the uncommitted entries. + * + * Example: + * + * splay_width: 4 + * + * commit positions: + * [[object_number=2, tag_tid=1, entry_tid=10], + * [object_number=1, tag_tid=1, entry_tid=9], + * [object_number=0, tag_tid=1, entry_tid=8], + * [object_number=3, tag_tid=1, entry_tid=7]] + * + * journal entries (number means the entry_tid; * means commit position): + * object 0: |0|4|8*|12|16|20| + * object 1: |1|5|9*|13|17|21| + * object 2: |2|6|10*|14|18| + * object 3: |3|7*|11|15|19| + * + * In this case, we need to replay the entries from 11 to 21. + * + * Step 1: Get the active position and fetch objects. + * "Active position" means the last committed position, that is, the head + * of the commit positions. We need to replay the entries after this position. + * + * "Fetch objects" means the last committed object in each splay slot. We don't need + * to replay entries that are already committed, so let's fetch the journal from the + * last committed objects. + * + * The active position in the above example is [object_number=2, tag_tid=1, entry_tid=10]. + * The fetch objects in the above example are [0, 1, 2, 3]. + * + * Step 2: fetch journal objects. + * Read the data in the "fetch objects" ([0, 1, 2, 3]), decode the entries and put + * them in each replayer->entry_list. + * + * In the above example, the entry_lists would look like this: + * replayer_0->entry_list: 0->4->8*->12->16->20. + * replayer_1->entry_list: 1->5->9*->13->17->21. + * replayer_2->entry_list: 2->6->10*->14->18. + * replayer_3->entry_list: 3->7*->11->15->19. + * + * Step 3: preprocess the journal entries + * Delete entries at or before the commit position, which we don't need to replay + * because they are already committed.
So after preprocessing, the entry_lists + * would be: + * + * replayer_0->entry_list: 12->16->20. + * replayer_1->entry_list: 13->17->21. + * replayer_2->entry_list: 14->18. + * replayer_3->entry_list: 11->15->19. + * + * Step 4: process the journal entries + * Replay the entries one by one, starting from the entry after the last commit position. + * + * As we saw in Step 1, the last committed entry is 10, so the replay will begin + * from 11 and end after the last entry, 21. + */ +int ceph_journaler_start_replay(struct ceph_journaler *journaler) +{ + struct ceph_journaler_object_pos *active_pos; + u64 *fetch_objects; + u64 buf_len = (2 << journaler->order); + u64 object_num; + int i; + int ret = 0; + + if (!journaler->handle_entry || !journaler->entry_handler) { + pr_err("Please initialize the handle_entry and entry_handler"); + return -EINVAL; + } + + fetch_objects = kcalloc(journaler->splay_width, sizeof(u64), GFP_NOIO); + if (!fetch_objects) + return -ENOMEM; + + /* Step 1: Get the active position. */ + mutex_lock(&journaler->meta_lock); + /* + * Get the HEAD of the commit positions, that means + * the last committed object position. + */ + active_pos = list_first_entry(&journaler->client->object_positions, + struct ceph_journaler_object_pos, node); + /* + * When there is no commit position in this journal, + * the active_pos would be empty. So skip getting active + * information when active_pos->in_using is false. + */ + if (active_pos->in_using) { + journaler->splay_offset = (active_pos->object_num + 1) % journaler->splay_width; + journaler->active_tag_tid = active_pos->tag_tid; + + list_for_each_entry(active_pos, + &journaler->client->object_positions, + node) { + if (active_pos->in_using) { + fetch_objects[active_pos->object_num % + journaler->splay_width] = + active_pos->object_num; + } + } + } + mutex_unlock(&journaler->meta_lock); + + /* + * Step 2: fetch journal objects.
+ * fetch_buf will be used to read every journal object + */ + journaler->fetch_buf = ceph_kvmalloc(buf_len, GFP_NOIO); + if (!journaler->fetch_buf) { + pr_err("%s: failed to alloc fetch buf: %llu", + __func__, buf_len); + ret = -ENOMEM; + goto out; + } + + for (i = 0; i < journaler->splay_width; i++) { + if (fetch_objects[i] == 0) { + /* + * No active commit position, so fetch + * them in splay order. + */ + object_num = i; + } else { + object_num = fetch_objects[i]; + } + ret = fetch(journaler, object_num); + if (ret && ret != -ENOENT) + goto free_fetch_buf; + } + + /* Step 3: preprocess the journal entries */ + ret = preprocess_replay(journaler); + if (ret) + goto free_fetch_buf; + + /* Step 4: process the journal entries */ + ret = process_replay(journaler); + +free_fetch_buf: + kvfree(journaler->fetch_buf); +out: + /* cleanup replayers */ + for (i = 0; i < journaler->splay_width; i++) { + struct object_replayer *obj_replayer = + &journaler->obj_replayers[i]; + struct ceph_journaler_entry *entry, *next_entry; + + list_for_each_entry_safe(entry, next_entry, + &obj_replayer->entry_list, node) { + list_del(&entry->node); + journaler_entry_free(entry); + } + } + kfree(fetch_objects); + return ret; +} +EXPORT_SYMBOL(ceph_journaler_start_replay); + +/* recording*/ +/* + * get_new_entry_tid() is called in ceph_journaler_append() with + * journaler->meta_lock held. 
So there is no race to access entry_tids + * between different get_new_entry_tid()s + */ +static int get_new_entry_tid(struct ceph_journaler *journaler, u64 tag_tid, + u64 *entry_tid) +{ + struct entry_tid *pos; + + lockdep_assert_held(&journaler->meta_lock); + list_for_each_entry(pos, &journaler->entry_tids, node) { + if (pos->tag_tid == tag_tid) { + *entry_tid = pos->entry_tid++; + return 0; + } + } + + pos = entry_tid_alloc(journaler, tag_tid); + if (!pos) { + pr_err("%s: failed to allocate new entry.", __func__); + return -ENOMEM; + } + + *entry_tid = pos->entry_tid++; + + return 0; +} + +static u64 get_object(struct ceph_journaler *journaler, u8 splay_offset) +{ + return splay_offset + (journaler->splay_width * journaler->active_set); +} + +static void future_init(struct ceph_journaler_future *future, u64 tag_tid, + u64 entry_tid, u64 commit_tid, + struct ceph_journaler_ctx *journaler_ctx) +{ + future->tag_tid = tag_tid; + future->entry_tid = entry_tid; + future->commit_tid = commit_tid; + + spin_lock_init(&future->lock); + future->safe = false; + future->consistent = false; + future->result = 0; + + future->ctx = journaler_ctx; + future->wait = NULL; +} + +static void set_prev_future(struct ceph_journaler *journaler, + struct journaler_append_ctx *append_ctx) +{ + struct ceph_journaler_future *future = &append_ctx->future; + bool prev_future_finished = false; + + if (journaler->prev_future == NULL) { + prev_future_finished = true; + } else { + spin_lock(&journaler->prev_future->lock); + prev_future_finished = (journaler->prev_future->consistent && + journaler->prev_future->safe); + journaler->prev_future->wait = append_ctx; + spin_unlock(&journaler->prev_future->lock); + } + + spin_lock(&future->lock); + if (prev_future_finished) + future->consistent = true; + spin_unlock(&future->lock); + + journaler->prev_future = future; +} + +static void entry_init(struct ceph_journaler_entry *entry, u64 tag_tid, + u64 entry_tid, struct ceph_journaler_ctx *journaler_ctx) 
+{ + entry->tag_tid = tag_tid; + entry->entry_tid = entry_tid; + entry->data_len = journaler_ctx->bio_len + journaler_ctx->prefix_len + + journaler_ctx->suffix_len; +} + +static void journaler_entry_encode_prefix(struct ceph_journaler_entry *entry, + void **p, void *end) +{ + ceph_encode_64(p, PREAMBLE); + ceph_encode_8(p, 1); /* version */ + ceph_encode_64(p, entry->entry_tid); + ceph_encode_64(p, entry->tag_tid); + + ceph_encode_32(p, entry->data_len); +} + +static u32 crc_bio(u32 crc_in, struct ceph_bio_iter *bio_iter, u64 length) +{ + struct ceph_bio_iter it = *bio_iter; + char *buf; + u64 len; + u32 crc = crc_in; + + ceph_bio_iter_advance_step(&it, length, ({ + buf = page_address(bv.bv_page) + bv.bv_offset; + len = min_t(u64, length, bv.bv_len); + crc = crc32c(crc, buf, len); + })); + + return crc; +} + +static void journaler_handle_append(struct journaler_append_ctx *ctx, int ret); +static void future_consistent(struct journaler_append_ctx *append_ctx, + int result) +{ + struct ceph_journaler_future *future = &append_ctx->future; + bool future_finished = false; + + spin_lock(&future->lock); + if (!future->result) + future->result = result; + future->consistent = true; + future_finished = (future->safe && future->consistent); + spin_unlock(&future->lock); + + if (future_finished) { + append_ctx->state = JOURNALER_APPEND_FINISH; + journaler_handle_append(append_ctx, 0); + } +} + +static void future_finish(struct journaler_append_ctx *append_ctx) +{ + struct ceph_journaler *journaler = append_ctx->journaler; + struct ceph_journaler_ctx *journaler_ctx = &append_ctx->journaler_ctx; + struct ceph_journaler_future *future = &append_ctx->future; + + mutex_lock(&journaler->meta_lock); + if (journaler->prev_future == future) + journaler->prev_future = NULL; + mutex_unlock(&journaler->meta_lock); + + spin_lock(&journaler->finish_lock); + if (journaler_ctx->result == 0) + journaler_ctx->result = future->result; + list_add_tail(&append_ctx->node, &journaler->finish_list); 
+ spin_unlock(&journaler->finish_lock); + + queue_work(journaler->task_wq, &journaler->finish_work); +} + +static void journaler_append_finish(struct work_struct *work) +{ + struct ceph_journaler *journaler = + container_of(work, struct ceph_journaler, finish_work); + struct journaler_append_ctx *ctx_pos, *next; + LIST_HEAD(tmp_list); + + spin_lock(&journaler->finish_lock); + list_splice_init(&journaler->finish_list, &tmp_list); + spin_unlock(&journaler->finish_lock); + + list_for_each_entry_safe(ctx_pos, next, &tmp_list, node) { + list_del(&ctx_pos->node); + if (ctx_pos->future.wait) + future_consistent(ctx_pos->future.wait, + ctx_pos->future.result); + ctx_pos->journaler_ctx.callback(&ctx_pos->journaler_ctx); + } +} + +static void journaler_notify_update(struct work_struct *work) +{ + struct ceph_journaler *journaler = + container_of(work, struct ceph_journaler, notify_update_work); + int ret = 0; + + ret = ceph_osdc_notify(journaler->osdc, &journaler->header_oid, + &journaler->header_oloc, NULL, 0, + JOURNALER_NOTIFY_TIMEOUT, NULL, NULL); + if (ret) + pr_err("%s: notify_update failed: %d", __func__, ret); +} + +/* + * advance the active_set to (active_set + 1). This function + * will call ceph_cls_journal_set_active_set to update journal + * metadata and notify all clients about this event. We don't + * update journaler->active_set in memory currently. + * + * The journaler->active_set will be updated in refresh() when + * we get the notification. 
+ */ +static void advance_object_set(struct ceph_journaler *journaler) +{ + struct object_recorder *obj_recorder; + u64 active_set; + int i, ret; + + mutex_lock(&journaler->meta_lock); + if (journaler->advancing || journaler->flushing) { + mutex_unlock(&journaler->meta_lock); + return; + } + mutex_unlock(&journaler->meta_lock); + + /* make sure all inflight appending finish */ + for (i = 0; i < journaler->splay_width; i++) { + obj_recorder = &journaler->obj_recorders[i]; + spin_lock(&obj_recorder->lock); + if (obj_recorder->inflight_append) { + spin_unlock(&obj_recorder->lock); + return; + } + spin_unlock(&obj_recorder->lock); + } + + mutex_lock(&journaler->meta_lock); + journaler->advancing = true; + + active_set = journaler->active_set + 1; + mutex_unlock(&journaler->meta_lock); + + ret = ceph_cls_journal_set_active_set(journaler->osdc, + &journaler->header_oid, + &journaler->header_oloc, + active_set); + if (ret) + pr_err("%s: error in set active_set: %d", __func__, ret); + + queue_work(journaler->task_wq, &journaler->notify_update_work); +} + +static void journaler_overflow(struct work_struct *work) +{ + struct ceph_journaler *journaler = + container_of(work, struct ceph_journaler, overflow_work); + if (!journaler->overflowed) + return; + + advance_object_set(journaler); +} + +static void journaler_append_callback(struct ceph_osd_request *osd_req) +{ + struct journaler_append_ctx *ctx = osd_req->r_priv; + int ret = osd_req->r_result; + + if (ret) + pr_debug("ret of journaler_append_callback: %d", ret); + + __free_page(ctx->req_page); + ceph_osdc_put_request(osd_req); + + journaler_handle_append(ctx, ret); +} + +static int append(struct ceph_journaler *journaler, struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct journaler_append_ctx *ctx) + +{ + struct ceph_osd_client *osdc = journaler->osdc; + struct ceph_osd_request *req; + void *p; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 2, false, GFP_NOIO); + if (!req) + return 
-ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE; + req->r_callback = journaler_append_callback; + req->r_priv = ctx; + + /* + * guard_append + * TODO: to make the prefix, suffix and guard_append to share page. + */ + ctx->req_page = alloc_page(GFP_NOIO); + if (!ctx->req_page) { + ret = -ENOMEM; + goto out_req; + } + p = page_address(ctx->req_page); + ceph_encode_64(&p, 1 << journaler->order); + ret = osd_req_op_cls_init(req, 0, "journal", "guard_append"); + if (ret) + goto out_free_page; + osd_req_op_cls_request_data_pages(req, 0, &ctx->req_page, 8, 0, false, + false); + + /* append_data */ + osd_req_op_extent_init(req, 1, CEPH_OSD_OP_APPEND, 0, + ctx->journaler_ctx.prefix_len + + ctx->journaler_ctx.bio_len + + ctx->journaler_ctx.suffix_len, + 0, 0); + + if (ctx->journaler_ctx.prefix_len) + osd_req_op_extent_prefix_pages(req, 1, + &ctx->journaler_ctx.prefix_page, + ctx->journaler_ctx.prefix_len, + ctx->journaler_ctx.prefix_offset, + false, false); + + if (ctx->journaler_ctx.bio_len) + osd_req_op_extent_osd_data_bio(req, 1, + &ctx->journaler_ctx.bio_iter, + ctx->journaler_ctx.bio_len); + + if (ctx->journaler_ctx.suffix_len) + osd_req_op_extent_suffix_pages(req, 1, + &ctx->journaler_ctx.suffix_page, + ctx->journaler_ctx.suffix_len, + ctx->journaler_ctx.suffix_offset, + false, false); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_free_page; + + ceph_osdc_start_request(osdc, req, false); + return 0; + +out_free_page: + __free_page(ctx->req_page); +out_req: + ceph_osdc_put_request(req); + return ret; +} + +static int send_append_request(struct ceph_journaler *journaler, u64 object_num, + struct journaler_append_ctx *ctx) +{ + struct ceph_object_id object_oid; + int ret = 0; + + ceph_oid_init(&object_oid); + ret = ceph_oid_aprintf(&object_oid, GFP_NOIO, "%s%llu", + journaler->object_oid_prefix, object_num); + if (ret) { + pr_err("%s: failed to initialize object id: 
%d", __func__, ret); + goto out; + } + + ret = append(journaler, &object_oid, &journaler->data_oloc, ctx); +out: + ceph_oid_destroy(&object_oid); + return ret; +} + +static void journaler_flush(struct work_struct *work) +{ + struct ceph_journaler *journaler = + container_of(work, struct ceph_journaler, flush_work); + int i; + struct object_recorder *obj_recorder; + struct journaler_append_ctx *ctx, *next_ctx; + LIST_HEAD(tmp); + + mutex_lock(&journaler->meta_lock); + if (journaler->overflowed) { + mutex_unlock(&journaler->meta_lock); + return; + } + + journaler->flushing = true; + mutex_unlock(&journaler->meta_lock); + + for (i = 0; i < journaler->splay_width; i++) { + INIT_LIST_HEAD(&tmp); + obj_recorder = &journaler->obj_recorders[i]; + + spin_lock(&obj_recorder->lock); + list_splice_tail_init(&obj_recorder->overflow_list, &tmp); + list_splice_tail_init(&obj_recorder->append_list, &tmp); + spin_unlock(&obj_recorder->lock); + + list_for_each_entry_safe(ctx, next_ctx, &tmp, node) { + list_del(&ctx->node); + ctx->object_num = get_object( + journaler, obj_recorder->splay_offset); + journaler_handle_append(ctx, 0); + } + } + + mutex_lock(&journaler->meta_lock); + journaler->flushing = false; + mutex_unlock(&journaler->meta_lock); + /* + * Since we don't advance the object set while flushing, queue another + * overflow_work after flushing finishes, in case the journaler has overflowed.
+ */ + queue_work(journaler->task_wq, &journaler->overflow_work); +} + +static void +ceph_journaler_object_append(struct ceph_journaler *journaler, + struct journaler_append_ctx *append_ctx) +{ + void *buf, *end; + u32 crc = 0; + struct ceph_journaler_ctx *journaler_ctx = &append_ctx->journaler_ctx; + struct ceph_bio_iter *bio_iter = &journaler_ctx->bio_iter; + struct object_recorder *obj_recorder; + + /* + * PREAMBLE(8) + version(1) + entry_tid(8) + * + tag_tid(8) + string_len(4) = 29 + */ + journaler_ctx->prefix_offset -= 29; + journaler_ctx->prefix_len += 29; + buf = page_address(journaler_ctx->prefix_page) + + journaler_ctx->prefix_offset; + end = buf + 29; + journaler_entry_encode_prefix(&append_ctx->entry, &buf, end); + + /* size of crc is 4 */ + buf = page_address(journaler_ctx->suffix_page) + journaler_ctx->suffix_len; + end = buf + 4; + crc = crc32c(crc, + page_address(journaler_ctx->prefix_page) + + journaler_ctx->prefix_offset, + journaler_ctx->prefix_len); + if (journaler_ctx->bio_len) + crc = crc_bio(crc, bio_iter, journaler_ctx->bio_len); + crc = crc32c(crc, + page_address(journaler_ctx->suffix_page), + journaler_ctx->suffix_len); + ceph_encode_32(&buf, crc); + journaler_ctx->suffix_len += 4; + obj_recorder = &journaler->obj_recorders[append_ctx->splay_offset]; + + spin_lock(&obj_recorder->lock); + list_add_tail(&append_ctx->node, &obj_recorder->append_list); + queue_work(journaler->task_wq, &journaler->flush_work); + spin_unlock(&obj_recorder->lock); +} + +static void journaler_handle_append(struct journaler_append_ctx *ctx, int ret) +{ + struct ceph_journaler *journaler = ctx->journaler; + struct object_recorder *obj_recorder = + &journaler->obj_recorders[ctx->splay_offset]; + +again: + switch (ctx->state) { + case JOURNALER_APPEND_START: + ctx->state = JOURNALER_APPEND_SEND; + ceph_journaler_object_append(journaler, ctx); + break; + case JOURNALER_APPEND_SEND: + ctx->state = JOURNALER_APPEND_FLUSH; + spin_lock(&obj_recorder->lock); +
obj_recorder->inflight_append++; + spin_unlock(&obj_recorder->lock); + ret = send_append_request(journaler, ctx->object_num, ctx); + if (ret) { + pr_err("failed to send append request: %d", ret); + ctx->state = JOURNALER_APPEND_SAFE; + goto again; + } + break; + case JOURNALER_APPEND_FLUSH: + if (ret == -EOVERFLOW) { + mutex_lock(&journaler->meta_lock); + journaler->overflowed = true; + mutex_unlock(&journaler->meta_lock); + + spin_lock(&obj_recorder->lock); + ctx->state = JOURNALER_APPEND_OVERFLOW; + list_add_tail(&ctx->node, &obj_recorder->overflow_list); + if (--obj_recorder->inflight_append == 0) + queue_work(journaler->task_wq, + &journaler->overflow_work); + spin_unlock(&obj_recorder->lock); + break; + } + + spin_lock(&obj_recorder->lock); + if (--obj_recorder->inflight_append == 0) + queue_work(journaler->task_wq, + &journaler->overflow_work); + spin_unlock(&obj_recorder->lock); + + if (ret) { + /* + * If ret is not 0, we need to tell the + * upper caller the result. + */ + ctx->state = JOURNALER_APPEND_SAFE; + goto again; + } + + ret = add_commit_entry(journaler, ctx->future.commit_tid, + ctx->object_num, ctx->future.tag_tid, + ctx->future.entry_tid); + if (ret) { + pr_err("%s: failed to add_commit_entry: %d", + __func__, ret); + ctx->state = JOURNALER_APPEND_SAFE; + ret = -ENOMEM; + goto again; + } + ctx->state = JOURNALER_APPEND_SAFE; + goto again; + case JOURNALER_APPEND_OVERFLOW: + ctx->state = JOURNALER_APPEND_SEND; + goto again; + case JOURNALER_APPEND_SAFE: + spin_lock(&ctx->future.lock); + ctx->future.safe = true; + if (!ctx->future.result) + ctx->future.result = ret; + if (ctx->future.consistent) { + spin_unlock(&ctx->future.lock); + ctx->state = JOURNALER_APPEND_FINISH; + goto again; + } + spin_unlock(&ctx->future.lock); + break; + case JOURNALER_APPEND_FINISH: + future_finish(ctx); + break; + default: + BUG(); + } +} + +/* journaler_append_ctx alloc and release */ +struct journaler_append_ctx *journaler_append_ctx_alloc(void) +{ + struct 
journaler_append_ctx *append_ctx; + struct ceph_journaler_ctx *journaler_ctx; + + append_ctx = kmem_cache_zalloc(journaler_append_ctx_cache, GFP_NOIO); + if (!append_ctx) + return NULL; + + journaler_ctx = &append_ctx->journaler_ctx; + journaler_ctx->prefix_page = alloc_page(GFP_NOIO); + if (!journaler_ctx->prefix_page) + goto free_journaler_ctx; + + journaler_ctx->suffix_page = alloc_page(GFP_NOIO); + if (!journaler_ctx->suffix_page) + goto free_prefix_page; + + INIT_LIST_HEAD(&journaler_ctx->node); + INIT_LIST_HEAD(&append_ctx->node); + return append_ctx; + +free_prefix_page: + __free_page(journaler_ctx->prefix_page); +free_journaler_ctx: + kmem_cache_free(journaler_append_ctx_cache, append_ctx); + return NULL; +} + +struct ceph_journaler_ctx *ceph_journaler_ctx_alloc(void) +{ + struct journaler_append_ctx *append_ctx; + + append_ctx = journaler_append_ctx_alloc(); + if (!append_ctx) + return NULL; + + return &append_ctx->journaler_ctx; +} +EXPORT_SYMBOL(ceph_journaler_ctx_alloc); + +static void journaler_append_ctx_free(struct journaler_append_ctx *append_ctx) +{ + struct ceph_journaler_ctx *journaler_ctx; + + journaler_ctx = &append_ctx->journaler_ctx; + __free_page(journaler_ctx->prefix_page); + __free_page(journaler_ctx->suffix_page); + kmem_cache_free(journaler_append_ctx_cache, append_ctx); +} + +void ceph_journaler_ctx_free(struct ceph_journaler_ctx *journaler_ctx) +{ + struct journaler_append_ctx *append_ctx; + + append_ctx = container_of(journaler_ctx, struct journaler_append_ctx, + journaler_ctx); + journaler_append_ctx_free(append_ctx); +} +EXPORT_SYMBOL(ceph_journaler_ctx_free); + +int ceph_journaler_append(struct ceph_journaler *journaler, u64 tag_tid, + struct ceph_journaler_ctx *journaler_ctx) +{ + u64 entry_tid; + struct journaler_append_ctx *append_ctx; + int ret; + + append_ctx = container_of(journaler_ctx, struct journaler_append_ctx, + journaler_ctx); + + append_ctx->journaler = journaler; + mutex_lock(&journaler->meta_lock); + /* + * get 
entry_tid for this event. (tag_tid, entry_tid) is + * the unique id for every journal event. + */ + ret = get_new_entry_tid(journaler, tag_tid, &entry_tid); + if (ret) { + mutex_unlock(&journaler->meta_lock); + return ret; + } + + /* calculate the object_num for this entry. */ + append_ctx->splay_offset = entry_tid % journaler->splay_width; + append_ctx->object_num = + get_object(journaler, append_ctx->splay_offset); + + /* + * allocate a commit_tid for this event; when the data is committed + * to the data objects, ceph_journaler_client_committed() will use + * the commit_tid to decide how to update the journal commit position. + */ + journaler_ctx->commit_tid = __allocate_commit_tid(journaler); + entry_init(&append_ctx->entry, tag_tid, entry_tid, journaler_ctx); + + /* + * To make sure journal entries are consistent, we use a future + * to track each one. Every journal entry depends on the previous + * entry: only when the previous entry is finished can the current + * entry become consistent, and only then can we finish the current entry.
+ */ + future_init(&append_ctx->future, tag_tid, entry_tid, + journaler_ctx->commit_tid, journaler_ctx); + set_prev_future(journaler, append_ctx); + mutex_unlock(&journaler->meta_lock); + + append_ctx->state = JOURNALER_APPEND_START; + journaler_handle_append(append_ctx, 0); + return 0; +} +EXPORT_SYMBOL(ceph_journaler_append); + +static void copy_object_pos(struct ceph_journaler_object_pos *src_pos, + struct ceph_journaler_object_pos *dst_pos) +{ + dst_pos->object_num = src_pos->object_num; + dst_pos->tag_tid = src_pos->tag_tid; + dst_pos->entry_tid = src_pos->entry_tid; +} + +static void copy_pos_list(struct list_head *src_list, + struct list_head *dst_list) +{ + struct ceph_journaler_object_pos *src_pos, *dst_pos; + + src_pos = list_first_entry(src_list, struct ceph_journaler_object_pos, + node); + dst_pos = list_first_entry(dst_list, struct ceph_journaler_object_pos, + node); + while (&src_pos->node != src_list && &dst_pos->node != dst_list) { + copy_object_pos(src_pos, dst_pos); + src_pos = list_next_entry(src_pos, node); + dst_pos = list_next_entry(dst_pos, node); + } +} + +static void journaler_client_commit(struct work_struct *work) +{ + struct ceph_journaler *journaler = container_of( + to_delayed_work(work), struct ceph_journaler, commit_work); + int ret; + + mutex_lock(&journaler->commit_lock); + copy_pos_list(&journaler->obj_pos_pending, + &journaler->obj_pos_committing); + mutex_unlock(&journaler->commit_lock); + + ret = ceph_cls_journal_client_committed(journaler->osdc, + &journaler->header_oid, + &journaler->header_oloc, + journaler->client, + &journaler->obj_pos_committing); + + if (ret) + pr_err("%s: error in client committed: %d", __func__, ret); + + queue_work(journaler->task_wq, &journaler->notify_update_work); + + mutex_lock(&journaler->commit_lock); + journaler->commit_scheduled = false; + mutex_unlock(&journaler->commit_lock); +} + +static void add_object_position(struct commit_entry *entry, + struct list_head *object_positions, + u64 
splay_width) +{ + struct ceph_journaler_object_pos *position; + u8 splay_offset = entry->object_num % splay_width; + bool found = false; + + list_for_each_entry(position, object_positions, node) { + if (position->in_using == false || + (splay_offset == position->object_num % splay_width)) { + found = true; + break; + } + } + + BUG_ON(!found); + if (position->in_using == false) + position->in_using = true; + position->object_num = entry->object_num; + position->tag_tid = entry->tag_tid; + position->entry_tid = entry->entry_tid; + list_move(&position->node, object_positions); +} + +void ceph_journaler_client_committed(struct ceph_journaler *journaler, + u64 commit_tid) +{ + struct commit_entry *commit_entry; + bool update_client_commit = true; + struct rb_node *n; + + mutex_lock(&journaler->commit_lock); + /* search commit entries in commit_tid order. */ + for (n = rb_first(&journaler->commit_entries); n; n = rb_next(n)) { + commit_entry = rb_entry(n, struct commit_entry, r_node); + /* mark the current commit entry as committed. */ + if (commit_entry->commit_tid == commit_tid) { + commit_entry->committed = true; + break; + } + /* + * If any entry before commit_tid is not yet committed, + * we must not update the client commit position. + */ + if (commit_entry->committed == false) + update_client_commit = false; + } + + /* + * Update the client commit position only when all commit entries + * before this commit_tid are committed.
+ */ + if (update_client_commit) { + for (n = rb_first(&journaler->commit_entries); n;) { + commit_entry = rb_entry(n, struct commit_entry, r_node); + n = rb_next(n); + + if (commit_entry->commit_tid > commit_tid) + break; + add_object_position(commit_entry, + &journaler->obj_pos_pending, + journaler->splay_width); + erase_commit_entry(&journaler->commit_entries, + commit_entry); + kmem_cache_free(journaler_commit_entry_cache, + commit_entry); + } + } + + /* + * schedule commit_work to call ceph_cls_journal_client_committed() + * after JOURNALER_COMMIT_INTERVAL + */ + if (update_client_commit && !journaler->commit_scheduled) { + queue_delayed_work(journaler->task_wq, &journaler->commit_work, + JOURNALER_COMMIT_INTERVAL); + journaler->commit_scheduled = true; + } + mutex_unlock(&journaler->commit_lock); +} +EXPORT_SYMBOL(ceph_journaler_client_committed); + +/* + * A client needs to allocate a unique tag for itself; every + * journal entry from this client will then be tagged with its tag. + */ +int ceph_journaler_allocate_tag(struct ceph_journaler *journaler, u64 tag_class, + void *buf, u32 buf_len, + struct ceph_journaler_tag *tag) +{ + u64 tag_tid; + int ret; + +retry: + ret = ceph_cls_journal_get_next_tag_tid(journaler->osdc, + &journaler->header_oid, + &journaler->header_oloc, + &tag_tid); + if (ret) + goto out; + + ret = ceph_cls_journal_tag_create(journaler->osdc, + &journaler->header_oid, + &journaler->header_oloc, tag_tid, + tag_class, buf, buf_len); + if (ret < 0) { + if (ret == -ESTALE) + goto retry; + else + goto out; + } + + ret = ceph_cls_journal_get_tag(journaler->osdc, &journaler->header_oid, + &journaler->header_oloc, tag_tid, tag); + if (ret) + goto out; + +out: + return ret; +} +EXPORT_SYMBOL(ceph_journaler_allocate_tag); + +int ceph_journaler_get_cached_client( + struct ceph_journaler *journaler, char *client_id, + struct ceph_journaler_client **client_result) +{ + struct ceph_journaler_client *client; + int ret = -ENOENT; + +
list_for_each_entry(client, &journaler->clients, node) { + if (!strcmp(client->id, client_id)) { + *client_result = client; + ret = 0; + break; + } + } + + return ret; +} +EXPORT_SYMBOL(ceph_journaler_get_cached_client); + +static int refresh(struct ceph_journaler *journaler, bool init) +{ + struct ceph_journaler_client *client; + u64 minimum_commit_set; + u64 minimum_set; + u64 active_set; + bool need_advance = false; + LIST_HEAD(tmp_clients); + int ret; + + INIT_LIST_HEAD(&tmp_clients); + ret = ceph_cls_journal_get_mutable_metas(journaler->osdc, + &journaler->header_oid, + &journaler->header_oloc, + &minimum_set, &active_set); + if (ret) + return ret; + + ret = ceph_cls_journal_client_list(journaler->osdc, + &journaler->header_oid, + &journaler->header_oloc, + &journaler->clients_cache, + journaler->splay_width); + if (ret) + return ret; + + mutex_lock(&journaler->meta_lock); + if (!init) { + /* check whether the active_set advanced. */ + need_advance = active_set > journaler->active_set; + if (need_advance) { + journaler->overflowed = false; + journaler->advancing = false; + } + } + + journaler->active_set = active_set; + journaler->minimum_set = minimum_set; + /* + * Swap clients with clients_cache. Clients on the clients_cache list are + * not released, so we can reuse them in the next refresh() and avoid + * calling malloc() and free() too frequently. + */ + list_splice_tail_init(&journaler->clients, &tmp_clients); + list_splice_tail_init(&journaler->clients_cache, &journaler->clients); + list_splice_tail_init(&tmp_clients, &journaler->clients_cache); + + /* + * calculate the minimum_commit_set. + * TODO: unregister clients if the commit position lags too far behind the + * active positions, similar to rbd_journal_max_concurrent_object_sets + * in the user-space journal.
+ */ + minimum_commit_set = journaler->active_set; + list_for_each_entry(client, &journaler->clients, node) { + struct ceph_journaler_object_pos *pos; + + list_for_each_entry(pos, &client->object_positions, node) { + u64 object_set = + pos->object_num / journaler->splay_width; + if (object_set < minimum_commit_set) + minimum_commit_set = object_set; + } + + if (!strcmp(client->id, journaler->client_id)) + journaler->client = client; + } + mutex_unlock(&journaler->meta_lock); + + /* + * At this time, the active_set is actually advanced, + * we can flush now. + */ + if (need_advance) + queue_work(journaler->task_wq, &journaler->flush_work); + + /* + * remove set if necessary + */ + if (minimum_commit_set > minimum_set) { + u64 trim_set = minimum_set; + while (trim_set < minimum_commit_set) { + ret = remove_set(journaler, trim_set); + if (ret < 0 && ret != -ENOENT) { + pr_err("failed to trim object_set: %llu", + trim_set); + return ret; + } + trim_set++; + } + + ret = set_minimum_set(journaler, minimum_commit_set); + if (ret < 0) { + pr_err("failed to set minimum set to %llu", + minimum_commit_set); + return ret; + } + } + + return 0; +} + +static void journaler_watch_cb(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len) +{ + struct ceph_journaler *journaler = arg; + int ret; + + ret = refresh(journaler, false); + if (ret < 0) + pr_err("%s: failed to refresh journaler: %d", __func__, ret); + + ret = ceph_osdc_notify_ack(journaler->osdc, &journaler->header_oid, + &journaler->header_oloc, notify_id, cookie, + NULL, 0); + if (ret) + pr_err("%s: acknowledge_notify failed: %d", __func__, ret); +} + +static void journaler_watch_errcb(void *arg, u64 cookie, int err) +{ + /* TODO re-watch in watch error. 
*/ + pr_err("%s: journaler watch error: %d", __func__, err); +} + +static int journaler_watch(struct ceph_journaler *journaler) +{ + struct ceph_osd_client *osdc = journaler->osdc; + struct ceph_osd_linger_request *handle; + + handle = ceph_osdc_watch(osdc, &journaler->header_oid, + &journaler->header_oloc, journaler->notify_wq, + journaler_watch_cb, journaler_watch_errcb, + journaler); + if (IS_ERR(handle)) + return PTR_ERR(handle); + + journaler->watch_handle = handle; + return 0; +} + +static void journaler_unwatch(struct ceph_journaler *journaler) +{ + struct ceph_osd_client *osdc = journaler->osdc; + int ret; + + ret = ceph_osdc_unwatch(osdc, journaler->watch_handle); + if (ret) + pr_err("%s: failed to unwatch: %d", __func__, ret); + + journaler->watch_handle = NULL; +} + +int ceph_journaler_open(struct ceph_journaler *journaler) +{ + struct ceph_journaler_client *client, *next_client; + u8 order, splay_width; + int64_t pool_id; + int i, ret; + + ret = ceph_cls_journal_get_immutable_metas( + journaler->osdc, &journaler->header_oid, + &journaler->header_oloc, &order, &splay_width, &pool_id); + if (ret) { + pr_err("failed to get immutable metas."); + goto out; + } + + mutex_lock(&journaler->meta_lock); + /* set the immutable metas. */ + journaler->order = order; + journaler->splay_width = splay_width; + + if (pool_id == -1 || pool_id == journaler->header_oloc.pool) + ceph_oloc_copy(&journaler->data_oloc, &journaler->header_oloc); + else + journaler->data_oloc.pool = pool_id; + + /* initialize ->obj_recorders and ->obj_replayers. 
*/
+	journaler->obj_recorders = kcalloc(journaler->splay_width,
+					   sizeof(struct object_recorder),
+					   GFP_NOIO);
+	if (!journaler->obj_recorders) {
+		ret = -ENOMEM;
+		mutex_unlock(&journaler->meta_lock);
+		goto out;
+	}
+
+	journaler->obj_replayers = kcalloc(journaler->splay_width,
+					   sizeof(struct object_replayer),
+					   GFP_NOIO);
+	if (!journaler->obj_replayers) {
+		ret = -ENOMEM;
+		mutex_unlock(&journaler->meta_lock);
+		goto free_recorders;
+	}
+
+	journaler->obj_pos_pending_array =
+		kcalloc(journaler->splay_width,
+			sizeof(struct ceph_journaler_object_pos), GFP_NOIO);
+	if (!journaler->obj_pos_pending_array) {
+		ret = -ENOMEM;
+		mutex_unlock(&journaler->meta_lock);
+		goto free_replayers;
+	}
+
+	journaler->obj_pos_committing_array =
+		kcalloc(journaler->splay_width,
+			sizeof(struct ceph_journaler_object_pos), GFP_NOIO);
+	if (!journaler->obj_pos_committing_array) {
+		ret = -ENOMEM;
+		mutex_unlock(&journaler->meta_lock);
+		goto free_pos_pending;
+	}
+
+	for (i = 0; i < journaler->splay_width; i++) {
+		struct object_recorder *obj_recorder =
+			&journaler->obj_recorders[i];
+		struct object_replayer *obj_replayer =
+			&journaler->obj_replayers[i];
+		struct ceph_journaler_object_pos *pos_pending =
+			&journaler->obj_pos_pending_array[i];
+		struct ceph_journaler_object_pos *pos_committing =
+			&journaler->obj_pos_committing_array[i];
+
+		spin_lock_init(&obj_recorder->lock);
+		obj_recorder->splay_offset = i;
+		obj_recorder->inflight_append = 0;
+		INIT_LIST_HEAD(&obj_recorder->append_list);
+		INIT_LIST_HEAD(&obj_recorder->overflow_list);
+
+		obj_replayer->object_num = i;
+		obj_replayer->pos = NULL;
+		INIT_LIST_HEAD(&obj_replayer->entry_list);
+
+		pos_pending->in_using = false;
+		INIT_LIST_HEAD(&pos_pending->node);
+		list_add_tail(&pos_pending->node, &journaler->obj_pos_pending);
+
+		pos_committing->in_using = false;
+		INIT_LIST_HEAD(&pos_committing->node);
+		list_add_tail(&pos_committing->node,
+			      &journaler->obj_pos_committing);
+	}
+	mutex_unlock(&journaler->meta_lock);
+
+	ret = refresh(journaler, true);
+	if (ret)
+		goto free_pos_committing;
+ + mutex_lock(&journaler->meta_lock); + if (journaler->client) { + copy_pos_list(&journaler->client->object_positions, + &journaler->obj_pos_pending); + } + mutex_unlock(&journaler->meta_lock); + + ret = journaler_watch(journaler); + if (ret) { + pr_err("journaler_watch error: %d", ret); + goto destroy_clients; + } + return 0; + +destroy_clients: + list_for_each_entry_safe(client, next_client, &journaler->clients, + node) { + list_del(&client->node); + destroy_client(client); + } + + list_for_each_entry_safe(client, next_client, + &journaler->clients_cache, node) { + list_del(&client->node); + destroy_client(client); + } +free_pos_committing: + kfree(journaler->obj_pos_committing_array); +free_pos_pending: + kfree(journaler->obj_pos_pending_array); +free_replayers: + kfree(journaler->obj_replayers); +free_recorders: + kfree(journaler->obj_recorders); +out: + return ret; +} +EXPORT_SYMBOL(ceph_journaler_open); + +void ceph_journaler_close(struct ceph_journaler *journaler) +{ + struct ceph_journaler_client *client, *next; + struct commit_entry *commit_entry; + struct entry_tid *entry_tid, *entry_tid_next; + struct ceph_journaler_object_pos *pos, *next_pos; + struct rb_node *n; + int i; + + /* Stop watching and flush pending linger work */ + journaler_unwatch(journaler); + flush_workqueue(journaler->notify_wq); + + /* As we are closing journal, there should not + * be any pending ->flush_work,->overflow_work and finish_work. 
+ */ + flush_delayed_work(&journaler->commit_work); + flush_work(&journaler->notify_update_work); + list_for_each_entry_safe(pos, next_pos, + &journaler->obj_pos_pending, node) + list_del(&pos->node); + + list_for_each_entry_safe(pos, next_pos, + &journaler->obj_pos_committing, node) + list_del(&pos->node); + + journaler->client = NULL; + list_for_each_entry_safe(client, next, &journaler->clients, node) { + list_del(&client->node); + destroy_client(client); + } + list_for_each_entry_safe(client, next, &journaler->clients_cache, + node) { + list_del(&client->node); + destroy_client(client); + } + + for (n = rb_first(&journaler->commit_entries); n;) { + commit_entry = rb_entry(n, struct commit_entry, r_node); + + n = rb_next(n); + erase_commit_entry(&journaler->commit_entries, commit_entry); + kmem_cache_free(journaler_commit_entry_cache, commit_entry); + } + + for (i = 0; i < journaler->splay_width; i++) { + struct object_recorder *obj_recorder = &journaler->obj_recorders[i]; + struct object_replayer *obj_replayer = &journaler->obj_replayers[i]; + + spin_lock(&obj_recorder->lock); + BUG_ON(!list_empty(&obj_recorder->append_list) || + !list_empty(&obj_recorder->overflow_list)); + spin_unlock(&obj_recorder->lock); + + BUG_ON(!list_empty(&obj_replayer->entry_list)); + } + + kfree(journaler->obj_pos_committing_array); + kfree(journaler->obj_pos_pending_array); + kfree(journaler->obj_recorders); + kfree(journaler->obj_replayers); + journaler->obj_recorders = NULL; + journaler->obj_replayers = NULL; + + list_for_each_entry_safe(entry_tid, entry_tid_next, + &journaler->entry_tids, node) { + list_del(&entry_tid->node); + kfree(entry_tid); + } + + WARN_ON(!list_empty(&journaler->finish_list)); + WARN_ON(!list_empty(&journaler->clients)); + WARN_ON(!list_empty(&journaler->clients_cache)); + WARN_ON(!list_empty(&journaler->entry_tids)); + WARN_ON(!list_empty(&journaler->obj_pos_pending)); + WARN_ON(rb_first(&journaler->commit_entries) != NULL); + return; +} 
+EXPORT_SYMBOL(ceph_journaler_close); + +struct ceph_journaler *ceph_journaler_create(struct ceph_osd_client *osdc, + struct ceph_object_locator *oloc, + const char *journal_id, + const char *client_id) +{ + struct ceph_journaler *journaler; + ssize_t len; + int ret; + + journaler = kzalloc(sizeof(*journaler), GFP_NOIO); + if (!journaler) + return NULL; + + journaler->osdc = osdc; + ceph_oid_init(&journaler->header_oid); + ret = ceph_oid_aprintf(&journaler->header_oid, GFP_NOIO, "%s%s", + JOURNAL_HEADER_PREFIX, journal_id); + if (ret) { + pr_err("aprintf error : %d", ret); + goto err_free_journaler; + } + + ceph_oloc_init(&journaler->header_oloc); + ceph_oloc_copy(&journaler->header_oloc, oloc); + ceph_oloc_init(&journaler->data_oloc); + + /* Init object_oid_prefix */ + len = snprintf(NULL, 0, "%s%lld.%s.", JOURNAL_OBJECT_PREFIX, + journaler->header_oloc.pool, journal_id); + journaler->object_oid_prefix = kzalloc(len + 1, GFP_NOIO); + if (!journaler->object_oid_prefix) + goto err_destroy_data_oloc; + + ret = snprintf(journaler->object_oid_prefix, len + 1, "%s%lld.%s.", + JOURNAL_OBJECT_PREFIX, journaler->header_oloc.pool, + journal_id); + if (ret != len) { + WARN_ON(1); + goto err_free_object_oid_prefix; + } + + journaler->client_id = kstrdup(client_id, GFP_NOIO); + if (!journaler->client_id) { + ret = -ENOMEM; + goto err_free_object_oid_prefix; + } + + mutex_init(&journaler->meta_lock); + mutex_init(&journaler->commit_lock); + spin_lock_init(&journaler->finish_lock); + + INIT_LIST_HEAD(&journaler->finish_list); + INIT_LIST_HEAD(&journaler->clients); + INIT_LIST_HEAD(&journaler->clients_cache); + INIT_LIST_HEAD(&journaler->entry_tids); + INIT_LIST_HEAD(&journaler->obj_pos_pending); + INIT_LIST_HEAD(&journaler->obj_pos_committing); + + journaler->commit_entries = RB_ROOT; + journaler_commit_entry_cache = KMEM_CACHE(commit_entry, 0); + if (!journaler_commit_entry_cache) + goto err_free_client_id; + + journaler_append_ctx_cache = KMEM_CACHE(journaler_append_ctx, 0); + 
if (!journaler_append_ctx_cache) + goto err_destroy_commit_entry_cache; + journaler->task_wq = + alloc_ordered_workqueue("journaler-tasks", WQ_MEM_RECLAIM); + if (!journaler->task_wq) + goto err_destroy_append_ctx_cache; + journaler->notify_wq = + create_singlethread_workqueue("journaler-notify"); + if (!journaler->notify_wq) + goto err_destroy_task_wq; + + INIT_WORK(&journaler->flush_work, journaler_flush); + INIT_WORK(&journaler->finish_work, journaler_append_finish); + INIT_DELAYED_WORK(&journaler->commit_work, journaler_client_commit); + INIT_WORK(&journaler->notify_update_work, journaler_notify_update); + INIT_WORK(&journaler->overflow_work, journaler_overflow); + + return journaler; +err_destroy_task_wq: + destroy_workqueue(journaler->task_wq); +err_destroy_append_ctx_cache: + kmem_cache_destroy(journaler_append_ctx_cache); +err_destroy_commit_entry_cache: + kmem_cache_destroy(journaler_commit_entry_cache); +err_free_client_id: + kfree(journaler->client_id); +err_free_object_oid_prefix: + kfree(journaler->object_oid_prefix); +err_destroy_data_oloc: + ceph_oloc_destroy(&journaler->data_oloc); + ceph_oloc_destroy(&journaler->header_oloc); + ceph_oid_destroy(&journaler->header_oid); +err_free_journaler: + kfree(journaler); + return NULL; +} +EXPORT_SYMBOL(ceph_journaler_create); + +void ceph_journaler_destroy(struct ceph_journaler *journaler) +{ + destroy_workqueue(journaler->notify_wq); + destroy_workqueue(journaler->task_wq); + + kmem_cache_destroy(journaler_append_ctx_cache); + kmem_cache_destroy(journaler_commit_entry_cache); + kfree(journaler->client_id); + kfree(journaler->object_oid_prefix); + ceph_oloc_destroy(&journaler->data_oloc); + ceph_oloc_destroy(&journaler->header_oloc); + ceph_oid_destroy(&journaler->header_oid); + kfree(journaler); +} +EXPORT_SYMBOL(ceph_journaler_destroy); From patchwork Wed Sep 25 09:07:29 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Dongsheng Yang 
X-Patchwork-Id: 11160229
From: Dongsheng Yang
To: idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 07/12] rbd: introduce completion for each img_request
Date: Wed, 25 Sep 2019 09:07:29 +0000
Message-Id: <1569402454-4736-8-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
References: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
X-Mailing-List: ceph-devel@vger.kernel.org

When we are going to do sync I/O, we need a way to wait for an img_request to complete.
For example, when we do journal replay, we need to replay synchronously and return only after the img_request has completed.

Signed-off-by: Dongsheng Yang
---
 drivers/block/rbd.c | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index f1fc28d..8a8914a 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -349,6 +349,9 @@ struct rbd_img_request {
 	struct pending_result	pending;
 	struct work_struct	work;
 	int			work_result;
+
+	struct completion	completion;
+
 	struct kref		kref;
 };
@@ -1750,6 +1753,7 @@ static struct rbd_img_request *rbd_img_request_create(
 	img_request_layered_set(img_request);
 	INIT_LIST_HEAD(&img_request->lock_item);
+	init_completion(&img_request->completion);
 	INIT_LIST_HEAD(&img_request->object_extents);
 	mutex_init(&img_request->state_mutex);
 	kref_init(&img_request->kref);
@@ -3725,6 +3729,7 @@ static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
 	} else {
 		struct request *rq = img_req->rq;
+		complete_all(&img_req->completion);
 		rbd_img_request_put(img_req);
 		blk_mq_end_request(rq, errno_to_blk_status(result));
 	}

From patchwork Wed Sep 25 09:07:30 2019
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 11160221
From: Dongsheng Yang
To: idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 08/12] rbd: introduce IMG_REQ_NOLOCK flag for image request state
Date: Wed, 25 Sep 2019 09:07:30 +0000
Message-Id: <1569402454-4736-9-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
X-Mailing-List: ceph-devel@vger.kernel.org

When we are going to replay a journal event, we don't need to acquire the exclusive lock, because we are already in the middle of acquiring it.
Signed-off-by: Dongsheng Yang
---
 drivers/block/rbd.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8a8914a..6b387d9 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -318,6 +318,7 @@ struct rbd_obj_request {
 enum img_req_flags {
 	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
 	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
+	IMG_REQ_NOLOCK,		/* don't need exclusive lock and ->lock_rwsem */
 };

 enum rbd_img_state {
@@ -3550,6 +3551,9 @@ static bool need_exclusive_lock(struct rbd_img_request *img_req)
 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
 		return false;
+
+	if (test_bit(IMG_REQ_NOLOCK, &img_req->flags))
+		return false;
+
 	rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
 	if (rbd_dev->opts->lock_on_read ||
 	    (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))

From patchwork Wed Sep 25 09:07:31 2019
X-Patchwork-Submitter: Dongsheng Yang
X-Patchwork-Id: 11160227
From: Dongsheng Yang
To:
idryomov@gmail.com, jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 09/12] rbd: introduce rbd_journal_allocate_tag to allocate journal tag for rbd client
Date: Wed, 25 Sep 2019 09:07:31 +0000
Message-Id: <1569402454-4736-10-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
X-Mailing-List: ceph-devel@vger.kernel.org

rbd_journal_allocate_tag() gets the client by client id and allocates a unique tag for this client. All journal events from this client will be tagged with this tag.

Signed-off-by: Dongsheng Yang
---
 drivers/block/rbd.c | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 96 insertions(+)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 6b387d9..6987259 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -34,6 +34,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -445,6 +446,8 @@ struct rbd_device {
 	atomic_t		parent_ref;
 	struct rbd_device	*parent;

+	struct rbd_journal	*journal;
+
 	/* Block layer tags.
*/ struct blk_mq_tag_set tag_set; @@ -470,6 +473,11 @@ enum rbd_dev_flags { RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ }; +struct rbd_journal { + struct ceph_journaler *journaler; + u64 tag_tid; +}; + static DEFINE_MUTEX(client_mutex); /* Serialize client creation */ static LIST_HEAD(rbd_dev_list); /* devices */ @@ -6916,6 +6924,94 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) return ret; } +struct rbd_journal_tag_predecessor { + bool commit_valid; + u64 tag_tid; + u64 entry_tid; +}; + +struct rbd_journal_tag_data { + struct rbd_journal_tag_predecessor predecessor; +}; + +static u32 tag_data_encoding_size(struct rbd_journal_tag_data *tag_data) +{ + /* + * sizeof(uuid_len) 4 + uuid_len(0) + 1 commit_valid + 8 tag_tid + + * 8 entry_tid + 4 sizeof(uuid_len) + uuid_len(0) + */ + return (4 + 0 + 1 + 8 + 8 + 4 + 0); +} + +static void predecessor_encode(void **p, void *end, + struct rbd_journal_tag_predecessor *predecessor) +{ + /* Encode mirror uuid, as it's "", let's just encode 0 */ + ceph_encode_32(p, 0); + ceph_encode_8(p, predecessor->commit_valid); + ceph_encode_64(p, predecessor->tag_tid); + ceph_encode_64(p, predecessor->entry_tid); +} + +static void rbd_journal_encode_tag_data(void **p, void *end, + struct rbd_journal_tag_data *tag_data) +{ + /* Encode mirror uuid, as it's "", let's just encode 0 */ + ceph_encode_32(p, 0); + predecessor_encode(p, end, &tag_data->predecessor); +} + +#define LOCAL_CLIENT_ID "" + +static int rbd_journal_allocate_tag(struct rbd_journal *journal) +{ + struct ceph_journaler_tag tag = {}; + struct rbd_journal_tag_data tag_data = {}; + struct ceph_journaler *journaler = journal->journaler; + struct ceph_journaler_client *client; + struct rbd_journal_tag_predecessor *predecessor; + struct ceph_journaler_object_pos *position; + void *orig_buf, *buf, *p, *end; + u32 buf_len; + int ret; + + ret = ceph_journaler_get_cached_client(journaler, LOCAL_CLIENT_ID, + &client); + if (ret) + goto out; + + if 
(!list_empty(&client->object_positions)) { + position = list_first_entry(&client->object_positions, + struct ceph_journaler_object_pos, + node); + predecessor = &tag_data.predecessor; + predecessor->commit_valid = true; + predecessor->tag_tid = position->tag_tid; + predecessor->entry_tid = position->entry_tid; + } + buf_len = tag_data_encoding_size(&tag_data); + p = kmalloc(buf_len, GFP_KERNEL); + if (!p) { + pr_err("failed to allocate tag data"); + ret = -ENOMEM; + goto out; + } + + end = p + buf_len; + orig_buf = buf = p; + rbd_journal_encode_tag_data(&p, end, &tag_data); + + ret = ceph_journaler_allocate_tag(journaler, 0, buf, buf_len, &tag); + if (ret) + goto free_buf; + + journal->tag_tid = tag.tid; +free_buf: + kfree(orig_buf); +out: + return ret; +} + static void rbd_dev_image_release(struct rbd_device *rbd_dev) { rbd_dev_unprobe(rbd_dev); From patchwork Wed Sep 25 09:07:32 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Dongsheng Yang X-Patchwork-Id: 11160225 Return-Path: Received: from mail.kernel.org (pdx-korg-mail-1.web.codeaurora.org [172.30.200.123]) by pdx-korg-patchwork-2.web.codeaurora.org (Postfix) with ESMTP id 751F513BD for ; Wed, 25 Sep 2019 09:09:17 +0000 (UTC) Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.kernel.org (Postfix) with ESMTP id 503DC20673 for ; Wed, 25 Sep 2019 09:09:17 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S2387724AbfIYJJQ (ORCPT ); Wed, 25 Sep 2019 05:09:16 -0400 Received: from m97138.mail.qiye.163.com ([220.181.97.138]:21638 "EHLO m97138.mail.qiye.163.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1728608AbfIYJJN (ORCPT ); Wed, 25 Sep 2019 05:09:13 -0400 Received: from atest-guest.localdomain (unknown [218.94.118.90]) by smtp9 (Coremail) with SMTP id u+CowAD3dl9YLotdHDhSAg--.166S12; Wed, 25 Sep 2019 17:07:39 +0800 (CST) From: Dongsheng Yang To: idryomov@gmail.com, 
jdillama@redhat.com
Cc: ceph-devel@vger.kernel.org, Dongsheng Yang
Subject: [PATCH v4 10/12] rbd: append journal event in image request state machine
Date: Wed, 25 Sep 2019 09:07:32 +0000
Message-Id: <1569402454-4736-11-git-send-email-dongsheng.yang@easystack.cn>
In-Reply-To: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
X-Mailing-List: ceph-devel@vger.kernel.org

Introduce RBD_IMG_APPEND_JOURNAL and __RBD_IMG_APPEND_JOURNAL in rbd_img_state. After an image request passes RBD_IMG_EXCLUSIVE_LOCK, it goes into __RBD_IMG_APPEND_JOURNAL and then RBD_IMG_APPEND_JOURNAL; after that, it goes into __RBD_IMG_OBJECT_REQUESTS. That means we append the journal event before sending the data object requests for an image request.
Signed-off-by: Dongsheng Yang --- drivers/block/rbd.c | 260 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 259 insertions(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6987259..79929c7 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -119,6 +119,7 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURE_OBJECT_MAP (1ULL<<3) #define RBD_FEATURE_FAST_DIFF (1ULL<<4) #define RBD_FEATURE_DEEP_FLATTEN (1ULL<<5) +#define RBD_FEATURE_JOURNALING (1ULL<<6) #define RBD_FEATURE_DATA_POOL (1ULL<<7) #define RBD_FEATURE_OPERATIONS (1ULL<<8) @@ -325,10 +326,17 @@ enum img_req_flags { enum rbd_img_state { RBD_IMG_START = 1, RBD_IMG_EXCLUSIVE_LOCK, + __RBD_IMG_APPEND_JOURNAL, + RBD_IMG_APPEND_JOURNAL, __RBD_IMG_OBJECT_REQUESTS, RBD_IMG_OBJECT_REQUESTS, }; +struct journal_commit_info { + struct list_head node; + u64 commit_tid; +}; + struct rbd_img_request { struct rbd_device *rbd_dev; enum obj_operation_type op_type; @@ -353,6 +361,7 @@ struct rbd_img_request { int work_result; struct completion completion; + struct list_head journal_commit_list; struct kref kref; }; @@ -1764,6 +1773,7 @@ static struct rbd_img_request *rbd_img_request_create( INIT_LIST_HEAD(&img_request->lock_item); init_completion(&img_request->completion); INIT_LIST_HEAD(&img_request->object_extents); + INIT_LIST_HEAD(&img_request->journal_commit_list); mutex_init(&img_request->state_mutex); kref_init(&img_request->kref); @@ -1777,6 +1787,7 @@ static void rbd_img_request_destroy(struct kref *kref) struct rbd_img_request *img_request; struct rbd_obj_request *obj_request; struct rbd_obj_request *next_obj_request; + struct journal_commit_info *commit_info, *next_commit_info; img_request = container_of(kref, struct rbd_img_request, kref); @@ -1791,6 +1802,12 @@ static void rbd_img_request_destroy(struct kref *kref) rbd_dev_parent_put(img_request->rbd_dev); } + list_for_each_entry_safe(commit_info, next_commit_info, + 
&img_request->journal_commit_list, node) { + list_del(&commit_info->node); + kfree(commit_info); + } + if (rbd_img_is_write(img_request)) ceph_put_snap_context(img_request->snapc); @@ -3647,6 +3664,20 @@ static void rbd_img_object_requests(struct rbd_img_request *img_req) } } +static bool rbd_img_need_journal(struct rbd_img_request *img_req) +{ + struct rbd_device *rbd_dev = img_req->rbd_dev; + + if (img_req->op_type == OBJ_OP_READ) + return false; + + if (!(rbd_dev->header.features & RBD_FEATURE_JOURNALING)) + return false; + + return true; +} + +static void rbd_img_journal_append(struct rbd_img_request *img_req); static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) { struct rbd_device *rbd_dev = img_req->rbd_dev; @@ -3673,6 +3704,27 @@ static bool rbd_img_advance(struct rbd_img_request *img_req, int *result) rbd_assert(!need_exclusive_lock(img_req) || __rbd_is_lock_owner(rbd_dev)); + if (!rbd_img_need_journal(img_req)) { + img_req->state = RBD_IMG_APPEND_JOURNAL; + goto again; + } + + rbd_img_journal_append(img_req); + if (!img_req->pending.num_pending) { + *result = img_req->pending.result; + img_req->state = RBD_IMG_OBJECT_REQUESTS; + goto again; + } + img_req->state = __RBD_IMG_APPEND_JOURNAL; + return false; + case __RBD_IMG_APPEND_JOURNAL: + if (!pending_result_dec(&img_req->pending, result)) + return false; + /* fall through */ + case RBD_IMG_APPEND_JOURNAL: + if (*result) + return true; + rbd_img_object_requests(img_req); if (!img_req->pending.num_pending) { *result = img_req->pending.result; @@ -3741,9 +3793,22 @@ static void rbd_img_handle_request(struct rbd_img_request *img_req, int result) } else { struct request *rq = img_req->rq; + if (!result) { + struct journal_commit_info *commit_info; + + list_for_each_entry(commit_info, + &img_req->journal_commit_list, + node) { + ceph_journaler_client_committed( + img_req->rbd_dev->journal->journaler, + commit_info->commit_tid); + } + } + complete_all(&img_req->completion); 
rbd_img_request_put(img_req); - blk_mq_end_request(rq, errno_to_blk_status(result)); + if (rq) + blk_mq_end_request(rq, errno_to_blk_status(result)); } } @@ -6924,6 +6989,199 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) return ret; } +enum rbd_journal_event_type { + EVENT_TYPE_AIO_DISCARD = 0, + EVENT_TYPE_AIO_WRITE = 1, + EVENT_TYPE_AIO_FLUSH = 2, + EVENT_TYPE_OP_FINISH = 3, + EVENT_TYPE_SNAP_CREATE = 4, + EVENT_TYPE_SNAP_REMOVE = 5, + EVENT_TYPE_SNAP_RENAME = 6, + EVENT_TYPE_SNAP_PROTECT = 7, + EVENT_TYPE_SNAP_UNPROTECT = 8, + EVENT_TYPE_SNAP_ROLLBACK = 9, + EVENT_TYPE_RENAME = 10, + EVENT_TYPE_RESIZE = 11, + EVENT_TYPE_FLATTEN = 12, + EVENT_TYPE_DEMOTE_PROMOTE = 13, + EVENT_TYPE_SNAP_LIMIT = 14, + EVENT_TYPE_UPDATE_FEATURES = 15, + EVENT_TYPE_METADATA_SET = 16, + EVENT_TYPE_METADATA_REMOVE = 17, + EVENT_TYPE_AIO_WRITESAME = 18, + EVENT_TYPE_AIO_COMPARE_AND_WRITE = 19, +}; + + +/* + * RBD_EVENT_FIXED_SIZE(10 = CEPH_ENCODING_START_BLK_LEN(6) + EVENT_TYPE(4)) + */ +#define RBD_EVENT_FIXED_SIZE 10 + +static void rbd_journal_callback(struct ceph_journaler_ctx *journaler_ctx) +{ + struct rbd_img_request *img_req = journaler_ctx->priv; + int result = journaler_ctx->result; + + ceph_journaler_ctx_free(journaler_ctx); + rbd_img_handle_request(img_req, result); +} + +static void img_journal_append_write_event(struct rbd_img_request *img_req) +{ + struct rbd_journal *journal = img_req->rbd_dev->journal; + struct journal_commit_info *commit_info; + struct ceph_journaler_ctx *journaler_ctx; + u64 offset = (u64)blk_rq_pos(img_req->rq) << SECTOR_SHIFT; + u64 length = blk_rq_bytes(img_req->rq); + /* RBD_EVENT_FIXED_SIZE + offset(8) + length(8) + string_len(4) */ + u64 prefix_len = RBD_EVENT_FIXED_SIZE + 20; + u64 max_append_size = ceph_journaler_get_max_append_size(journal->journaler) - prefix_len; + u64 append_size = min(max_append_size, length); + u64 bio_offset = 0; + void *p; + int ret; + + while (length > 0) { + journaler_ctx = ceph_journaler_ctx_alloc(); 
+ if (!journaler_ctx) { + img_req->pending.result = -ENOMEM; + return; + } + + commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO); + if (!commit_info) { + ceph_journaler_ctx_free(journaler_ctx); + img_req->pending.result = -ENOMEM; + return; + } + INIT_LIST_HEAD(&commit_info->node); + + journaler_ctx->bio_iter.bio = img_req->rq->bio; + journaler_ctx->bio_iter.iter = img_req->rq->bio->bi_iter; + ceph_bio_iter_advance(&journaler_ctx->bio_iter, bio_offset); + + append_size = min(max_append_size, length); + journaler_ctx->bio_len = append_size; + + journaler_ctx->prefix_len = prefix_len; + journaler_ctx->prefix_offset = PAGE_SIZE - journaler_ctx->prefix_len; + + p = page_address(journaler_ctx->prefix_page) + journaler_ctx->prefix_offset; + ceph_start_encoding(&p, 1, 1, + journaler_ctx->prefix_len + journaler_ctx->bio_len - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, EVENT_TYPE_AIO_WRITE); + ceph_encode_64(&p, offset); + ceph_encode_64(&p, append_size); + /* first part of ceph_encode_string(); */ + ceph_encode_32(&p, journaler_ctx->bio_len); + + offset += append_size; + length -= append_size; + bio_offset += append_size; + + journaler_ctx->priv = img_req; + journaler_ctx->callback = rbd_journal_callback; + + ret = ceph_journaler_append(journal->journaler, + journal->tag_tid, journaler_ctx); + if (ret) { + kfree(commit_info); + ceph_journaler_ctx_free(journaler_ctx); + img_req->pending.result = ret; + return; + } + commit_info->commit_tid = journaler_ctx->commit_tid; + list_add_tail(&commit_info->node, + &img_req->journal_commit_list); + img_req->pending.num_pending++; + } +} + +static void img_journal_append_discard_event(struct rbd_img_request *img_req, + bool skip_partial_discard) +{ + struct rbd_journal *journal = img_req->rbd_dev->journal; + struct journal_commit_info *commit_info; + struct ceph_journaler_ctx *journaler_ctx; + u64 offset = (u64)blk_rq_pos(img_req->rq) << SECTOR_SHIFT; + u64 length = blk_rq_bytes(img_req->rq); + struct timespec64 mtime; + 
void *p; + int ret; + + journaler_ctx = ceph_journaler_ctx_alloc(); + if (!journaler_ctx) { + img_req->pending.result = -ENOMEM; + return; + } + + commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO); + if (!commit_info) { + ceph_journaler_ctx_free(journaler_ctx); + img_req->pending.result = -ENOMEM; + return; + } + INIT_LIST_HEAD(&commit_info->node); + + journaler_ctx->bio_iter.bio = img_req->rq->bio; + journaler_ctx->bio_iter.iter = img_req->rq->bio->bi_iter; + journaler_ctx->bio_len = 0; + + /* RBD_EVENT_FIXED_SIZE + offset(8) + length(8) + bool(1)= 27 */ + journaler_ctx->prefix_len = RBD_EVENT_FIXED_SIZE + 17; + journaler_ctx->prefix_offset = PAGE_SIZE - journaler_ctx->prefix_len; + + p = page_address(journaler_ctx->prefix_page) + journaler_ctx->prefix_offset; + ceph_start_encoding(&p, 4, 1, journaler_ctx->prefix_len - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, EVENT_TYPE_AIO_DISCARD); + ceph_encode_64(&p, offset); + ceph_encode_64(&p, length); + ceph_encode_8(&p, skip_partial_discard); + + /* encode metadata */ + journaler_ctx->suffix_len = CEPH_ENCODING_START_BLK_LEN + sizeof(struct ceph_timespec); + p = page_address(journaler_ctx->suffix_page); + ceph_start_encoding(&p, 1, 1, journaler_ctx->suffix_len - CEPH_ENCODING_START_BLK_LEN); + ktime_get_real_ts64(&mtime); + ceph_encode_timespec64(p, &mtime); + + journaler_ctx->priv = img_req; + journaler_ctx->callback = rbd_journal_callback; + + ret = ceph_journaler_append(journal->journaler, journal->tag_tid, + journaler_ctx); + if (ret) { + kfree(commit_info); + ceph_journaler_ctx_free(journaler_ctx); + img_req->pending.result = ret; + return; + } + + commit_info->commit_tid = journaler_ctx->commit_tid; + list_add_tail(&commit_info->node, &img_req->journal_commit_list); + img_req->pending.num_pending++; +} + +static void rbd_img_journal_append(struct rbd_img_request *img_req) +{ + rbd_assert(!img_req->pending.result && !img_req->pending.num_pending); + + switch (img_req->op_type) { + case OBJ_OP_WRITE: + 
img_journal_append_write_event(img_req); + break; + case OBJ_OP_ZEROOUT: + img_journal_append_discard_event(img_req, false); + break; + case OBJ_OP_DISCARD: + img_journal_append_discard_event(img_req, true); + break; + default: + img_req->pending.result = -ENOTSUPP; + } +} + struct rbd_journal_tag_predecessor { bool commit_valid; u64 tag_tid; From patchwork Wed Sep 25 09:07:33 2019 X-Patchwork-Submitter: Dongsheng Yang X-Patchwork-Id: 11160219 From: Dongsheng Yang To: idryomov@gmail.com, jdillama@redhat.com Cc: ceph-devel@vger.kernel.org, Dongsheng Yang Subject: [PATCH v4 11/12] rbd: replay events in journal Date: Wed, 25 Sep 2019 09:07:33 +0000 Message-Id: <1569402454-4736-12-git-send-email-dongsheng.yang@easystack.cn> In-Reply-To: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn>
When we find uncommitted events in the journal, we need to replay them. This commit implements replaying for only three kinds of events: EVENT_TYPE_AIO_DISCARD: send an img_request to the image with OBJ_OP_ZEROOUT, and wait for it to complete. EVENT_TYPE_AIO_WRITE: send an img_request to the image with OBJ_OP_WRITE, and wait for it to complete. EVENT_TYPE_AIO_FLUSH: as all other events are replayed synchronously, the preceding events have already been flushed, so nothing needs to be done for this event. Signed-off-by: Dongsheng Yang --- drivers/block/rbd.c | 236 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 79929c7..e8c2446 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -7182,6 +7182,242 @@ static void rbd_img_journal_append(struct rbd_img_request *img_req) } } +static int setup_write_bvecs(void *buf, u64 offset, + u64 length, struct bio_vec **bvecs, + u32 *bvec_count) +{ + u32 i; + u32 off; + + *bvec_count = calc_pages_for(offset, length); + *bvecs = kcalloc(*bvec_count, sizeof(**bvecs), GFP_NOIO); + if (!(*bvecs)) + return -ENOMEM; + + div_u64_rem(offset, PAGE_SIZE, &off); + for (i = 0; i < *bvec_count; i++) { + u32 len = min(length, (u64)PAGE_SIZE - off); + + (*bvecs)[i].bv_page = alloc_page(GFP_NOIO); + if (!(*bvecs)[i].bv_page) + return -ENOMEM; + + (*bvecs)[i].bv_offset = off; + (*bvecs)[i].bv_len = len; + memcpy(page_address((*bvecs)[i].bv_page) + (*bvecs)[i].bv_offset, buf, + (*bvecs)[i].bv_len); + length -= len; +
buf += len; + off = 0; + } + rbd_assert(!length); + return 0; +} + +static int rbd_journal_handle_aio_write(struct rbd_device *rbd_dev, void **p, + void *end, u8 struct_v, u64 commit_tid) +{ + struct ceph_snap_context *snapc; + struct rbd_img_request *img_request; + struct journal_commit_info *commit_info; + struct ceph_file_extent ex; + struct bio_vec *bvecs = NULL; + u32 bvec_count; + ssize_t data_len; + u64 offset, length; + int result; + + /* get offset and length */ + offset = ceph_decode_64(p); + length = ceph_decode_64(p); + /* get data_len and check the buffer length */ + data_len = ceph_decode_32(p); + if (!ceph_has_room(p, end, data_len)) { + pr_err("out of range"); + return -ERANGE; + } + rbd_assert(length == data_len); + /* Create a new image request for writing */ + snapc = rbd_dev->header.snapc; + ceph_get_snap_context(snapc); + img_request = rbd_img_request_create(rbd_dev, OBJ_OP_WRITE, snapc); + if (!img_request) { + result = -ENOMEM; + goto err; + } + /* Allocate a new commit_info for this journal replay */ + commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO); + if (!commit_info) { + result = -ENOMEM; + rbd_img_request_put(img_request); + goto err; + } + INIT_LIST_HEAD(&commit_info->node); + + /* Don't down ->lock_rwsem in __rbd_img_handle_request */ + __set_bit(IMG_REQ_NOLOCK, &img_request->flags); + commit_info->commit_tid = commit_tid; + list_add_tail(&commit_info->node, &img_request->journal_commit_list); + snapc = NULL; /* img_request consumes a ref */ + + result = setup_write_bvecs(*p, offset, length, &bvecs, &bvec_count); + if (result) { + rbd_warn(rbd_dev, "failed to set up bvecs"); + rbd_img_request_put(img_request); + goto err; + } + /* Skip data for this event */ + *p = (char *)*p + data_len; + ex.fe_off = offset; + ex.fe_len = length; + result = rbd_img_fill_from_bvecs(img_request, &ex, 1, bvecs); + if (result) { + rbd_img_request_put(img_request); + goto err; + } + /* + * As we are doing journal replaying in exclusive-lock
acquiring, + * we don't need to acquire the exclusive-lock for this write, so + * set ->state to RBD_IMG_APPEND_JOURNAL to skip exclusive-lock + * acquisition in the state machine + */ + img_request->state = RBD_IMG_APPEND_JOURNAL; + rbd_img_handle_request(img_request, 0); + result = wait_for_completion_interruptible(&img_request->completion); +err: + if (bvecs) { + int i; + + for (i = 0; i < bvec_count; i++) { + if (bvecs[i].bv_page) + __free_page(bvecs[i].bv_page); + } + kfree(bvecs); + } + return result; +} + +static int rbd_journal_handle_aio_discard(struct rbd_device *rbd_dev, void **p, + void *end, u8 struct_v, + u64 commit_tid) +{ + struct rbd_img_request *img_request; + struct ceph_snap_context *snapc; + struct journal_commit_info *commit_info; + enum obj_operation_type op_type; + u64 offset, length; + bool skip_partial_discard = true; + int result; + + /* get offset and length */ + offset = ceph_decode_64(p); + length = ceph_decode_64(p); + if (struct_v >= 4) + skip_partial_discard = ceph_decode_8(p); + if (struct_v >= 5) { + /* skip discard_granularity_bytes */ + ceph_decode_32(p); + } + + snapc = rbd_dev->header.snapc; + ceph_get_snap_context(snapc); + /* + * When skip_partial_discard is false, we need to guarantee + * that the data range is zeroed out. Otherwise, we don't + * guarantee that the discarded range will be zero filled.
+ */ + if (skip_partial_discard) + op_type = OBJ_OP_DISCARD; + else + op_type = OBJ_OP_ZEROOUT; + + img_request = rbd_img_request_create(rbd_dev, op_type, snapc); + if (!img_request) { + result = -ENOMEM; + goto err; + } + /* Allocate a commit_info for this image request */ + commit_info = kzalloc(sizeof(*commit_info), GFP_NOIO); + if (!commit_info) { + result = -ENOMEM; + rbd_img_request_put(img_request); + goto err; + } + INIT_LIST_HEAD(&commit_info->node); + + /* Don't down ->lock_rwsem in __rbd_img_handle_request */ + __set_bit(IMG_REQ_NOLOCK, &img_request->flags); + commit_info->commit_tid = commit_tid; + list_add_tail(&commit_info->node, &img_request->journal_commit_list); + result = rbd_img_fill_nodata(img_request, offset, length); + if (result) { + rbd_img_request_put(img_request); + goto err; + } + + img_request->state = RBD_IMG_APPEND_JOURNAL; + rbd_img_handle_request(img_request, 0); + result = wait_for_completion_interruptible(&img_request->completion); +err: + return result; +} + +static int rbd_journal_replay(void *entry_handler, + struct ceph_journaler_entry *entry, + u64 commit_tid) +{ + struct rbd_device *rbd_dev = entry_handler; + void *data = entry->data; + void **p = &data; + void *end = *p + entry->data_len; + u32 event_type; + u8 struct_v; + u32 struct_len; + int ret; + + ret = ceph_start_decoding(p, end, 1, "rbd_decode_entry", &struct_v, + &struct_len); + if (ret) + return -EINVAL; + + event_type = ceph_decode_32(p); + switch (event_type) { + case EVENT_TYPE_AIO_WRITE: + ret = rbd_journal_handle_aio_write(rbd_dev, p, end, struct_v, + commit_tid); + break; + case EVENT_TYPE_AIO_DISCARD: + ret = rbd_journal_handle_aio_discard(rbd_dev, p, end, struct_v, + commit_tid); + break; + case EVENT_TYPE_AIO_FLUSH: + /* + * As all event replaying is synchronous, no more work + * is needed to replay a flush event. Just + * commit it.
+ */ + ceph_journaler_client_committed(rbd_dev->journal->journaler, + commit_tid); + break; + default: + rbd_warn(rbd_dev, "unknown event_type: %u", event_type); + return -EINVAL; + } + + if (struct_v >= 4) { + u8 meta_struct_v; + u32 meta_struct_len; + + /* skip metadata */ + ret = ceph_start_decoding(p, end, 1, "decode_metadata", + &meta_struct_v, &meta_struct_len); + if (ret) + return ret; + *p += meta_struct_len; + } + return ret; +} + struct rbd_journal_tag_predecessor { bool commit_valid; u64 tag_tid; From patchwork Wed Sep 25 09:07:34 2019 X-Patchwork-Submitter: Dongsheng Yang X-Patchwork-Id: 11160223 From: Dongsheng Yang To: idryomov@gmail.com, jdillama@redhat.com Cc: ceph-devel@vger.kernel.org, Dongsheng Yang Subject: [PATCH v4 12/12] rbd: add support for feature of RBD_FEATURE_JOURNALING Date: Wed, 25 Sep 2019 09:07:34 +0000 Message-Id: <1569402454-4736-13-git-send-email-dongsheng.yang@easystack.cn> In-Reply-To:
<1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn> References: <1569402454-4736-1-git-send-email-dongsheng.yang@easystack.cn> Allow users to map rbd images with journaling enabled, but there is a warning in dmesg: WARNING: kernel journaling is EXPERIMENTAL! Signed-off-by: Dongsheng Yang --- drivers/block/rbd.c | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e8c2446..5bb98f5 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -129,6 +129,7 @@ static int atomic_dec_return_safe(atomic_t *v) RBD_FEATURE_OBJECT_MAP | \ RBD_FEATURE_FAST_DIFF | \ RBD_FEATURE_DEEP_FLATTEN | \ + RBD_FEATURE_JOURNALING | \ RBD_FEATURE_DATA_POOL | \ RBD_FEATURE_OPERATIONS) @@ -3863,6 +3864,7 @@ static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie) /* * lock_rwsem must be held for write */ +static int rbd_dev_open_journal(struct rbd_device *rbd_dev); static int rbd_lock(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; @@ -4206,6 +4208,12 @@ static int rbd_post_acquire_action(struct rbd_device *rbd_dev) return ret; } + if (rbd_dev->header.features & RBD_FEATURE_JOURNALING) { + ret = rbd_dev_open_journal(rbd_dev); + if (ret) + return ret; + } + + return 0; } @@ -4336,10 +4344,14 @@ static bool rbd_quiesce_lock(struct rbd_device *rbd_dev) return true; } +static void rbd_dev_close_journal(struct
rbd_device *rbd_dev); static void rbd_pre_release_action(struct rbd_device *rbd_dev) { if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) rbd_object_map_close(rbd_dev); + + if (rbd_dev->header.features & RBD_FEATURE_JOURNALING) + rbd_dev_close_journal(rbd_dev); } static void __rbd_release_lock(struct rbd_device *rbd_dev) @@ -7506,6 +7518,85 @@ static int rbd_journal_allocate_tag(struct rbd_journal *journal) return ret; } +static int rbd_journal_open(struct rbd_journal *journal) +{ + struct ceph_journaler *journaler = journal->journaler; + int ret; + + ret = ceph_journaler_open(journaler); + if (ret) + goto out; + + ret = ceph_journaler_start_replay(journaler); + if (ret) + goto err_close_journaler; + + ret = rbd_journal_allocate_tag(journal); + if (ret) + goto err_close_journaler; + return ret; + +err_close_journaler: + ceph_journaler_close(journaler); +out: + return ret; +} + +static int rbd_dev_open_journal(struct rbd_device *rbd_dev) +{ + struct rbd_journal *journal; + struct ceph_journaler *journaler; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int ret; + + /* create journal */ + rbd_assert(!rbd_dev->journal); + + journal = kzalloc(sizeof(*journal), GFP_KERNEL); + if (!journal) + return -ENOMEM; + + journaler = ceph_journaler_create(osdc, &rbd_dev->header_oloc, + rbd_dev->spec->image_id, + LOCAL_CLIENT_ID); + if (!journaler) { + ret = -ENOMEM; + goto err_free_journal; + } + + /* journal init */ + journaler->entry_handler = rbd_dev; + journaler->handle_entry = rbd_journal_replay; + + journal->journaler = journaler; + rbd_dev->journal = journal; + + /* journal open */ + ret = rbd_journal_open(rbd_dev->journal); + if (ret) + goto err_destroy_journaler; + + return ret; +err_destroy_journaler: + ceph_journaler_destroy(journaler); +err_free_journal: + kfree(rbd_dev->journal); + rbd_dev->journal = NULL; + return ret; +} + +static void rbd_dev_close_journal(struct rbd_device *rbd_dev) +{ + rbd_assert(rbd_dev->journal); + + 
ceph_journaler_close(rbd_dev->journal->journaler); + rbd_dev->journal->tag_tid = 0; + + ceph_journaler_destroy(rbd_dev->journal->journaler); + kfree(rbd_dev->journal); + rbd_dev->journal = NULL; +} + static void rbd_dev_image_release(struct rbd_device *rbd_dev) { rbd_dev_unprobe(rbd_dev); @@ -7595,6 +7686,10 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) goto err_out_probe; } + if (rbd_dev->header.features & RBD_FEATURE_JOURNALING) + rbd_warn(rbd_dev, + "WARNING: kernel rbd journaling is EXPERIMENTAL!"); + ret = rbd_dev_probe_parent(rbd_dev, depth); if (ret) goto err_out_probe;