[01/12,v2] rbd: new request tracking code
diff mbox

Message ID 5101406E.4060602@inktank.com
State New, archived
Headers show

Commit Message

Alex Elder Jan. 24, 2013, 2:08 p.m. UTC
This is an update of the first patch in my request tracking
code series.  After posting it the other day I identified some
problems related to reference counting of image and object
requests.  I also am starting to look at the details of
implementing layered reads, and ended up making some
substantive changes.  Since I have not seen any review
feedback I thought the best thing would be to just
re-post the updated patch.

The remaining patches in the series have changed accordingly,
but they have not really changed substantively, so I am
not re-posting those (but will if it's requested).

The main functional change is that an image request no longer
maintains an array of object request pointers, it maintains
a list of object requests.  This simplifies some things, and
makes the image request structure fixed size.

A few other functional changes:
- Reference counting of object and image requests is now
  done sensibly.
- Image requests now support a callback when complete,
  which will be used for layered I/O requests.
- There are a few new helper functions that encapsulate
  tying an object request to an image request.
- An distinct value is now used for the "which" field
  for object requests not associated with a image request
  (mainly used for validation/assertions).

Other changes:
- Everything that was named "image_request" now uses
  "img_request" instead.
- A few blocks and lines of code have been rearranged.

The updated series is available on the ceph-client git
repository in the branch "wip-rbd-review-v2".

					-Alex

This patch fully implements the new request tracking code for rbd
I/O requests.

Each I/O request to an rbd image will get an rbd_image_request
structure allocated to track it.  This provides access to all
information about the original request, as well as access to the
set of one or more object requests that are initiated as a result
of the image request.

An rbd_obj_request structure defines a request sent to a single osd
object (possibly) as part of an rbd image request.  An rbd object
request refers to a ceph_osd_request structure built up to represent
the request; for now it will contain a single osd operation.  It
also provides space to hold the result status and the version of the
object when the osd request completes.

An rbd_obj_request structure can also stand on its own.  This will
be used for reading the version 1 header object, for issuing
acknowledgements to event notifications, and for making object
method calls.

All rbd object requests now complete asynchronously with respect
to the osd client--they supply a common callback routine.

This resolves:
    http://tracker.newdream.net/issues/3741

Signed-off-by: Alex Elder <elder@inktank.com>
---
v2: - fixed reference counting
    - image request callback support
    - image/object connection helper functions
    - distinct BAD_WHICH value for non-image object requests

 drivers/block/rbd.c |  622
++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 620 insertions(+), 2 deletions(-)

 {
 	struct ceph_osd_req_op *op;
@@ -1395,6 +1512,26 @@ done:
 	return ret;
 }

+static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
+				struct rbd_obj_request *obj_request)
+{
+	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
+}
+
+static void rbd_img_request_complete(struct rbd_img_request *img_request)
+{
+	if (img_request->callback)
+		img_request->callback(img_request);
+	else
+		rbd_img_request_put(img_request);
+}
+
+static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
+{
+	if (obj_request->callback)
+		obj_request->callback(obj_request);
+}
+
 /*
  * Request sync osd read
  */
@@ -1618,6 +1755,487 @@ static int rbd_dev_do_request(struct request *rq,
 	return 0;
 }

