diff mbox

[1/2] rbd: implement full object parent reads

Message ID 5171CA43.5070200@inktank.com (mailing list archive)
State New, archived
Headers show

Commit Message

Alex Elder April 19, 2013, 10:50 p.m. UTC
As a step toward implementing layered writes, implement reading the
data for a target object from the parent image for a write request
whose target object is known to not exist.  Add a copyup_pages field
to an image request to track the page array used (only) for such a
request.

Signed-off-by: Alex Elder <elder@inktank.com>
---
 drivers/block/rbd.c |  152 ++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 143 insertions(+), 9 deletions(-)

 {
 	struct rbd_obj_request *orig_request;
@@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 		obj_request_existence_set(orig_request, false);
 	} else if (result) {
 		orig_request->result = result;
-		goto out_err;
+		goto out;
 	}

 	/*
@@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 	 * whether the target object exists.
 	 */
 	orig_request->result = rbd_img_obj_request_submit(orig_request);
-out_err:
+out:
 	if (orig_request->result)
 		rbd_obj_request_complete(orig_request);
 	rbd_obj_request_put(orig_request);
@@ -2070,15 +2200,13 @@ out:
 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request;
+	bool known;

 	rbd_assert(obj_request_img_data_test(obj_request));

 	img_request = obj_request->img_request;
 	rbd_assert(img_request);

-	/* (At the moment we don't care whether it exists or not...) */
-	(void) obj_request_exists_test;
-
 	/*
 	 * Only layered writes need special handling.  If it's not a
 	 * layered write, or it is a layered write but we know the
@@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
 	 */
 	if (!img_request_write_test(img_request) ||
 		!img_request_layered_test(img_request) ||
-		obj_request_known_test(obj_request)) {
+		((known = obj_request_known_test(obj_request)) &&
+			obj_request_exists_test(obj_request))) {

 		struct rbd_device *rbd_dev;
 		struct ceph_osd_client *osdc;
@@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
 	}

 	/*
-	 * It's a layered write and we don't know whether the target
-	 * exists.  Issue existence check; once that completes the
-	 * original request will be submitted again.
+	 * It's a layered write.  The target object might exist but
+	 * we may not know that yet.  If we know it doesn't exist,
+	 * start by reading the data for the full target object from
+	 * the parent so we can use it for a copyup to the target.
 	 */
+	if (known)
+		return rbd_img_obj_parent_read_full(obj_request);
+
+	/* We don't know whether the target exists.  Go find out. */

 	return rbd_img_obj_exists_submit(obj_request);
 }

Comments

Josh Durgin April 22, 2013, 6:13 p.m. UTC | #1
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>

