diff mbox

[2/2] rbd: issue a copyup for layered writes

Message ID 5171CA53.70406@inktank.com (mailing list archive)
State New, archived
Headers show

Commit Message

Alex Elder April 19, 2013, 10:50 p.m. UTC
This implements the main copyup functionality for layered writes.

Here we add a copyup_pages field to the object request, which is
used only for copyup requests to keep track of the page array
containing data read from the parent image.

A copyup request is the only request rbd has that requires two osd
operations.  Because of this we handle copyup specially.  All image
object requests get an osd request allocated when they are created.
For a write request, if a copyup is copyup is required, the osd
request origainlly allocated is released, and a new one (with room
for two osd ops) is allocated to replace it.  A new function
rbd_osd_req_create_copyup() allocates an osd request suitable for
a copyup request.

The first op is then filled with a copyup object class method call,
supplying the array of pages containing data read from the parent.
The second op is filled in with the original write request.

The original request otherwise remains intact, and it describes the
original write request (found in the second osd op).  The presence
of the copyup op is sort of implicit; a non-null copyup_pages field
could be used to distinguish between a "normal" write request and a
request containing both a copyup call and a write.

This resolves:
    http://tracker.ceph.com/issues/3419

Signed-off-by: Alex Elder <elder@inktank.com>
---
 drivers/block/rbd.c |  149
++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 137 insertions(+), 12 deletions(-)

 	 * We support a 64-bit length, but ultimately it has to be
@@ -1601,6 +1602,48 @@ static struct ceph_osd_request *rbd_osd_req_create(
 	return osd_req;
 }

+/*
+ * Create a copyup osd request based on the information in the
+ * object request supplied.  A copyup request has two osd ops,
+ * a copyup method call, and a "normal" write request.
+ */
+static struct ceph_osd_request *
+rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc;
+	struct rbd_device *rbd_dev;
+	struct ceph_osd_client *osdc;
+	struct ceph_osd_request *osd_req;
+
+	rbd_assert(obj_request_img_data_test(obj_request));
+	img_request = obj_request->img_request;
+	rbd_assert(img_request);
+	rbd_assert(img_request_write_test(img_request));
+
+	/* Allocate and initialize the request, for the two ops */
+
+	snapc = img_request->snapc;
+	rbd_dev = img_request->rbd_dev;
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
+	if (!osd_req)
+		return NULL;	/* ENOMEM */
+
+	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+	osd_req->r_callback = rbd_osd_req_callback;
+	osd_req->r_priv = obj_request;
+
+	osd_req->r_oid_len = strlen(obj_request->object_name);
+	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
+
+	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
+
+	return osd_req;
+}
+
+
 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
 {
 	ceph_osdc_put_request(osd_req);
@@ -1960,11 +2003,49 @@ out_unwind:
 }

 static void
+rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request;
+	struct rbd_device *rbd_dev;
+	u64 length;
+	u32 page_count;
+
+	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+	rbd_assert(obj_request_img_data_test(obj_request));
+	img_request = obj_request->img_request;
+	rbd_assert(img_request);
+
+	rbd_dev = img_request->rbd_dev;
+	rbd_assert(rbd_dev);
+	length = (u64)1 << rbd_dev->header.obj_order;
+	page_count = (u32)calc_pages_for(0, length);
+
+	rbd_assert(obj_request->copyup_pages);
+	ceph_release_page_vector(obj_request->copyup_pages, page_count);
+	obj_request->copyup_pages = NULL;
+
+	/*
+	 * We want the transfer count to reflect the size of the
+	 * original write request.  There is no such thing as a
+	 * successful short write, so if the request was successful
+	 * we can just set it to the originally-requested length.
+	 */
+	if (!obj_request->result)
+		obj_request->xferred = obj_request->length;
+
+	/* Finish up with the normal image object callback */
+
+	rbd_img_obj_callback(obj_request);
+}
+
+static void
 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
 {
 	struct rbd_obj_request *orig_request;
+	struct ceph_osd_request *osd_req;
+	struct ceph_osd_client *osdc;
+	struct rbd_device *rbd_dev;
 	struct page **pages;
-	u32 page_count;
 	int result;
 	u64 obj_size;
 	u64 xferred;
@@ -1979,25 +2060,60 @@ rbd_img_obj_parent_read_full_callback(struct
rbd_img_request *img_request)

 	orig_request = img_request->obj_request;
 	rbd_assert(orig_request != NULL);
-
+	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
 	result = img_request->result;
 	obj_size = img_request->length;
 	xferred = img_request->xferred;

+	rbd_dev = img_request->rbd_dev;
+	rbd_assert(rbd_dev);
+	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
+
 	rbd_img_request_put(img_request);

-	obj_request_existence_set(orig_request, true);
+	if (result)
+		goto out_err;
+
+	/* Allocate the new copyup osd request for the original request */

-	page_count = (u32)calc_pages_for(0, obj_size);
-	ceph_release_page_vector(pages, page_count);
+	result = -ENOMEM;
+	rbd_assert(!orig_request->osd_req);
+	osd_req = rbd_osd_req_create_copyup(orig_request);
+	if (!osd_req)
+		goto out_err;
+	orig_request->osd_req = osd_req;
+	orig_request->copyup_pages = pages;

-	/* Resubmit the original request (for now). */
+	/* Initialize the copyup op */

-	orig_request->result = rbd_img_obj_request_submit(orig_request);
-	if (orig_request->result) {
-		obj_request_done_set(orig_request);
-		rbd_obj_request_complete(orig_request);
-	}
+	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
+	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+						false, false);
+
+	/* Then the original write request op */
+
+	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
+					orig_request->offset,
+					orig_request->length, 0, 0);
+	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
+					orig_request->length);
+
+	rbd_osd_req_format_write(orig_request);
+
+	/* All set, send it off. */
+
+	orig_request->callback = rbd_img_obj_copyup_callback;
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	result = rbd_obj_request_submit(osdc, orig_request);
+	if (!result)
+		return;
+out_err:
+	/* Record the error code and complete the request */
+
+	orig_request->result = result;
+	orig_request->xferred = 0;
+	obj_request_done_set(orig_request);
+	rbd_obj_request_complete(orig_request);
 }

 /*
@@ -2034,6 +2150,15 @@ static int rbd_img_obj_parent_read_full(struct
rbd_obj_request *obj_request)
 	rbd_assert(rbd_dev->parent != NULL);

 	/*
+	 * First things first.  The original osd request is of no
+	 * use to use any more, we'll need a new one that can hold
+	 * the two ops in a copyup request.  We'll get that later,
+	 * but for now we can release the old one.
+	 */
+	rbd_osd_req_destroy(obj_request->osd_req);
+	obj_request->osd_req = NULL;
+
+	/*
 	 * Determine the byte range covered by the object in the
 	 * child image to which the original request was to be sent.
 	 */

Comments

Josh Durgin April 22, 2013, 6:16 p.m. UTC | #1
Alex Elder <elder@inktank.com> wrote:

>This implements the main copyup functionality for layered writes.
>
>Here we add a copyup_pages field to the object request, which is
>used only for copyup requests to keep track of the page array
>containing data read from the parent image.
>
>A copyup request is the only request rbd has that requires two osd
>operations.  Because of this we handle copyup specially.  All image
>object requests get an osd request allocated when they are created.
>For a write request, if a copyup is copyup is required, the osd
>request origainlly allocated is released, and a new one (with room

A couple typos in this sentence, but looks good.
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>

>for two osd ops) is allocated to replace it.  A new function
>rbd_osd_req_create_copyup() allocates an osd request suitable for
>a copyup request.
>
>The first op is then filled with a copyup object class method call,
>supplying the array of pages containing data read from the parent.
>The second op is filled in with the original write request.
>
>The original request otherwise remains intact, and it describes the
>original write request (found in the second osd op).  The presence
>of the copyup op is sort of implicit; a non-null copyup_pages field
>could be used to distinguish between a "normal" write request and a
>request containing both a copyup call and a write.
>
>This resolves:
>    http://tracker.ceph.com/issues/3419
>
>Signed-off-by: Alex Elder <elder@inktank.com>
>---
> drivers/block/rbd.c |  149
>++++++++++++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 137 insertions(+), 12 deletions(-)
>
>diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
>index c5d0619..12f7a5f 100644
>--- a/drivers/block/rbd.c
>+++ b/drivers/block/rbd.c
>@@ -218,6 +218,7 @@ struct rbd_obj_request {
> 			u32		page_count;
> 		};
> 	};
>+	struct page		**copyup_pages;
>
> 	struct ceph_osd_request	*osd_req;
>
>@@ -1498,7 +1499,7 @@ static void rbd_osd_req_callback(struct
>ceph_osd_request *osd_req,
> 		obj_request->result = osd_req->r_result;
>	obj_request->version =
>le64_to_cpu(osd_req->r_reassert_version.version);
>
>-	WARN_ON(osd_req->r_num_ops != 1);	/* For now */
>+	BUG_ON(osd_req->r_num_ops > 2);
>
> 	/*
> 	 * We support a 64-bit length, but ultimately it has to be
>@@ -1601,6 +1602,48 @@ static struct ceph_osd_request
>*rbd_osd_req_create(
> 	return osd_req;
> }
>
>+/*
>+ * Create a copyup osd request based on the information in the
>+ * object request supplied.  A copyup request has two osd ops,
>+ * a copyup method call, and a "normal" write request.
>+ */
>+static struct ceph_osd_request *
>+rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
>+{
>+	struct rbd_img_request *img_request;
>+	struct ceph_snap_context *snapc;
>+	struct rbd_device *rbd_dev;
>+	struct ceph_osd_client *osdc;
>+	struct ceph_osd_request *osd_req;
>+
>+	rbd_assert(obj_request_img_data_test(obj_request));
>+	img_request = obj_request->img_request;
>+	rbd_assert(img_request);
>+	rbd_assert(img_request_write_test(img_request));
>+
>+	/* Allocate and initialize the request, for the two ops */
>+
>+	snapc = img_request->snapc;
>+	rbd_dev = img_request->rbd_dev;
>+	osdc = &rbd_dev->rbd_client->client->osdc;
>+	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
>+	if (!osd_req)
>+		return NULL;	/* ENOMEM */
>+
>+	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
>+	osd_req->r_callback = rbd_osd_req_callback;
>+	osd_req->r_priv = obj_request;
>+
>+	osd_req->r_oid_len = strlen(obj_request->object_name);
>+	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
>+	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
>+
>+	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
>+
>+	return osd_req;
>+}
>+
>+
> static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
> {
> 	ceph_osdc_put_request(osd_req);
>@@ -1960,11 +2003,49 @@ out_unwind:
> }
>
> static void
>+rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
>+{
>+	struct rbd_img_request *img_request;
>+	struct rbd_device *rbd_dev;
>+	u64 length;
>+	u32 page_count;
>+
>+	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
>+	rbd_assert(obj_request_img_data_test(obj_request));
>+	img_request = obj_request->img_request;
>+	rbd_assert(img_request);
>+
>+	rbd_dev = img_request->rbd_dev;
>+	rbd_assert(rbd_dev);
>+	length = (u64)1 << rbd_dev->header.obj_order;
>+	page_count = (u32)calc_pages_for(0, length);
>+
>+	rbd_assert(obj_request->copyup_pages);
>+	ceph_release_page_vector(obj_request->copyup_pages, page_count);
>+	obj_request->copyup_pages = NULL;
>+
>+	/*
>+	 * We want the transfer count to reflect the size of the
>+	 * original write request.  There is no such thing as a
>+	 * successful short write, so if the request was successful
>+	 * we can just set it to the originally-requested length.
>+	 */
>+	if (!obj_request->result)
>+		obj_request->xferred = obj_request->length;
>+
>+	/* Finish up with the normal image object callback */
>+
>+	rbd_img_obj_callback(obj_request);
>+}
>+
>+static void
>rbd_img_obj_parent_read_full_callback(struct rbd_img_request
>*img_request)
> {
> 	struct rbd_obj_request *orig_request;
>+	struct ceph_osd_request *osd_req;
>+	struct ceph_osd_client *osdc;
>+	struct rbd_device *rbd_dev;
> 	struct page **pages;
>-	u32 page_count;
> 	int result;
> 	u64 obj_size;
> 	u64 xferred;
>@@ -1979,25 +2060,60 @@ rbd_img_obj_parent_read_full_callback(struct
>rbd_img_request *img_request)
>
> 	orig_request = img_request->obj_request;
> 	rbd_assert(orig_request != NULL);
>-
>+	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
> 	result = img_request->result;
> 	obj_size = img_request->length;
> 	xferred = img_request->xferred;
>
>+	rbd_dev = img_request->rbd_dev;
>+	rbd_assert(rbd_dev);
>+	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
>+
> 	rbd_img_request_put(img_request);
>
>-	obj_request_existence_set(orig_request, true);
>+	if (result)
>+		goto out_err;
>+
>+	/* Allocate the new copyup osd request for the original request */
>
>-	page_count = (u32)calc_pages_for(0, obj_size);
>-	ceph_release_page_vector(pages, page_count);
>+	result = -ENOMEM;
>+	rbd_assert(!orig_request->osd_req);
>+	osd_req = rbd_osd_req_create_copyup(orig_request);
>+	if (!osd_req)
>+		goto out_err;
>+	orig_request->osd_req = osd_req;
>+	orig_request->copyup_pages = pages;
>
>-	/* Resubmit the original request (for now). */
>+	/* Initialize the copyup op */
>
>-	orig_request->result = rbd_img_obj_request_submit(orig_request);
>-	if (orig_request->result) {
>-		obj_request_done_set(orig_request);
>-		rbd_obj_request_complete(orig_request);
>-	}
>+	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
>+	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
>+						false, false);
>+
>+	/* Then the original write request op */
>+
>+	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
>+					orig_request->offset,
>+					orig_request->length, 0, 0);
>+	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
>+					orig_request->length);
>+
>+	rbd_osd_req_format_write(orig_request);
>+
>+	/* All set, send it off. */
>+
>+	orig_request->callback = rbd_img_obj_copyup_callback;
>+	osdc = &rbd_dev->rbd_client->client->osdc;
>+	result = rbd_obj_request_submit(osdc, orig_request);
>+	if (!result)
>+		return;
>+out_err:
>+	/* Record the error code and complete the request */
>+
>+	orig_request->result = result;
>+	orig_request->xferred = 0;
>+	obj_request_done_set(orig_request);
>+	rbd_obj_request_complete(orig_request);
> }
>
> /*
>@@ -2034,6 +2150,15 @@ static int rbd_img_obj_parent_read_full(struct
>rbd_obj_request *obj_request)
> 	rbd_assert(rbd_dev->parent != NULL);
>
> 	/*
>+	 * First things first.  The original osd request is of no
>+	 * use to use any more, we'll need a new one that can hold
>+	 * the two ops in a copyup request.  We'll get that later,
>+	 * but for now we can release the old one.
>+	 */
>+	rbd_osd_req_destroy(obj_request->osd_req);
>+	obj_request->osd_req = NULL;
>+
>+	/*
> 	 * Determine the byte range covered by the object in the
> 	 * child image to which the original request was to be sent.
> 	 */

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index c5d0619..12f7a5f 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -218,6 +218,7 @@  struct rbd_obj_request {
 			u32		page_count;
 		};
 	};
+	struct page		**copyup_pages;

 	struct ceph_osd_request	*osd_req;

@@ -1498,7 +1499,7 @@  static void rbd_osd_req_callback(struct
ceph_osd_request *osd_req,
 		obj_request->result = osd_req->r_result;
 	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

-	WARN_ON(osd_req->r_num_ops != 1);	/* For now */
+	BUG_ON(osd_req->r_num_ops > 2);

 	/*