+static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
+				struct ceph_osd_op *op)
+{
+	u64 xferred;
+
+	/*
+	 * We support a 64-bit length, but ultimately it has to be
+	 * passed to blk_end_request(), which takes an unsigned int.
+	 */
+	xferred = le64_to_cpu(op->extent.length);
+	rbd_assert(xferred < (u64) UINT_MAX);
+	if (obj_request->result == (s32) -ENOENT) {
+		zero_bio_chain(obj_request->bio_list, 0);
+		obj_request->result = 0;
+	} else if (xferred < obj_request->length && !obj_request->result) {
+		zero_bio_chain(obj_request->bio_list, xferred);
+		xferred = obj_request->length;
+	}
+	obj_request->xferred = xferred;
+	atomic_set(&obj_request->done, 1);
+}
+
+static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
+				struct ceph_osd_op *op)
+{
+	obj_request->xferred = le64_to_cpu(op->extent.length);
+	atomic_set(&obj_request->done, 1);
+}
+
+static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
+				struct ceph_msg *msg)
+{
+	struct rbd_obj_request *obj_request = osd_req->r_priv;
+	struct ceph_osd_reply_head *reply_head;
+	struct ceph_osd_op *op;
+	u32 num_ops;
+	u16 opcode;
+
+	rbd_assert(osd_req == obj_request->osd_req);
+	rbd_assert(!!obj_request->img_request ^
+				(obj_request->which == BAD_WHICH));
+
+	obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
+	reply_head = msg->front.iov_base;
+	obj_request->result = (s32) le32_to_cpu(reply_head->result);
+	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
+
+	num_ops = le32_to_cpu(reply_head->num_ops);
+	WARN_ON(num_ops != 1);	/* For now */
+
+	op = &reply_head->ops[0];
+	opcode = le16_to_cpu(op->op);
+	switch (opcode) {
+	case CEPH_OSD_OP_READ:
+		rbd_osd_read_callback(obj_request, op);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		rbd_osd_write_callback(obj_request, op);
+		break;
+	default:
+		rbd_warn(NULL, "%s: unsupported op %hu\n",
+			obj_request->object_name, (unsigned short) opcode);
+		break;
+	}
+
+	if (atomic_read(&obj_request->done))
+		rbd_obj_request_complete(obj_request);
+}
+
+static struct ceph_osd_request *rbd_osd_req_create(
+					struct rbd_device *rbd_dev,
+					bool write_request,
+					struct rbd_obj_request *obj_request,
+					struct ceph_osd_req_op *op)
+{
+	struct rbd_img_request *img_request = obj_request->img_request;
+	struct ceph_snap_context *snapc = NULL;
+	struct ceph_osd_client *osdc;
+	struct ceph_osd_request *osd_req;
+	struct timespec now;
+	struct timespec *mtime;
+	u64 snap_id = CEPH_NOSNAP;
+	u64 offset = obj_request->offset;
+	u64 length = obj_request->length;
+
+	if (img_request) {
+		rbd_assert(img_request->write_request == write_request);
+		if (img_request->write_request)
+			snapc = img_request->snapc;
+		else
+			snap_id = img_request->snap_id;
+	}
+
+	/* Allocate and initialize the request, for the single op */
+
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
+	if (!osd_req)
+		return NULL;	/* ENOMEM */
+
+	rbd_assert(obj_req_type_valid(obj_request->type));
+	switch (obj_request->type) {
+	case obj_req_bio:
+		rbd_assert(obj_request->bio_list != NULL);
+		osd_req->r_bio = obj_request->bio_list;
+		bio_get(osd_req->r_bio);
+		/* osd client requires "num pages" even for bio */
+		osd_req->r_num_pages = calc_pages_for(offset, length);
+		break;
+	}
+
+	if (write_request) {
+		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+		now = CURRENT_TIME;
+		mtime = &now;
+	} else {
+		osd_req->r_flags = CEPH_OSD_FLAG_READ;
+		mtime = NULL;	/* not needed for reads */
+		offset = 0;	/* These are not used... */
+		length = 0;	/* ...for osd read requests */
+	}
+
+	osd_req->r_callback = rbd_osd_req_callback;
+	osd_req->r_priv = obj_request;
+
+	/* No trailing '\0' required for the object name in the request */
+
+	osd_req->r_oid_len = strlen(obj_request->object_name);
+	rbd_assert(osd_req->r_oid_len <= sizeof (osd_req->r_oid));
+	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
+
+	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
+
+	/* osd_req will get its own reference to snapc (if non-null) */
+
+	ceph_osdc_build_request(osd_req, offset, length, 1, op,
+				snapc, snap_id, mtime);
+
+	return osd_req;
+}
+
+static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
+{
+	ceph_osdc_put_request(osd_req);
+}
+
+/* object_name is assumed to be a non-null pointer and NUL-terminated */
+
+static struct rbd_obj_request *rbd_obj_request_create(const char
*object_name,
+						u64 offset, u64 length,
+						enum obj_req_type type)
+{
+	struct rbd_obj_request *obj_request;
+	size_t size;
+	char *name;
+
+	rbd_assert(obj_req_type_valid(type));
+
+	size = strlen(object_name) + 1;
+	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
+	if (!obj_request)
+		return NULL;
+
+	name = (char *)(obj_request + 1);
+	obj_request->object_name = memcpy(name, object_name, size);
+	obj_request->offset = offset;
+	obj_request->length = length;
+	obj_request->which = BAD_WHICH;
+	obj_request->type = type;
+	INIT_LIST_HEAD(&obj_request->links);
+	atomic_set(&obj_request->done, 0);
+	kref_init(&obj_request->kref);
+
+	return obj_request;
+}
+
+static void rbd_obj_request_destroy(struct kref *kref)
+{
+	struct rbd_obj_request *obj_request;
+
+	obj_request = container_of(kref, struct rbd_obj_request, kref);
+
+	rbd_assert(obj_request->img_request == NULL);
+	rbd_assert(obj_request->which == BAD_WHICH);
+
+	if (obj_request->osd_req)
+		rbd_osd_req_destroy(obj_request->osd_req);
+
+	rbd_assert(obj_req_type_valid(obj_request->type));
+	switch (obj_request->type) {
+	case obj_req_bio:
+		if (obj_request->bio_list)
+			bio_chain_put(obj_request->bio_list);
+		break;
+	}
+
+	kfree(obj_request);
+}
+
+/*
+ * Caller is responsible for filling in the list of object requests
+ * that comprises the image request, and the Linux request pointer
+ * (if there is one).
+ */
+struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
+					u64 offset, u64 length,
+					bool write_request)
+{
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc = NULL;
+
+	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
+	if (!img_request)
+		return NULL;
+
+	if (write_request) {
+		down_read(&rbd_dev->header_rwsem);
+		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+		up_read(&rbd_dev->header_rwsem);
+		if (WARN_ON(!snapc)) {
+			kfree(img_request);
+			return NULL;	/* Shouldn't happen */
+		}
+	}
+
+	img_request->rq = NULL;
+	img_request->rbd_dev = rbd_dev;
+	img_request->offset = offset;
+	img_request->length = length;
+	img_request->write_request = write_request;
+	if (write_request)
+		img_request->snapc = snapc;
+	else
+		img_request->snap_id = rbd_dev->spec->snap_id;
+	spin_lock_init(&img_request->completion_lock);
+	img_request->next_completion = 0;
+	img_request->callback = NULL;
+	img_request->obj_request_count = 0;
+	INIT_LIST_HEAD(&img_request->obj_requests);
+	kref_init(&img_request->kref);
+
+	rbd_img_request_get(img_request);	/* Avoid a warning */
+	rbd_img_request_put(img_request);	/* TEMPORARY */
+
+	return img_request;
+}
+
+static void rbd_img_request_destroy(struct kref *kref)
+{
+	struct rbd_img_request *img_request;
+	struct rbd_obj_request *obj_request;
+	struct rbd_obj_request *next_obj_request;
+
+	img_request = container_of(kref, struct rbd_img_request, kref);
+
+	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+		rbd_img_obj_request_del(img_request, obj_request);
+
+	if (img_request->write_request)
+		ceph_put_snap_context(img_request->snapc);
+
+	kfree(img_request);
+}
+
+static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
+					struct bio *bio_list)
+{
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	struct rbd_obj_request *obj_request = NULL;
+	struct rbd_obj_request *next_obj_request;
+	unsigned int bio_offset;
+	u64 image_offset;
+	u64 resid;
+	u16 opcode;
+
+	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
+					      : CEPH_OSD_OP_READ;
+	bio_offset = 0;
+	image_offset = img_request->offset;
+	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
+	resid = img_request->length;
+	while (resid) {
+		const char *object_name;
+		unsigned int clone_size;
+		struct ceph_osd_req_op *op;
+		u64 offset;
+		u64 length;
+
+		object_name = rbd_segment_name(rbd_dev, image_offset);
+		if (!object_name)
+			goto out_unwind;
+		offset = rbd_segment_offset(rbd_dev, image_offset);
+		length = rbd_segment_length(rbd_dev, image_offset, resid);
+		obj_request = rbd_obj_request_create(object_name,
+						offset, length, obj_req_bio);
+		kfree(object_name);	/* object request has its own copy */
+		if (!obj_request)
+			goto out_unwind;
+
+		rbd_assert(length <= (u64) UINT_MAX);
+		clone_size = (unsigned int) length;
+		obj_request->bio_list = bio_chain_clone_range(&bio_list,
+						&bio_offset, clone_size,
+						GFP_ATOMIC);
+		if (!obj_request->bio_list)
+			goto out_partial;
+
+		/*
+		 * Build up the op to use in building the osd
+		 * request.  Note that the contents of the op are
+		 * copied by rbd_osd_req_create().
+		 */
+		op = rbd_osd_req_op_create(opcode, offset, length);
+		if (!op)
+			goto out_partial;
+		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
+						img_request->write_request,
+						obj_request, op);
+		rbd_osd_req_op_destroy(op);
+		if (!obj_request->osd_req)
+			goto out_partial;
+		/* status and version are initially zero-filled */
+
+		rbd_img_obj_request_add(img_request, obj_request);
+
+		image_offset += length;
+		resid -= length;
+	}
+
+	return 0;
+
+out_partial:
+	rbd_obj_request_put(obj_request);
+out_unwind:
+	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+		rbd_obj_request_put(obj_request);
+
+	return -ENOMEM;
+}
+
+static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request;
+	u32 which = obj_request->which;
+	bool more = true;
+
+	img_request = obj_request->img_request;
+	rbd_assert(img_request != NULL);
+	rbd_assert(img_request->rq != NULL);
+	rbd_assert(which != BAD_WHICH);
+	rbd_assert(which < img_request->obj_request_count);
+	rbd_assert(which >= img_request->next_completion);
+
+	spin_lock(&img_request->completion_lock);
+	if (which != img_request->next_completion)
+		goto out;
+
+	for_each_obj_request_from(img_request, obj_request) {
+		unsigned int xferred;
+		int result;
+
+		rbd_assert(more);
+		rbd_assert(which < img_request->obj_request_count);
+
+		if (!atomic_read(&obj_request->done))
+			break;
+
+		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
+		xferred = (unsigned int) obj_request->xferred;
+		result = (int) obj_request->result;
+		if (result)
+			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
+				img_request->write_request ? "write" : "read",
+				result, xferred);
+
+		more = blk_end_request(img_request->rq, result, xferred);
+		which++;
+	}
+	rbd_assert(more ^ (which == img_request->obj_request_count));
+	img_request->next_completion = which;
+out:
+	spin_unlock(&img_request->completion_lock);
+
+	if (!more)
+		rbd_img_request_complete(img_request);
+}
+
+static int rbd_img_request_submit(struct rbd_img_request *img_request)
+{
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
+
+	for_each_obj_request(img_request, obj_request) {
+		int ret;
+
+		obj_request->callback = rbd_img_obj_callback;
+		ret = rbd_obj_request_submit(osdc, obj_request);
+		if (ret)
+			return ret;
+		/*
+		 * The image request has its own reference to each
+		 * of its object requests, so we can safely drop the
+		 * initial one here.
+		 */
+		rbd_obj_request_put(obj_request);
+	}
+
+	return 0;
+}
+
+static void rbd_request_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	bool read_only = rbd_dev->mapping.read_only;
+	struct request *rq;
+	int result;
+
+	while ((rq = blk_fetch_request(q))) {
+		bool write_request = rq_data_dir(rq) == WRITE;
+		struct rbd_img_request *img_request;
+		u64 offset;
+		u64 length;
+
+		/* Ignore any non-FS requests that filter through. */
+
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			__blk_end_request_all(rq, 0);
+			continue;
+		}
+
+		spin_unlock_irq(q->queue_lock);
+
+		/* Disallow writes to a read-only device */
+
+		if (write_request) {
+			result = -EROFS;
+			if (read_only)
+				goto end_request;
+			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+		}
+
+		/* Quit early if the snapshot has disappeared */
+
+		if (!atomic_read(&rbd_dev->exists)) {
+			dout("request for non-existent snapshot");
+			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+			result = -ENXIO;
+			goto end_request;
+		}
+
+		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
+		length = (u64) blk_rq_bytes(rq);
+
+		result = -EINVAL;
+		if (WARN_ON(offset && length > U64_MAX - offset + 1))
+			goto end_request;	/* Shouldn't happen */
+
+		result = -ENOMEM;
+		img_request = rbd_img_request_create(rbd_dev, offset, length,
+							write_request);
+		if (!img_request)
+			goto end_request;
+
+		img_request->rq = rq;
+
+		result = rbd_img_request_fill_bio(img_request, rq->bio);
+		if (!result)
+			result = rbd_img_request_submit(img_request);
+		if (result)
+			rbd_img_request_put(img_request);
+end_request:
+		spin_lock_irq(q->queue_lock);
+		if (result < 0) {
+			rbd_warn(rbd_dev, "obj_request %s result %d\n",
+				write_request ? "write" : "read", result);
+			__blk_end_request_all(rq, result);
+		}
+	}
+}
+
 /*
  * block device queue callback
  */