Alex Elder <elder@inktank.com> wrote:
>As a step toward implementing layered writes, implement reading the
>data for a target object from the parent image for a write request
>whose target object is known to not exist.  Add a copyup_pages field
>to an image request to track the page array used (only) for such a
>request.
>
>Signed-off-by: Alex Elder <elder@inktank.com>
>---
> drivers/block/rbd.c |  152
>++++++++++++++++++++++++++++++++++++++++++++++++---
> 1 file changed, 143 insertions(+), 9 deletions(-)
>
>diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
>index 91fcf36..c5d0619 100644
>--- a/drivers/block/rbd.c
>+++ b/drivers/block/rbd.c
>@@ -250,6 +250,7 @@ struct rbd_img_request {
> 		struct request		*rq;		/* block request */
> 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
> 	};
>+	struct page		**copyup_pages;
> 	spinlock_t		completion_lock;/* protects next_completion */
> 	u32			next_completion;
> 	rbd_img_callback_t	callback;
>@@ -350,6 +351,8 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
> static LIST_HEAD(rbd_client_list);		/* clients */
> static DEFINE_SPINLOCK(rbd_client_list_lock);
>
>+static int rbd_img_request_submit(struct rbd_img_request
>*img_request);
>+
> static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
> static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
>
>@@ -1956,6 +1959,133 @@ out_unwind:
> 	return -ENOMEM;
> }
>
>+static void
>+rbd_img_obj_parent_read_full_callback(struct rbd_img_request
>*img_request)
>+{
>+	struct rbd_obj_request *orig_request;
>+	struct page **pages;
>+	u32 page_count;
>+	int result;
>+	u64 obj_size;
>+	u64 xferred;
>+
>+	rbd_assert(img_request_child_test(img_request));
>+
>+	/* First get what we need from the image request */
>+
>+	pages = img_request->copyup_pages;
>+	rbd_assert(pages != NULL);
>+	img_request->copyup_pages = NULL;
>+
>+	orig_request = img_request->obj_request;
>+	rbd_assert(orig_request != NULL);
>+
>+	result = img_request->result;
>+	obj_size = img_request->length;
>+	xferred = img_request->xferred;
>+
>+	rbd_img_request_put(img_request);
>+
>+	obj_request_existence_set(orig_request, true);
>+
>+	page_count = (u32)calc_pages_for(0, obj_size);
>+	ceph_release_page_vector(pages, page_count);
>+
>+	/* Resubmit the original request (for now). */
>+
>+	orig_request->result = rbd_img_obj_request_submit(orig_request);
>+	if (orig_request->result) {
>+		obj_request_done_set(orig_request);
>+		rbd_obj_request_complete(orig_request);
>+	}
>+}
>+
>+/*
>+ * Read from the parent image the range of data that covers the
>+ * entire target of the given object request.  This is used for
>+ * satisfying a layered image write request when the target of an
>+ * object request from the image request does not exist.
>+ *
>+ * A page array big enough to hold the returned data is allocated
>+ * and supplied to rbd_img_request_fill() as the "data descriptor."
>+ * When the read completes, this page array will be transferred to
>+ * the original object request for the copyup operation.
>+ *
>+ * If an error occurs, record it as the result of the original
>+ * object request and mark it done so it gets completed.
>+ */
>+static int rbd_img_obj_parent_read_full(struct rbd_obj_request
>*obj_request)
>+{
>+	struct rbd_img_request *img_request = NULL;
>+	struct rbd_img_request *parent_request = NULL;
>+	struct rbd_device *rbd_dev;
>+	u64 img_offset;
>+	u64 length;
>+	struct page **pages = NULL;
>+	u32 page_count;
>+	int result;
>+
>+	rbd_assert(obj_request_img_data_test(obj_request));
>+	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
>+
>+	img_request = obj_request->img_request;
>+	rbd_assert(img_request != NULL);
>+	rbd_dev = img_request->rbd_dev;
>+	rbd_assert(rbd_dev->parent != NULL);
>+
>+	/*
>+	 * Determine the byte range covered by the object in the
>+	 * child image to which the original request was to be sent.
>+	 */
>+	img_offset = obj_request->img_offset - obj_request->offset;
>+	length = (u64)1 << rbd_dev->header.obj_order;
>+
>+	/*
>+	 * Allocate a page array big enough to receive the data read
>+	 * from the parent.
>+	 */
>+	page_count = (u32)calc_pages_for(0, length);
>+	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
>+	if (IS_ERR(pages)) {
>+		result = PTR_ERR(pages);
>+		pages = NULL;
>+		goto out_err;
>+	}
>+
>+	result = -ENOMEM;
>+	parent_request = rbd_img_request_create(rbd_dev->parent,
>+						img_offset, length,
>+						false, true);
>+	if (!parent_request)
>+		goto out_err;
>+	rbd_obj_request_get(obj_request);
>+	parent_request->obj_request = obj_request;
>+
>+	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES,
>pages);
>+	if (result)
>+		goto out_err;
>+	parent_request->copyup_pages = pages;
>+
>+	parent_request->callback = rbd_img_obj_parent_read_full_callback;
>+	result = rbd_img_request_submit(parent_request);
>+	if (!result)
>+		return 0;
>+
>+	parent_request->copyup_pages = NULL;
>+	parent_request->obj_request = NULL;
>+	rbd_obj_request_put(obj_request);
>+out_err:
>+	if (pages)
>+		ceph_release_page_vector(pages, page_count);
>+	if (parent_request)
>+		rbd_img_request_put(parent_request);
>+	obj_request->result = result;
>+	obj_request->xferred = 0;
>+	obj_request_done_set(obj_request);
>+
>+	return result;
>+}
>+
> static void rbd_img_obj_exists_callback(struct rbd_obj_request
>*obj_request)
> {
> 	struct rbd_obj_request *orig_request;
>@@ -1996,7 +2126,7 @@ static void rbd_img_obj_exists_callback(struct
>rbd_obj_request *obj_request)
> 		obj_request_existence_set(orig_request, false);
> 	} else if (result) {
> 		orig_request->result = result;
>-		goto out_err;
>+		goto out;
> 	}
>
> 	/*
>@@ -2004,7 +2134,7 @@ static void rbd_img_obj_exists_callback(struct
>rbd_obj_request *obj_request)
> 	 * whether the target object exists.
> 	 */
> 	orig_request->result = rbd_img_obj_request_submit(orig_request);
>-out_err:
>+out:
> 	if (orig_request->result)
> 		rbd_obj_request_complete(orig_request);
> 	rbd_obj_request_put(orig_request);
>@@ -2070,15 +2200,13 @@ out:
>static int rbd_img_obj_request_submit(struct rbd_obj_request
>*obj_request)
> {
> 	struct rbd_img_request *img_request;
>+	bool known;
>
> 	rbd_assert(obj_request_img_data_test(obj_request));
>
> 	img_request = obj_request->img_request;
> 	rbd_assert(img_request);
>
>-	/* (At the moment we don't care whether it exists or not...) */
>-	(void) obj_request_exists_test;
>-
> 	/*
> 	 * Only layered writes need special handling.  If it's not a
> 	 * layered write, or it is a layered write but we know the
>@@ -2087,7 +2215,8 @@ static int rbd_img_obj_request_submit(struct
>rbd_obj_request *obj_request)
> 	 */
> 	if (!img_request_write_test(img_request) ||
> 		!img_request_layered_test(img_request) ||
>-		obj_request_known_test(obj_request)) {
>+		((known = obj_request_known_test(obj_request)) &&
>+			obj_request_exists_test(obj_request))) {
>
> 		struct rbd_device *rbd_dev;
> 		struct ceph_osd_client *osdc;
>@@ -2099,10 +2228,15 @@ static int rbd_img_obj_request_submit(struct
>rbd_obj_request *obj_request)
> 	}
>
> 	/*
>-	 * It's a layered write and we don't know whether the target
>-	 * exists.  Issue existence check; once that completes the
>-	 * original request will be submitted again.
>+	 * It's a layered write.  The target object might exist but
>+	 * we may not know that yet.  If we know it doesn't exist,
>+	 * start by reading the data for the full target object from
>+	 * the parent so we can use it for a copyup to the target.
> 	 */
>+	if (known)
>+		return rbd_img_obj_parent_read_full(obj_request);
>+
>+	/* We don't know whether the target exists.  Go find out. */
>
> 	return rbd_img_obj_exists_submit(obj_request);
> }

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 91fcf36..c5d0619 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -250,6 +250,7 @@  struct rbd_img_request {
 		struct request		*rq;		/* block request */
 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
 	};
