From patchwork Wed Oct 24 02:53:41 2012 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Alex Elder X-Patchwork-Id: 1635471 Return-Path: X-Original-To: patchwork-ceph-devel@patchwork.kernel.org Delivered-To: patchwork-process-083081@patchwork2.kernel.org Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by patchwork2.kernel.org (Postfix) with ESMTP id 352A1DF283 for ; Wed, 24 Oct 2012 02:53:46 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S934271Ab2JXCxp (ORCPT ); Tue, 23 Oct 2012 22:53:45 -0400 Received: from mail-ia0-f174.google.com ([209.85.210.174]:59043 "EHLO mail-ia0-f174.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S934265Ab2JXCxo (ORCPT ); Tue, 23 Oct 2012 22:53:44 -0400 Received: by mail-ia0-f174.google.com with SMTP id y32so41665iag.19 for ; Tue, 23 Oct 2012 19:53:43 -0700 (PDT) X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=google.com; s=20120113; h=message-id:date:from:user-agent:mime-version:to:cc:subject :content-type:content-transfer-encoding:x-gm-message-state; bh=hU8jG+J1fyH5UeGQyl5gkduExedoEiTZWkJrTuKdYKk=; b=bfJpXV2c3ftgUPcIgFBFIsfjTwAPXxq24/uM5JykWBllCohO2M48qPSAyfVhJOSAti Ab8lVss9Ams0X8sQ/RS5TUkrOMTKtGAbnuO2aFP4dI04s0fMO4A/hx59uAsxB8JTs9Oa aGkI0aqGfOvDcSG8sfPqN0TJze2zxDlUUNKuk4WTdRbjQMhKjE8lB01xdCUfCkCw8XQ7 rCJpPmZp9rs3fFT1lLlhNubrLLvzOstBnz+Y6j8hYOiSLDw0Q15VLlLyJpZiMu44tLPl KXCPLOjfh9yaW3EMEVjSZUHGLEn0Obo690aWDFIfui0d0k8m0prLBdR0/P/K0aLuO4qL NfSQ== Received: by 10.50.185.168 with SMTP id fd8mr1094576igc.51.1351047223717; Tue, 23 Oct 2012 19:53:43 -0700 (PDT) Received: from [172.22.22.4] (c-71-195-31-37.hsd1.mn.comcast.net. [71.195.31.37]) by mx.google.com with ESMTPS id o9sm993596igd.7.2012.10.23.19.53.42 (version=SSLv3 cipher=OTHER); Tue, 23 Oct 2012 19:53:42 -0700 (PDT) Message-ID: <50875835.3010206@inktank.com> Date: Tue, 23 Oct 2012 21:53:41 -0500 From: Alex Elder User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:16.0) Gecko/20121011 Thunderbird/16.0.1 MIME-Version: 1.0 To: ceph-devel CC: Guangliang Zhao Subject: [PATCH] rbd: simplify rbd_rq_fn() X-Gm-Message-State: ALoCoQmOVLH75nQFHH/eZki+fWyFtC/LqXwWeB5VKMrHBdCyMJlmmw31694S4D3pYeuZ6t7t5p6Z Sender: ceph-devel-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: ceph-devel@vger.kernel.org When processing a request, rbd_rq_fn() makes clones of the bio's in the request's bio chain and submits the results to osd's to be satisfied. If a request bio straddles the boundary between objects backing the rbd image, it must be represented by two cloned bio's, one for the first part (at the end of one object) and one for the second (at the beginning of the next object). This has been handled by a function bio_chain_clone(), which includes an interface only a mother could love, and which has been found to have other problems. This patch defines two new fairly generic bio functions (one which replaces bio_chain_clone()) to help out the situation, and then revises rbd_rq_fn() to make use of them. First, bio_clone_range() clones a portion of a single bio, starting at a given offset within the bio and including only as many bytes as requested. As a convenience, a request to clone the entire bio is passed directly to bio_clone(). Second, bio_chain_clone_range() performs a similar function, producing a chain of cloned bio's covering a sub-range of the source chain. No bio_pair structures are used, and if successful the result will represent exactly the specified range. Using bio_chain_clone_range() makes bio_rq_fn() a little easier to understand, because it avoids the need to pass very much state information between consecutive calls. By avoiding the need to track a bio_pair structure, it also eliminates the problem described here: http://tracker.newdream.net/issues/2933 Note that a block request (and therefore the complete length of a bio chain processed in rbd_rq_fn()) is an unsigned int, while the result of rbd_segment_length() is u64. This change makes this range trunctation explicit, and trips a bug if the the segment boundary is too far off. Signed-off-by: Alex Elder --- drivers/block/rbd.c | 231 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 152 insertions(+), 79 deletions(-) @@ -1014,8 +1081,9 @@ static int rbd_do_request(struct request *rq, req_data->coll_index = coll_index; } - dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name, - (unsigned long long) ofs, (unsigned long long) len); + dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n", + object_name, (unsigned long long) ofs, + (unsigned long long) len, coll, coll_index); osdc = &rbd_dev->rbd_client->client->osdc; req = ceph_osdc_alloc_request(osdc, flags, snapc, ops, @@ -1463,18 +1531,16 @@ static void rbd_rq_fn(struct request_queue *q) { struct rbd_device *rbd_dev = q->queuedata; struct request *rq; - struct bio_pair *bp = NULL; while ((rq = blk_fetch_request(q))) { struct bio *bio; - struct bio *rq_bio, *next_bio = NULL; bool do_write; unsigned int size; - u64 op_size = 0; u64 ofs; int num_segs, cur_seg = 0; struct rbd_req_coll *coll; struct ceph_snap_context *snapc; + unsigned int bio_offset; dout("fetched request\n"); @@ -1486,10 +1552,6 @@ static void rbd_rq_fn(struct request_queue *q) /* deduce our operation (read, write) */ do_write = (rq_data_dir(rq) == WRITE); - - size = blk_rq_bytes(rq); - ofs = blk_rq_pos(rq) * SECTOR_SIZE; - rq_bio = rq->bio; if (do_write && rbd_dev->mapping.read_only) { __blk_end_request_all(rq, -EROFS); continue; @@ -1512,6 +1574,10 @@ static void rbd_rq_fn(struct request_queue *q) up_read(&rbd_dev->header_rwsem); + size = blk_rq_bytes(rq); + ofs = blk_rq_pos(rq) * SECTOR_SIZE; + bio = rq->bio; + dout("%s 0x%x bytes at 0x%llx\n", do_write ? "write" : "read", size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE); @@ -1531,30 +1597,37 @@ static void rbd_rq_fn(struct request_queue *q) continue; } + bio_offset = 0; do { - /* a bio clone to be passed down to OSD req */ + u64 limit = rbd_segment_length(rbd_dev, ofs, size); + unsigned int chain_size; + struct bio *bio_chain; + + BUG_ON(limit > (u64) UINT_MAX); + chain_size = (unsigned int) limit; dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt); - op_size = rbd_segment_length(rbd_dev, ofs, size); + kref_get(&coll->kref); - bio = bio_chain_clone(&rq_bio, &next_bio, &bp, - op_size, GFP_ATOMIC); - if (bio) + + /* Pass a cloned bio chain via an osd request */ + + bio_chain = bio_chain_clone_range(&bio, + &bio_offset, chain_size, + GFP_ATOMIC); + if (bio_chain) (void) rbd_do_op(rq, rbd_dev, snapc, - ofs, op_size, - bio, coll, cur_seg); + ofs, chain_size, + bio_chain, coll, cur_seg); else rbd_coll_end_req_index(rq, coll, cur_seg, - -ENOMEM, op_size); - size -= op_size; - ofs += op_size; + -ENOMEM, chain_size); + size -= chain_size; + ofs += chain_size; cur_seg++; - rq_bio = next_bio; } while (size > 0); kref_put(&coll->kref, rbd_coll_release); - if (bp) - bio_pair_release(bp); spin_lock_irq(q->queue_lock); ceph_put_snap_context(snapc); @@ -1564,7 +1637,7 @@ static void rbd_rq_fn(struct request_queue *q) /* * a queue callback. Makes sure that we don't create a bio that spans across * multiple osd objects. One exception would be with a single page bios, - * which we handle later at bio_chain_clone + * which we handle later at bio_chain_clone_range() */ static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, struct bio_vec *bvec) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3c131ab..f7fcc96 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -826,77 +826,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs) } /* - * bio_chain_clone - clone a chain of bios up to a certain length. - * might return a bio_pair that will need to be released. + * Clone a portion of a bio, starting at the given byte offset + * and continuing for the number of bytes indicated. */ -static struct bio *bio_chain_clone(struct bio **old, struct bio **next, - struct bio_pair **bp, - int len, gfp_t gfpmask) -{ - struct bio *old_chain = *old; - struct bio *new_chain = NULL; - struct bio *tail; - int total = 0; - - if (*bp) { - bio_pair_release(*bp); - *bp = NULL; - } +static struct bio *bio_clone_range(struct bio *bio_src, + unsigned int offset, + unsigned int len, + gfp_t gfpmask) +{ + struct bio_vec *bv; + unsigned int resid; + unsigned short idx; + unsigned int voff; + unsigned short end_idx; + unsigned short vcnt; + struct bio *bio; - while (old_chain && (total < len)) { - struct bio *tmp; + /* Handle the easy case for the caller */ - tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs); - if (!tmp) - goto err_out; - gfpmask &= ~__GFP_WAIT; /* can't wait after the first */ + if (!offset && len == bio_src->bi_size) + return bio_clone(bio_src, gfpmask); - if (total + old_chain->bi_size > len) { - struct bio_pair *bp; + if (WARN_ON_ONCE(!len)) + return NULL; + if (WARN_ON_ONCE(len > bio_src->bi_size)) + return NULL; + if (WARN_ON_ONCE(offset > bio_src->bi_size - len)) + return NULL; - /* - * this split can only happen with a single paged bio, - * split_bio will BUG_ON if this is not the case - */ - dout("bio_chain_clone split! total=%d remaining=%d" - "bi_size=%u\n", - total, len - total, old_chain->bi_size); + /* Find first affected segment... */ - /* split the bio. We'll release it either in the next - call, or it will have to be released outside */ - bp = bio_split(old_chain, (len - total) / SECTOR_SIZE); - if (!bp) - goto err_out; + resid = offset; + __bio_for_each_segment(bv, bio_src, idx, 0) { + if (resid < bv->bv_len) + break; + resid -= bv->bv_len; + } + voff = resid; - __bio_clone(tmp, &bp->bio1); + /* ...and the last affected segment */ - *next = &bp->bio2; - } else { - __bio_clone(tmp, old_chain); - *next = old_chain->bi_next; - } + resid += len; + __bio_for_each_segment(bv, bio_src, end_idx, idx) { + if (resid <= bv->bv_len) + break; + resid -= bv->bv_len; + } + vcnt = end_idx - idx + 1; + + /* Build the clone */ + + bio = bio_alloc(gfpmask, (unsigned int) vcnt); + if (!bio) + return NULL; /* ENOMEM */ - tmp->bi_bdev = NULL; - tmp->bi_next = NULL; - if (new_chain) - tail->bi_next = tmp; - else - new_chain = tmp; - tail = tmp; - old_chain = old_chain->bi_next; + bio->bi_bdev = bio_src->bi_bdev; + bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT); + bio->bi_rw = bio_src->bi_rw; + bio->bi_flags |= 1 << BIO_CLONED; - total += tmp->bi_size; + /* + * Copy over our part of the bio_vec, then update the first + * and last (or only) entries. + */ + memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx], + vcnt * sizeof (struct bio_vec)); + bio->bi_io_vec[0].bv_offset += voff; + if (vcnt > 1) { + bio->bi_io_vec[0].bv_len -= voff; + bio->bi_io_vec[vcnt - 1].bv_len = resid; + } else { + bio->bi_io_vec[0].bv_len = len; } - rbd_assert(total == len); + bio->bi_vcnt = vcnt; + bio->bi_size = len; + bio->bi_idx = 0; + + return bio; +} + +/* + * Clone a portion of a bio chain, starting at the given byte offset + * into the first bio in the source chain and continuing for the + * number of bytes indicated. The result is another bio chain of + * exactly the given length, or a null pointer on error. + * + * The bio_src and offset parameters are both in-out. On entry they + * refer to the first source bio and the offset into that bio where + * the start of data to be cloned is located. + * + * On return, bio_src is updated to refer to the bio in the source + * chain that contains first un-cloned byte, and *offset will + * contain the offset of that byte within that bio. + */ +static struct bio *bio_chain_clone_range(struct bio **bio_src, + unsigned int *offset, + unsigned int len, + gfp_t gfpmask) +{ + struct bio *bi = *bio_src; + unsigned int off = *offset; + struct bio *chain = NULL; + struct bio **end; + + /* Build up a chain of clone bios up to the limit */ + + if (!bi || off >= bi->bi_size || !len) + return NULL; /* Nothing to clone */ - *old = old_chain; + end = &chain; + while (len) { + unsigned int bi_size; + struct bio *bio; + + if (!bi) + goto out_err; /* EINVAL; ran out of bio's */ + bi_size = min_t(unsigned int, bi->bi_size - off, len); + bio = bio_clone_range(bi, off, bi_size, gfpmask); + if (!bio) + goto out_err; /* ENOMEM */ + + *end = bio; + end = &bio->bi_next; + + off += bi_size; + if (off == bi->bi_size) { + bi = bi->bi_next; + off = 0; + } + len -= bi_size; + } + *bio_src = bi; + *offset = off; - return new_chain; + return chain; +out_err: + bio_chain_put(chain); -err_out: - dout("bio_chain_clone with err\n"); - bio_chain_put(new_chain); return NULL; }