@@ -1929,8 +2547,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;

-	/* init rq */
-	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
+	(void) rbd_rq_fn;		/* avoid a warning */
+	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
 	if (!q)
 		goto out_disk;

Comments

Alex Elder Jan. 24, 2013, 4:22 p.m. UTC | #1
On 01/24/2013 08:08 AM, Alex Elder wrote:
> The remaining patches in the series have changed accordingly,
> but they have not really changed substantively, so I am
> not re-posting those (but will if it's requested).

This was requested.  So I'm going to re-post the remainder
of the series.  They'll show up as responses to the first
one I posted.

					-Alex
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Josh Durgin Jan. 29, 2013, 10:43 a.m. UTC | #2
On 01/24/2013 06:08 AM, Alex Elder wrote:
> This is an update of the first patch in my request tracking
> code series.  After posting it the other day I identified some
> problems related to reference counting of image and object
> requests.  I also am starting to look at the details of
> implementing layered reads, and ended up making some
> substantive changes.  Since I have not seen any review
> feedback I thought the best thing would be to just
> re-post the updated patch.
>
> The remaining patches in the series have changed accordingly,
> but they have not really changed substantively, so I am
> not re-posting those (but will if it's requested).
>
> The main functional change is that an image request no longer
> maintains an array of object request pointers, it maintains
> a list of object requests.  This simplifies some things, and
> makes the image request structure fixed size.
>
> A few other functional changes:
> - Reference counting of object and image requests is now
>    done sensibly.
> - Image requests now support a callback when complete,
>    which will be used for layered I/O requests.
> - There are a few new helper functions that encapsulate
>    tying an object request to an image request.
> - An distinct value is now used for the "which" field
>    for object requests not associated with a image request
>    (mainly used for validation/assertions).
>
> Other changes:
> - Everything that was named "image_request" now uses
>    "img_request" instead.
> - A few blocks and lines of code have been rearranged.
>
> The updated series is available on the ceph-client git
> repository in the branch "wip-rbd-review-v2".
>
> 					-Alex
>
> This patch fully implements the new request tracking code for rbd
> I/O requests.
>
> Each I/O request to an rbd image will get an rbd_image_request
> structure allocated to track it.  This provides access to all
> information about the original request, as well as access to the
> set of one or more object requests that are initiated as a result
> of the image request.
>
> An rbd_obj_request structure defines a request sent to a single osd
> object (possibly) as part of an rbd image request.  An rbd object
> request refers to a ceph_osd_request structure built up to represent
> the request; for now it will contain a single osd operation.  It
> also provides space to hold the result status and the version of the
> object when the osd request completes.
>
> An rbd_obj_request structure can also stand on its own.  This will
> be used for reading the version 1 header object, for issuing
> acknowledgements to event notifications, and for making object
> method calls.
>
> All rbd object requests now complete asynchronously with respect
> to the osd client--they supply a common callback routine.
>
> This resolves:
>      http://tracker.newdream.net/issues/3741
>
> Signed-off-by: Alex Elder <elder@inktank.com>
> ---
> v2: - fixed reference counting
>      - image request callback support
>      - image/object connection helper functions
>      - distinct BAD_WHICH value for non-image object requests
>
>   drivers/block/rbd.c |  622
> ++++++++++++++++++++++++++++++++++++++++++++++++++-
>   1 file changed, 620 insertions(+), 2 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 6689363..46a61dd 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -181,6 +181,67 @@ struct rbd_req_coll {
>   	struct rbd_req_status	status[0];
>   };
>
> +struct rbd_img_request;
> +typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
> +
> +#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
> +
> +struct rbd_obj_request;
> +typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
> +
> +enum obj_req_type { obj_req_bio };	/* More types to come */

enum labels should be capitalized.

> +struct rbd_obj_request {
> +	const char		*object_name;
> +	u64			offset;		/* object start byte */
> +	u64			length;		/* bytes from offset */
> +
> +	struct rbd_img_request	*img_request;
> +	struct list_head	links;
> +	u32			which;		/* posn image request list */
> +
> +	enum obj_req_type	type;
> +	struct bio		*bio_list;
> +
> +	struct ceph_osd_request	*osd_req;
> +
> +	u64			xferred;	/* bytes transferred */
> +	u64			version;

This version is only used (uselessly) for the watch operation. It
should be removed in a future patch (along with the obj_ver in the
header).

> +	s32			result;
> +	atomic_t		done;
> +
> +	rbd_obj_callback_t	callback;
> +
> +	struct kref		kref;
> +};
> +
> +struct rbd_img_request {
> +	struct request		*rq;
> +	struct rbd_device	*rbd_dev;
> +	u64			offset;	/* starting image byte offset */
> +	u64			length;	/* byte count from offset */
> +	bool			write_request;	/* false for read */
> +	union {
> +		struct ceph_snap_context *snapc;	/* for writes */
> +		u64		snap_id;		/* for reads */
> +	};
> +	spinlock_t		completion_lock;

It'd be nice to have a comment describing what this lock protects.

> +	u32			next_completion;
> +	rbd_img_callback_t	callback;
> +
> +	u32			obj_request_count;
> +	struct list_head	obj_requests;

Maybe note that these are rbd_obj_requests, and not ceph_osd_requests.

> +	struct kref		kref;
> +};
> +
> +#define for_each_obj_request(ireq, oreq) \
> +	list_for_each_entry(oreq, &ireq->obj_requests, links)
> +#define for_each_obj_request_from(ireq, oreq) \
> +	list_for_each_entry_from(oreq, &ireq->obj_requests, links)
> +#define for_each_obj_request_safe(ireq, oreq, n) \
> +	list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
> +
>   /*
>    * a single io request
>    */
> @@ -1031,6 +1092,62 @@ out_err:
>   	return NULL;
>   }
>
> +static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
> +{
> +	kref_get(&obj_request->kref);
> +}
> +
> +static void rbd_obj_request_destroy(struct kref *kref);
> +static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
> +{
> +	rbd_assert(obj_request != NULL);
> +	kref_put(&obj_request->kref, rbd_obj_request_destroy);
> +}
> +
> +static void rbd_img_request_get(struct rbd_img_request *img_request)
> +{
> +	kref_get(&img_request->kref);
> +}
> +
> +static void rbd_img_request_destroy(struct kref *kref);
> +static void rbd_img_request_put(struct rbd_img_request *img_request)
> +{
> +	rbd_assert(img_request != NULL);
> +	kref_put(&img_request->kref, rbd_img_request_destroy);
> +}
> +
> +static inline void rbd_img_obj_request_add(struct rbd_img_request
> *img_request,
> +					struct rbd_obj_request *obj_request)
> +{
> +	rbd_obj_request_get(obj_request);
> +	obj_request->img_request = img_request;
> +	list_add_tail(&obj_request->links, &img_request->obj_requests);
> +	obj_request->which = img_request->obj_request_count++;
> +	rbd_assert(obj_request->which != BAD_WHICH);
> +}
> +
> +static inline void rbd_img_obj_request_del(struct rbd_img_request
> *img_request,
> +					struct rbd_obj_request *obj_request)
> +{
> +	rbd_assert(obj_request->which != BAD_WHICH);
> +	obj_request->which = BAD_WHICH;
> +	list_del(&obj_request->links);
> +	rbd_assert(obj_request->img_request == img_request);
> +	obj_request->callback = NULL;
> +	obj_request->img_request = NULL;
> +	rbd_obj_request_put(obj_request);
> +}
> +
> +static bool obj_req_type_valid(enum obj_req_type type)
> +{
> +	switch (type) {
> +	case obj_req_bio:
> +		return true;
> +	default:
> +		return false;
> +	}
> +}
> +
>   struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
>   {
>   	struct ceph_osd_req_op *op;
> @@ -1395,6 +1512,26 @@ done:
>   	return ret;
>   }
>
> +static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
> +				struct rbd_obj_request *obj_request)
> +{
> +	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
> +}
> +
> +static void rbd_img_request_complete(struct rbd_img_request *img_request)
> +{
> +	if (img_request->callback)
> +		img_request->callback(img_request);
> +	else
> +		rbd_img_request_put(img_request);
> +}