+	struct page		**copyup_pages;
 	spinlock_t		completion_lock;/* protects next_completion */
 	u32			next_completion;
 	rbd_img_callback_t	callback;
@@ -350,6 +351,8 @@  static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);		/* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);

+static int rbd_img_request_submit(struct rbd_img_request *img_request);
+
 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

@@ -1956,6 +1959,133 @@  out_unwind:
 	return -ENOMEM;
 }

+static void
+rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
+{
+	struct rbd_obj_request *orig_request;
+	struct page **pages;
+	u32 page_count;
+	int result;
+	u64 obj_size;
+	u64 xferred;
+
+	rbd_assert(img_request_child_test(img_request));
+
+	/* First get what we need from the image request */
+
+	pages = img_request->copyup_pages;
+	rbd_assert(pages != NULL);
+	img_request->copyup_pages = NULL;
+
+	orig_request = img_request->obj_request;
+	rbd_assert(orig_request != NULL);
+
+	result = img_request->result;
+	obj_size = img_request->length;
+	xferred = img_request->xferred;
+
+	rbd_img_request_put(img_request);
+
+	obj_request_existence_set(orig_request, true);
+
+	page_count = (u32)calc_pages_for(0, obj_size);
+	ceph_release_page_vector(pages, page_count);
+
+	/* Resubmit the original request (for now). */
+
+	orig_request->result = rbd_img_obj_request_submit(orig_request);
+	if (orig_request->result) {
+		obj_request_done_set(orig_request);
+		rbd_obj_request_complete(orig_request);
+	}
+}
+
+/*
+ * Read from the parent image the range of data that covers the
+ * entire target of the given object request.  This is used for
+ * satisfying a layered image write request when the target of an
+ * object request from the image request does not exist.
+ *
+ * A page array big enough to hold the returned data is allocated
+ * and supplied to rbd_img_request_fill() as the "data descriptor."
+ * When the read completes, this page array will be transferred to
+ * the original object request for the copyup operation.
+ *
+ * If an error occurs, record it as the result of the original
+ * object request and mark it done so it gets completed.
+ */
+static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request = NULL;
+	struct rbd_img_request *parent_request = NULL;
+	struct rbd_device *rbd_dev;
+	u64 img_offset;
+	u64 length;
+	struct page **pages = NULL;
+	u32 page_count;
+	int result;
+
+	rbd_assert(obj_request_img_data_test(obj_request));
+	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+
+	img_request = obj_request->img_request;
+	rbd_assert(img_request != NULL);
+	rbd_dev = img_request->rbd_dev;
+	rbd_assert(rbd_dev->parent != NULL);
+
+	/*
+	 * Determine the byte range covered by the object in the
+	 * child image to which the original request was to be sent.
+	 */
+	img_offset = obj_request->img_offset - obj_request->offset;
+	length = (u64)1 << rbd_dev->header.obj_order;
+
+	/*
+	 * Allocate a page array big enough to receive the data read
+	 * from the parent.
+	 */
+	page_count = (u32)calc_pages_for(0, length);
+	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+	if (IS_ERR(pages)) {
+		result = PTR_ERR(pages);
+		pages = NULL;
+		goto out_err;
+	}
+
+	result = -ENOMEM;
+	parent_request = rbd_img_request_create(rbd_dev->parent,
+						img_offset, length,
+						false, true);
+	if (!parent_request)
+		goto out_err;
+	rbd_obj_request_get(obj_request);
+	parent_request->obj_request = obj_request;
+
+	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
+	if (result)
+		goto out_err;
+	parent_request->copyup_pages = pages;
+
+	parent_request->callback = rbd_img_obj_parent_read_full_callback;
+	result = rbd_img_request_submit(parent_request);
+	if (!result)
+		return 0;
+
+	parent_request->copyup_pages = NULL;
+	parent_request->obj_request = NULL;
+	rbd_obj_request_put(obj_request);
+out_err:
+	if (pages)
+		ceph_release_page_vector(pages, page_count);
+	if (parent_request)
+		rbd_img_request_put(parent_request);
+	obj_request->result = result;
+	obj_request->xferred = 0;
+	obj_request_done_set(obj_request);
+
+	return result;
+}
+
 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)