Why rely on the callback to rbd_img_request_put()? Wouldn't it be a
bit simpler to unconditionally do the put here?

> +static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
> +{
> +	if (obj_request->callback)
> +		obj_request->callback(obj_request);
> +}
> +
>   /*
>    * Request sync osd read
>    */
> @@ -1618,6 +1755,487 @@ static int rbd_dev_do_request(struct request *rq,
>   	return 0;
>   }
>
> +static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
> +				struct ceph_osd_op *op)
> +{
> +	u64 xferred;
> +
> +	/*
> +	 * We support a 64-bit length, but ultimately it has to be
> +	 * passed to blk_end_request(), which takes an unsigned int.
> +	 */
> +	xferred = le64_to_cpu(op->extent.length);
> +	rbd_assert(xferred < (u64) UINT_MAX);
> +	if (obj_request->result == (s32) -ENOENT) {
> +		zero_bio_chain(obj_request->bio_list, 0);
> +		obj_request->result = 0;
> +	} else if (xferred < obj_request->length && !obj_request->result) {
> +		zero_bio_chain(obj_request->bio_list, xferred);
> +		xferred = obj_request->length;
> +	}
> +	obj_request->xferred = xferred;
> +	atomic_set(&obj_request->done, 1);
> +}
> +
> +static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
> +				struct ceph_osd_op *op)
> +{
> +	obj_request->xferred = le64_to_cpu(op->extent.length);
> +	atomic_set(&obj_request->done, 1);
> +}
> +
> +static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
> +				struct ceph_msg *msg)
> +{
> +	struct rbd_obj_request *obj_request = osd_req->r_priv;
> +	struct ceph_osd_reply_head *reply_head;
> +	struct ceph_osd_op *op;
> +	u32 num_ops;
> +	u16 opcode;
> +
> +	rbd_assert(osd_req == obj_request->osd_req);
> +	rbd_assert(!!obj_request->img_request ^
> +				(obj_request->which == BAD_WHICH));
> +
> +	obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
> +	reply_head = msg->front.iov_base;
> +	obj_request->result = (s32) le32_to_cpu(reply_head->result);
> +	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
> +
> +	num_ops = le32_to_cpu(reply_head->num_ops);
> +	WARN_ON(num_ops != 1);	/* For now */
> +
> +	op = &reply_head->ops[0];
> +	opcode = le16_to_cpu(op->op);
> +	switch (opcode) {
> +	case CEPH_OSD_OP_READ:
> +		rbd_osd_read_callback(obj_request, op);
> +		break;
> +	case CEPH_OSD_OP_WRITE:
> +		rbd_osd_write_callback(obj_request, op);
> +		break;
> +	default:
> +		rbd_warn(NULL, "%s: unsupported op %hu\n",
> +			obj_request->object_name, (unsigned short) opcode);
> +		break;
> +	}
> +
> +	if (atomic_read(&obj_request->done))
> +		rbd_obj_request_complete(obj_request);
> +}
> +
> +static struct ceph_osd_request *rbd_osd_req_create(
> +					struct rbd_device *rbd_dev,
> +					bool write_request,
> +					struct rbd_obj_request *obj_request,
> +					struct ceph_osd_req_op *op)
> +{
> +	struct rbd_img_request *img_request = obj_request->img_request;
> +	struct ceph_snap_context *snapc = NULL;
> +	struct ceph_osd_client *osdc;
> +	struct ceph_osd_request *osd_req;
> +	struct timespec now;
> +	struct timespec *mtime;
> +	u64 snap_id = CEPH_NOSNAP;
> +	u64 offset = obj_request->offset;
> +	u64 length = obj_request->length;
> +
> +	if (img_request) {
> +		rbd_assert(img_request->write_request == write_request);
> +		if (img_request->write_request)
> +			snapc = img_request->snapc;
> +		else
> +			snap_id = img_request->snap_id;
> +	}
> +
> +	/* Allocate and initialize the request, for the single op */
> +
> +	osdc = &rbd_dev->rbd_client->client->osdc;
> +	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
> +	if (!osd_req)
> +		return NULL;	/* ENOMEM */
> +
> +	rbd_assert(obj_req_type_valid(obj_request->type));
> +	switch (obj_request->type) {
> +	case obj_req_bio:
> +		rbd_assert(obj_request->bio_list != NULL);
> +		osd_req->r_bio = obj_request->bio_list;
> +		bio_get(osd_req->r_bio);
> +		/* osd client requires "num pages" even for bio */
> +		osd_req->r_num_pages = calc_pages_for(offset, length);
> +		break;
> +	}
> +
> +	if (write_request) {
> +		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
> +		now = CURRENT_TIME;
> +		mtime = &now;
> +	} else {
> +		osd_req->r_flags = CEPH_OSD_FLAG_READ;
> +		mtime = NULL;	/* not needed for reads */
> +		offset = 0;	/* These are not used... */
> +		length = 0;	/* ...for osd read requests */
> +	}
> +
> +	osd_req->r_callback = rbd_osd_req_callback;
> +	osd_req->r_priv = obj_request;
> +
> +	/* No trailing '\0' required for the object name in the request */

It looks like ceph_calc_object_layout() does require the trailing '\0':

osd_client.c:
   __map_request()
     ceph_calc_object_layout(...,->r_oid,...)
       strlen(oid)

> +	osd_req->r_oid_len = strlen(obj_request->object_name);
> +	rbd_assert(osd_req->r_oid_len <= sizeof (osd_req->r_oid));
> +	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
> +
> +	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
> +
> +	/* osd_req will get its own reference to snapc (if non-null) */
> +
> +	ceph_osdc_build_request(osd_req, offset, length, 1, op,
> +				snapc, snap_id, mtime);
> +
> +	return osd_req;
> +}
> +
> +static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
> +{
> +	ceph_osdc_put_request(osd_req);
> +}
> +
> +/* object_name is assumed to be a non-null pointer and NUL-terminated */
> +
> +static struct rbd_obj_request *rbd_obj_request_create(const char
> *object_name,
> +						u64 offset, u64 length,
> +						enum obj_req_type type)
> +{
> +	struct rbd_obj_request *obj_request;
> +	size_t size;
> +	char *name;
> +
> +	rbd_assert(obj_req_type_valid(type));
> +
> +	size = strlen(object_name) + 1;
> +	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
> +	if (!obj_request)
> +		return NULL;
> +
> +	name = (char *)(obj_request + 1);
> +	obj_request->object_name = memcpy(name, object_name, size);
> +	obj_request->offset = offset;
> +	obj_request->length = length;
> +	obj_request->which = BAD_WHICH;
> +	obj_request->type = type;
> +	INIT_LIST_HEAD(&obj_request->links);
> +	atomic_set(&obj_request->done, 0);
> +	kref_init(&obj_request->kref);
> +
> +	return obj_request;
> +}
> +
> +static void rbd_obj_request_destroy(struct kref *kref)
> +{
> +	struct rbd_obj_request *obj_request;
> +
> +	obj_request = container_of(kref, struct rbd_obj_request, kref);
> +
> +	rbd_assert(obj_request->img_request == NULL);
> +	rbd_assert(obj_request->which == BAD_WHICH);
> +
> +	if (obj_request->osd_req)
> +		rbd_osd_req_destroy(obj_request->osd_req);
> +
> +	rbd_assert(obj_req_type_valid(obj_request->type));
> +	switch (obj_request->type) {
> +	case obj_req_bio:
> +		if (obj_request->bio_list)
> +			bio_chain_put(obj_request->bio_list);
> +		break;
> +	}
> +
> +	kfree(obj_request);
> +}
> +
> +/*
> + * Caller is responsible for filling in the list of object requests
> + * that comprises the image request, and the Linux request pointer
> + * (if there is one).
> + */
> +struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
> +					u64 offset, u64 length,
> +					bool write_request)
> +{
> +	struct rbd_img_request *img_request;
> +	struct ceph_snap_context *snapc = NULL;
> +
> +	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
> +	if (!img_request)
> +		return NULL;
> +
> +	if (write_request) {
> +		down_read(&rbd_dev->header_rwsem);
> +		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
> +		up_read(&rbd_dev->header_rwsem);
> +		if (WARN_ON(!snapc)) {
> +			kfree(img_request);
> +			return NULL;	/* Shouldn't happen */
> +		}
> +	}
> +
> +	img_request->rq = NULL;
> +	img_request->rbd_dev = rbd_dev;
> +	img_request->offset = offset;
> +	img_request->length = length;
> +	img_request->write_request = write_request;
> +	if (write_request)
> +		img_request->snapc = snapc;
> +	else
> +		img_request->snap_id = rbd_dev->spec->snap_id;
> +	spin_lock_init(&img_request->completion_lock);
> +	img_request->next_completion = 0;
> +	img_request->callback = NULL;
> +	img_request->obj_request_count = 0;
> +	INIT_LIST_HEAD(&img_request->obj_requests);
> +	kref_init(&img_request->kref);
> +
> +	rbd_img_request_get(img_request);	/* Avoid a warning */
> +	rbd_img_request_put(img_request);	/* TEMPORARY */
> +
> +	return img_request;
> +}
> +
> +static void rbd_img_request_destroy(struct kref *kref)
> +{
> +	struct rbd_img_request *img_request;
> +	struct rbd_obj_request *obj_request;
> +	struct rbd_obj_request *next_obj_request;
> +
> +	img_request = container_of(kref, struct rbd_img_request, kref);
> +
> +	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
> +		rbd_img_obj_request_del(img_request, obj_request);
> +
> +	if (img_request->write_request)
> +		ceph_put_snap_context(img_request->snapc);
> +
> +	kfree(img_request);
> +}
> +
> +static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
> +					struct bio *bio_list)
> +{
> +	struct rbd_device *rbd_dev = img_request->rbd_dev;
> +	struct rbd_obj_request *obj_request = NULL;
> +	struct rbd_obj_request *next_obj_request;
> +	unsigned int bio_offset;
> +	u64 image_offset;
> +	u64 resid;
> +	u16 opcode;
> +
> +	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
> +					      : CEPH_OSD_OP_READ;
> +	bio_offset = 0;
> +	image_offset = img_request->offset;
> +	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
> +	resid = img_request->length;
> +	while (resid) {
> +		const char *object_name;
> +		unsigned int clone_size;
> +		struct ceph_osd_req_op *op;
> +		u64 offset;
> +		u64 length;
> +
> +		object_name = rbd_segment_name(rbd_dev, image_offset);
> +		if (!object_name)
> +			goto out_unwind;
> +		offset = rbd_segment_offset(rbd_dev, image_offset);
> +		length = rbd_segment_length(rbd_dev, image_offset, resid);
> +		obj_request = rbd_obj_request_create(object_name,
> +						offset, length, obj_req_bio);
> +		kfree(object_name);	/* object request has its own copy */
> +		if (!obj_request)
> +			goto out_unwind;
> +
> +		rbd_assert(length <= (u64) UINT_MAX);
> +		clone_size = (unsigned int) length;
> +		obj_request->bio_list = bio_chain_clone_range(&bio_list,
> +						&bio_offset, clone_size,
> +						GFP_ATOMIC);
> +		if (!obj_request->bio_list)
> +			goto out_partial;
> +
> +		/*
> +		 * Build up the op to use in building the osd
> +		 * request.  Note that the contents of the op are
> +		 * copied by rbd_osd_req_create().
> +		 */
> +		op = rbd_osd_req_op_create(opcode, offset, length);
> +		if (!op)
> +			goto out_partial;
> +		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
> +						img_request->write_request,
> +						obj_request, op);
> +		rbd_osd_req_op_destroy(op);
> +		if (!obj_request->osd_req)
> +			goto out_partial;
> +		/* status and version are initially zero-filled */
> +
> +		rbd_img_obj_request_add(img_request, obj_request);
> +
> +		image_offset += length;
> +		resid -= length;
> +	}
> +
> +	return 0;
> +
> +out_partial:
> +	rbd_obj_request_put(obj_request);
> +out_unwind:
> +	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
> +		rbd_obj_request_put(obj_request);
> +
> +	return -ENOMEM;
> +}
> +
> +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
> +{
> +	struct rbd_img_request *img_request;
> +	u32 which = obj_request->which;
> +	bool more = true;
> +
> +	img_request = obj_request->img_request;
> +	rbd_assert(img_request != NULL);
> +	rbd_assert(img_request->rq != NULL);
> +	rbd_assert(which != BAD_WHICH);
> +	rbd_assert(which < img_request->obj_request_count);
> +	rbd_assert(which >= img_request->next_completion);
> +
> +	spin_lock(&img_request->completion_lock);

In the current equivalent code (rbd_coll_end_req_index), we use
spin_lock_irq(), and don't hold the spinlock while calling
blk_end_request.

Why the change, and is this change safe?

> +	if (which != img_request->next_completion)
> +		goto out;
> +
> +	for_each_obj_request_from(img_request, obj_request) {
> +		unsigned int xferred;
> +		int result;
> +
> +		rbd_assert(more);
> +		rbd_assert(which < img_request->obj_request_count);
> +
> +		if (!atomic_read(&obj_request->done))
> +			break;
> +
> +		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
> +		xferred = (unsigned int) obj_request->xferred;
> +		result = (int) obj_request->result;
> +		if (result)
> +			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
> +				img_request->write_request ? "write" : "read",
> +				result, xferred);
> +
> +		more = blk_end_request(img_request->rq, result, xferred);
> +		which++;
> +	}
> +	rbd_assert(more ^ (which == img_request->obj_request_count));
> +	img_request->next_completion = which;
> +out:
> +	spin_unlock(&img_request->completion_lock);
> +
> +	if (!more)
> +		rbd_img_request_complete(img_request);
> +}
> +
> +static int rbd_img_request_submit(struct rbd_img_request *img_request)
> +{
> +	struct rbd_device *rbd_dev = img_request->rbd_dev;
> +	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
> +	struct rbd_obj_request *obj_request;
> +
> +	for_each_obj_request(img_request, obj_request) {
> +		int ret;
> +
> +		obj_request->callback = rbd_img_obj_callback;
> +		ret = rbd_obj_request_submit(osdc, obj_request);
> +		if (ret)
> +			return ret;
> +		/*
> +		 * The image request has its own reference to each
> +		 * of its object requests, so we can safely drop the
> +		 * initial one here.
> +		 */
> +		rbd_obj_request_put(obj_request);
> +	}
> +
> +	return 0;
> +}
> +
> +static void rbd_request_fn(struct request_queue *q)
> +{
> +	struct rbd_device *rbd_dev = q->queuedata;
> +	bool read_only = rbd_dev->mapping.read_only;
> +	struct request *rq;
> +	int result;
> +
> +	while ((rq = blk_fetch_request(q))) {
> +		bool write_request = rq_data_dir(rq) == WRITE;
> +		struct rbd_img_request *img_request;
> +		u64 offset;
> +		u64 length;
> +
> +		/* Ignore any non-FS requests that filter through. */
> +
> +		if (rq->cmd_type != REQ_TYPE_FS) {
> +			__blk_end_request_all(rq, 0);
> +			continue;
> +		}
> +
> +		spin_unlock_irq(q->queue_lock);
> +
> +		/* Disallow writes to a read-only device */
> +
> +		if (write_request) {
> +			result = -EROFS;
> +			if (read_only)
> +				goto end_request;
> +			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
> +		}
> +
> +		/* Quit early if the snapshot has disappeared */
> +
> +		if (!atomic_read(&rbd_dev->exists)) {
> +			dout("request for non-existent snapshot");
> +			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
> +			result = -ENXIO;
> +			goto end_request;
> +		}
> +
> +		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
> +		length = (u64) blk_rq_bytes(rq);
> +
> +		result = -EINVAL;
> +		if (WARN_ON(offset && length > U64_MAX - offset + 1))
> +			goto end_request;	/* Shouldn't happen */
> +
> +		result = -ENOMEM;
> +		img_request = rbd_img_request_create(rbd_dev, offset, length,
> +							write_request);
> +		if (!img_request)
> +			goto end_request;
> +
> +		img_request->rq = rq;
> +
> +		result = rbd_img_request_fill_bio(img_request, rq->bio);
> +		if (!result)
> +			result = rbd_img_request_submit(img_request);
> +		if (result)
> +			rbd_img_request_put(img_request);
> +end_request:
> +		spin_lock_irq(q->queue_lock);
> +		if (result < 0) {
> +			rbd_warn(rbd_dev, "obj_request %s result %d\n",
> +				write_request ? "write" : "read", result);
> +			__blk_end_request_all(rq, result);
> +		}
> +	}
> +}
> +
>   /*
>    * block device queue callback
>    */
> @@ -1929,8 +2547,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
>   	disk->fops = &rbd_bd_ops;
>   	disk->private_data = rbd_dev;
>
> -	/* init rq */
> -	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
> +	(void) rbd_rq_fn;		/* avoid a warning */
> +	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
>   	if (!q)
>   		goto out_disk;
>

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Alex Elder Jan. 30, 2013, 12:34 a.m. UTC | #3
On 01/29/2013 04:43 AM, Josh Durgin wrote:
> On 01/24/2013 06:08 AM, Alex Elder wrote:
>> This is an update of the first patch in my request tracking
>> code series.  After posting it the other day I identified some
>> problems related to reference counting of image and object
>> requests.  I also am starting to look at the details of
>> implementing layered reads, and ended up making some
>> substantive changes.  Since I have not seen any review
>> feedback I thought the best thing would be to just
>> re-post the updated patch.
>>
>> The remaining patches in the series have changed accordingly,
>> but they have not really changed substantively, so I am
>> not re-posting those (but will if it's requested).
>>
>> The main functional change is that an image request no longer
>> maintains an array of object request pointers, it maintains
>> a list of object requests.  This simplifies some things, and
>> makes the image request structure fixed size.
>>
>> A few other functional changes:
>> - Reference counting of object and image requests is now
>>    done sensibly.
>> - Image requests now support a callback when complete,
>>    which will be used for layered I/O requests.
>> - There are a few new helper functions that encapsulate
>>    tying an object request to an image request.
>> - An distinct value is now used for the "which" field
>>    for object requests not associated with a image request
>>    (mainly used for validation/assertions).
>>
>> Other changes:
>> - Everything that was named "image_request" now uses
>>    "img_request" instead.
>> - A few blocks and lines of code have been rearranged.
>>
>> The updated series is available on the ceph-client git
>> repository in the branch "wip-rbd-review-v2".
>>
>>                     -Alex
>>
>> This patch fully implements the new request tracking code for rbd
>> I/O requests.

Responses to your review comments are below.

Thank you very much for careful, thorough, and thoughtful
work on this.  I believe it's very important and you do
a good job of it.

					-Alex

>> Each I/O request to an rbd image will get an rbd_image_request
>> structure allocated to track it.  This provides access to all
>> information about the original request, as well as access to the
>> set of one or more object requests that are initiated as a result
>> of the image request.
>>
>> An rbd_obj_request structure defines a request sent to a single osd
>> object (possibly) as part of an rbd image request.  An rbd object
>> request refers to a ceph_osd_request structure built up to represent
>> the request; for now it will contain a single osd operation.  It
>> also provides space to hold the result status and the version of the
>> object when the osd request completes.
>>
>> An rbd_obj_request structure can also stand on its own.  This will
>> be used for reading the version 1 header object, for issuing
>> acknowledgements to event notifications, and for making object
>> method calls.
>>
>> All rbd object requests now complete asynchronously with respect
>> to the osd client--they supply a common callback routine.
>>
>> This resolves:
>>      http://tracker.newdream.net/issues/3741
>>
>> Signed-off-by: Alex Elder <elder@inktank.com>
>> ---
>> v2: - fixed reference counting
>>      - image request callback support
>>      - image/object connection helper functions
>>      - distinct BAD_WHICH value for non-image object requests
>>
>>   drivers/block/rbd.c |  622
>> ++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   1 file changed, 620 insertions(+), 2 deletions(-)
>>
>> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
>> index 6689363..46a61dd 100644
>> --- a/drivers/block/rbd.c
>> +++ b/drivers/block/rbd.c
>> @@ -181,6 +181,67 @@ struct rbd_req_coll {
>>       struct rbd_req_status    status[0];
>>   };
>>
>> +struct rbd_img_request;
>> +typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
>> +
>> +#define    BAD_WHICH    U32_MAX        /* Good which or bad which,
>> which? */
>> +
>> +struct rbd_obj_request;
>> +typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
>> +
>> +enum obj_req_type { obj_req_bio };    /* More types to come */
> 
> enum labels should be capitalized.

Not where I come from (at least not always), but I'll
convert these to be all caps.

>> +struct rbd_obj_request {
>> +    const char        *object_name;
>> +    u64            offset;        /* object start byte */
>> +    u64            length;        /* bytes from offset */
>> +
>> +    struct rbd_img_request    *img_request;
>> +    struct list_head    links;
>> +    u32            which;        /* posn image request list */
>> +
>> +    enum obj_req_type    type;
>> +    struct bio        *bio_list;
>> +
>> +    struct ceph_osd_request    *osd_req;
>> +
>> +    u64            xferred;    /* bytes transferred */
>> +    u64            version;
> 
> This version is only used (uselessly) for the watch operation. It
> should be removed in a future patch (along with the obj_ver in the
> header).

I'm not sure whether the description is quite right, but:
   http://tracker.ceph.com/issues/3952

>> +    s32            result;
>> +    atomic_t        done;
>> +
>> +    rbd_obj_callback_t    callback;
>> +
>> +    struct kref        kref;
>> +};
>> +
>> +struct rbd_img_request {
>> +    struct request        *rq;
>> +    struct rbd_device    *rbd_dev;
>> +    u64            offset;    /* starting image byte offset */
>> +    u64            length;    /* byte count from offset */
>> +    bool            write_request;    /* false for read */
>> +    union {
>> +        struct ceph_snap_context *snapc;    /* for writes */
>> +        u64        snap_id;        /* for reads */
>> +    };
>> +    spinlock_t        completion_lock;
> 
> It'd be nice to have a comment describing what this lock protects.

OK, I'll add one.  The lock may not be needed, but for now I left
it in.  It protects updates to the next_completion field.

>> +    u32            next_completion;
>> +    rbd_img_callback_t    callback;
>> +
>> +    u32            obj_request_count;
>> +    struct list_head    obj_requests;
> 
> Maybe note that these are rbd_obj_requests, and not ceph_osd_requests.

I meant for the name to suggest that (I'm pretty consistent
about using osd_req and obj_request prefixes), but I'll add
a short comment since the list type doesn't make it obvious.

>> +    struct kref        kref;
>> +};
>> +
>> +#define for_each_obj_request(ireq, oreq) \
>> +    list_for_each_entry(oreq, &ireq->obj_requests, links)
>> +#define for_each_obj_request_from(ireq, oreq) \
>> +    list_for_each_entry_from(oreq, &ireq->obj_requests, links)
>> +#define for_each_obj_request_safe(ireq, oreq, n) \
>> +    list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests,
>> links)
>> +
>>   /*
>>    * a single io request
>>    */

. . .

>> @@ -1395,6 +1512,26 @@ done:
>>       return ret;
>>   }
>>
>> +static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
>> +                struct rbd_obj_request *obj_request)
>> +{
>> +    return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
>> +}
>> +
>> +static void rbd_img_request_complete(struct rbd_img_request
>> *img_request)
>> +{
>> +    if (img_request->callback)
>> +        img_request->callback(img_request);
>> +    else
>> +        rbd_img_request_put(img_request);
>> +}
> 
> Why rely on the callback to rbd_img_request_put()? Wouldn't it be a
> bit simpler to unconditionally do the put here?

I think it's because I wanted the callback to have the chance
to defer completion, and hang onto the reference until that
time rather than taking another reference.  But right now it
isn't used so it's sort of moot anyway.

Unless you object, I'm going to leave it as-is for now,
and when I look at upcoming patches that will use this
functionality I may change it to drop the reference
unconditionally as you suggest.

>> +static void rbd_obj_request_complete(struct rbd_obj_request
>> *obj_request)
>> +{
>> +    if (obj_request->callback)
>> +        obj_request->callback(obj_request);
>> +}
>> +
>>   /*
>>    * Request sync osd read
>>    */

. . .

>> +static struct ceph_osd_request *rbd_osd_req_create(
>> +                    struct rbd_device *rbd_dev,
>> +                    bool write_request,
>> +                    struct rbd_obj_request *obj_request,
>> +                    struct ceph_osd_req_op *op)
>> +{
>> +    struct rbd_img_request *img_request = obj_request->img_request;
>> +    struct ceph_snap_context *snapc = NULL;
>> +    struct ceph_osd_client *osdc;
>> +    struct ceph_osd_request *osd_req;
>> +    struct timespec now;
>> +    struct timespec *mtime;
>> +    u64 snap_id = CEPH_NOSNAP;
>> +    u64 offset = obj_request->offset;
>> +    u64 length = obj_request->length;
>> +
>> +    if (img_request) {
>> +        rbd_assert(img_request->write_request == write_request);
>> +        if (img_request->write_request)
>> +            snapc = img_request->snapc;
>> +        else
>> +            snap_id = img_request->snap_id;
>> +    }
>> +
>> +    /* Allocate and initialize the request, for the single op */
>> +
>> +    osdc = &rbd_dev->rbd_client->client->osdc;
>> +    osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false,
>> GFP_ATOMIC);
>> +    if (!osd_req)
>> +        return NULL;    /* ENOMEM */
>> +
>> +    rbd_assert(obj_req_type_valid(obj_request->type));
>> +    switch (obj_request->type) {
>> +    case obj_req_bio:
>> +        rbd_assert(obj_request->bio_list != NULL);
>> +        osd_req->r_bio = obj_request->bio_list;
>> +        bio_get(osd_req->r_bio);
>> +        /* osd client requires "num pages" even for bio */
>> +        osd_req->r_num_pages = calc_pages_for(offset, length);
>> +        break;
>> +    }
>> +
>> +    if (write_request) {
>> +        osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
>> +        now = CURRENT_TIME;
>> +        mtime = &now;
>> +    } else {
>> +        osd_req->r_flags = CEPH_OSD_FLAG_READ;
>> +        mtime = NULL;    /* not needed for reads */
>> +        offset = 0;    /* These are not used... */
>> +        length = 0;    /* ...for osd read requests */
>> +    }
>> +
>> +    osd_req->r_callback = rbd_osd_req_callback;
>> +    osd_req->r_priv = obj_request;
>> +
>> +    /* No trailing '\0' required for the object name in the request */
> 
> It looks like ceph_calc_object_layout() does require the trailing '\0':

Bummer.  The request itself doesn't need it.

You're right though.  I'll fix that.  (And I'm inclined to
fix ceph_calc_object_layout() so it doesn't require it...)

All that's required is changing "<=" to "<" in this assertion:
        rbd_assert(osd_req->r_oid_len <= sizeof (osd_req->r_oid));

> osd_client.c:
>   __map_request()
>     ceph_calc_object_layout(...,->r_oid,...)
>       strlen(oid)
> 
>> +    osd_req->r_oid_len = strlen(obj_request->object_name);
>> +    rbd_assert(osd_req->r_oid_len <= sizeof (osd_req->r_oid));
>> +    memcpy(osd_req->r_oid, obj_request->object_name,
>> osd_req->r_oid_len);
>> +
>> +    osd_req->r_file_layout = rbd_dev->layout;    /* struct */
>> +
>> +    /* osd_req will get its own reference to snapc (if non-null) */
>> +
>> +    ceph_osdc_build_request(osd_req, offset, length, 1, op,
>> +                snapc, snap_id, mtime);
>> +
>> +    return osd_req;
>> +}

. . .

>> +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
>> +{
>> +    struct rbd_img_request *img_request;
>> +    u32 which = obj_request->which;
>> +    bool more = true;
>> +
>> +    img_request = obj_request->img_request;
>> +    rbd_assert(img_request != NULL);
>> +    rbd_assert(img_request->rq != NULL);
>> +    rbd_assert(which != BAD_WHICH);
>> +    rbd_assert(which < img_request->obj_request_count);
>> +    rbd_assert(which >= img_request->next_completion);
>> +
>> +    spin_lock(&img_request->completion_lock);
> 
> In the current equivalent code (rbd_coll_end_req_index), we use
> spin_lock_irq(), and don't hold the spinlock while calling
> blk_end_request.
> 
> Why the change, and is this change safe?

In the current "collection" code, the queue lock for the rbd
device is used.  And that lock *is* held (as required) when
__blk_end_request() function is called (and interrupts are
disabled)

In the new code, each image request has its own lock.  We
call blk_end_request() when there's something to tell the
block layer about (and that form of the function will
acquire the queue lock itself).  This separates the lock
from the Linux block code.

It's possible it is not be safe though.  Without thinking
a bit harder about this I'm not entirely sure whether I
need to disable interrupts.  So for now I'll just change
it to use spin_lock_irq() to be on the safe (and easier)
side.

As I said earlier, I'm not even sure a spinlock is
required here, atomic operations and barriers around
accesses to the next_completion field may be enough.
But that's an optimization for another day...

>> +    if (which != img_request->next_completion)
>> +        goto out;
>> +
>> +    for_each_obj_request_from(img_request, obj_request) {
>> +        unsigned int xferred;
>> +        int result;
>> +
>> +        rbd_assert(more);
>> +        rbd_assert(which < img_request->obj_request_count);
>> +
>> +        if (!atomic_read(&obj_request->done))
>> +            break;
>> +
>> +        rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
>> +        xferred = (unsigned int) obj_request->xferred;
>> +        result = (int) obj_request->result;
>> +        if (result)
>> +            rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
>> +                img_request->write_request ? "write" : "read",
>> +                result, xferred);
>> +
>> +        more = blk_end_request(img_request->rq, result, xferred);
>> +        which++;
>> +    }
>> +    rbd_assert(more ^ (which == img_request->obj_request_count));
>> +    img_request->next_completion = which;
>> +out:
>> +    spin_unlock(&img_request->completion_lock);
>> +
>> +    if (!more)
>> +        rbd_img_request_complete(img_request);
>> +}
>> +

. . .


--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Patch
diff mbox

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 6689363..46a61dd 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -181,6 +181,67 @@  struct rbd_req_coll {
 	struct rbd_req_status	status[0];
 };

+struct rbd_img_request;
+typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
+
+#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
+
+struct rbd_obj_request;
+typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
+
+enum obj_req_type { obj_req_bio };	/* More types to come */
+
+struct rbd_obj_request {
+	const char		*object_name;
+	u64			offset;		/* object start byte */
+	u64			length;		/* bytes from offset */
+
+	struct rbd_img_request	*img_request;
+	struct list_head	links;
+	u32			which;		/* posn image request list */
+
+	enum obj_req_type	type;
+	struct bio		*bio_list;
+
+	struct ceph_osd_request	*osd_req;
+
+	u64			xferred;	/* bytes transferred */
+	u64			version;
+	s32			result;
+	atomic_t		done;
+
+	rbd_obj_callback_t	callback;
+
+	struct kref		kref;
+};
+
+struct rbd_img_request {
+	struct request		*rq;
+	struct rbd_device	*rbd_dev;
+	u64			offset;	/* starting image byte offset */
+	u64			length;	/* byte count from offset */
+	bool			write_request;	/* false for read */
+	union {
+		struct ceph_snap_context *snapc;	/* for writes */
+		u64		snap_id;		/* for reads */
+	};
+	spinlock_t		completion_lock;
+	u32			next_completion;
+	rbd_img_callback_t	callback;
+
+	u32			obj_request_count;
+	struct list_head	obj_requests;
+
+	struct kref		kref;
+};
+
+#define for_each_obj_request(ireq, oreq) \
+	list_for_each_entry(oreq, &ireq->obj_requests, links)
+#define for_each_obj_request_from(ireq, oreq) \
+	list_for_each_entry_from(oreq, &ireq->obj_requests, links)
+#define for_each_obj_request_safe(ireq, oreq, n) \
+	list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
+
 /*
  * a single io request
  */
@@ -1031,6 +1092,62 @@  out_err:
 	return NULL;
 }

+static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
+{
+	kref_get(&obj_request->kref);
+}
+
+static void rbd_obj_request_destroy(struct kref *kref);
+static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request != NULL);
+	kref_put(&obj_request->kref, rbd_obj_request_destroy);
+}
+
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+	kref_get(&img_request->kref);
+}
+
+static void rbd_img_request_destroy(struct kref *kref);
+static void rbd_img_request_put(struct rbd_img_request *img_request)
+{
+	rbd_assert(img_request != NULL);
+	kref_put(&img_request->kref, rbd_img_request_destroy);
+}
+
+static inline void rbd_img_obj_request_add(struct rbd_img_request
*img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_obj_request_get(obj_request);
+	obj_request->img_request = img_request;
+	list_add_tail(&obj_request->links, &img_request->obj_requests);
+	obj_request->which = img_request->obj_request_count++;
+	rbd_assert(obj_request->which != BAD_WHICH);
+}
+
+static inline void rbd_img_obj_request_del(struct rbd_img_request
*img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request->which != BAD_WHICH);
+	obj_request->which = BAD_WHICH;
+	list_del(&obj_request->links);
+	rbd_assert(obj_request->img_request == img_request);
+	obj_request->callback = NULL;
+	obj_request->img_request = NULL;
+	rbd_obj_request_put(obj_request);
+}
+
+static bool obj_req_type_valid(enum obj_req_type type)
+{
+	switch (type) {
+	case obj_req_bio:
+		return true;
+	default:
+		return false;
+	}
+}
+
 struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)