diff mbox series

[06/20] rbd: introduce obj_req->osd_reqs list

Message ID 20190625144111.11270-7-idryomov@gmail.com (mailing list archive)
State New, archived
Headers show
Series rbd: support for object-map and fast-diff | expand

Commit Message

Ilya Dryomov June 25, 2019, 2:40 p.m. UTC
Since the dawn of time it had been assumed that a single object request
spawns a single OSD request.  This is already impacting copyup: instead
of sending empty and current snapc copyups together, we wait for empty
snapc OSD request to complete in order to reassign obj_req->osd_req
with current snapc OSD request.  Looking further, updating potentially
hundreds of snapshot object maps serially is a non-starter.

Replace obj_req->osd_req pointer with obj_req->osd_reqs list.  Use
osd_req->r_unsafe_item for linkage -- it's used by the filesystem for
a similar purpose.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
---
 drivers/block/rbd.c | 191 +++++++++++++++++++++++---------------------
 1 file changed, 100 insertions(+), 91 deletions(-)

Comments

Dongsheng Yang July 1, 2019, 5:28 a.m. UTC | #1
On 06/25/2019 10:40 PM, Ilya Dryomov wrote:
> Since the dawn of time it had been assumed that a single object request
> spawns a single OSD request.  This is already impacting copyup: instead
> of sending empty and current snapc copyups together, we wait for empty
> snapc OSD request to complete in order to reassign obj_req->osd_req
> with current snapc OSD request.  Looking further, updating potentially
> hundreds of snapshot object maps serially is a non-starter.
>
> Replace obj_req->osd_req pointer with obj_req->osd_reqs list.  Use
> osd_req->r_unsafe_item for linkage -- it's used by the filesystem for
> a similar purpose.
>
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>

Reviewed-by: Dongsheng Yang <dongsheng.yang@easystack.cn>
> ---
>   drivers/block/rbd.c | 191 +++++++++++++++++++++++---------------------
>   1 file changed, 100 insertions(+), 91 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 51dd1b99c242..5c34fe215c63 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -289,7 +289,7 @@ struct rbd_obj_request {
>   	struct bio_vec		*copyup_bvecs;
>   	u32			copyup_bvec_count;
>   
> -	struct ceph_osd_request	*osd_req;
> +	struct list_head	osd_reqs;	/* w/ r_unsafe_item */
>   
>   	struct mutex		state_mutex;
>   	struct kref		kref;
> @@ -1410,7 +1410,9 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
>   
>   static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
>   {
> -	struct ceph_osd_request *osd_req = obj_request->osd_req;
> +	struct ceph_osd_request *osd_req =
> +	    list_last_entry(&obj_request->osd_reqs, struct ceph_osd_request,
> +			    r_unsafe_item);
>   
>   	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
>   	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
> @@ -1497,7 +1499,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
>   
>   	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
>   	     osd_req->r_result, obj_req);
> -	rbd_assert(osd_req == obj_req->osd_req);
>   
>   	/*
>   	 * Writes aren't allowed to return a data payload.  In some
> @@ -1512,17 +1513,17 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
>   	rbd_obj_handle_request(obj_req, result);
>   }
>   
> -static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
> +static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
>   {
> -	struct ceph_osd_request *osd_req = obj_request->osd_req;
> +	struct rbd_obj_request *obj_request = osd_req->r_priv;
>   
>   	osd_req->r_flags = CEPH_OSD_FLAG_READ;
>   	osd_req->r_snapid = obj_request->img_request->snap_id;
>   }
>   
> -static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
> +static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
>   {
> -	struct ceph_osd_request *osd_req = obj_request->osd_req;
> +	struct rbd_obj_request *obj_request = osd_req->r_priv;
>   
>   	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
>   	ktime_get_real_ts64(&osd_req->r_mtime);
> @@ -1530,19 +1531,21 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
>   }
>   
>   static struct ceph_osd_request *
> -__rbd_osd_req_create(struct rbd_obj_request *obj_req,
> -		     struct ceph_snap_context *snapc, unsigned int num_ops)
> +__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
> +			  struct ceph_snap_context *snapc, int num_ops)
>   {
>   	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>   	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
>   	struct ceph_osd_request *req;
>   	const char *name_format = rbd_dev->image_format == 1 ?
>   				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
> +	int ret;
>   
>   	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
>   	if (!req)
> -		return NULL;
> +		return ERR_PTR(-ENOMEM);
>   
> +	list_add_tail(&req->r_unsafe_item, &obj_req->osd_reqs);
>   	req->r_callback = rbd_osd_req_callback;
>   	req->r_priv = obj_req;
>   
> @@ -1553,27 +1556,20 @@ __rbd_osd_req_create(struct rbd_obj_request *obj_req,
>   	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
>   	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
>   
> -	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
> -			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
> -		goto err_req;
> +	ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
> +			       rbd_dev->header.object_prefix,
> +			       obj_req->ex.oe_objno);
> +	if (ret)
> +		return ERR_PTR(ret);
>   
>   	return req;
> -
> -err_req:
> -	ceph_osdc_put_request(req);
> -	return NULL;
>   }
>   
>   static struct ceph_osd_request *
> -rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
> +rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
>   {
> -	return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
> -				    num_ops);
> -}
> -
> -static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
> -{
> -	ceph_osdc_put_request(osd_req);
> +	return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
> +					 num_ops);
>   }
>   
>   static struct rbd_obj_request *rbd_obj_request_create(void)
> @@ -1585,6 +1581,7 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
>   		return NULL;
>   
>   	ceph_object_extent_init(&obj_request->ex);
> +	INIT_LIST_HEAD(&obj_request->osd_reqs);
>   	mutex_init(&obj_request->state_mutex);
>   	kref_init(&obj_request->kref);
>   
> @@ -1595,14 +1592,19 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
>   static void rbd_obj_request_destroy(struct kref *kref)
>   {
>   	struct rbd_obj_request *obj_request;
> +	struct ceph_osd_request *osd_req;
>   	u32 i;
>   
>   	obj_request = container_of(kref, struct rbd_obj_request, kref);
>   
>   	dout("%s: obj %p\n", __func__, obj_request);
>   
> -	if (obj_request->osd_req)
> -		rbd_osd_req_destroy(obj_request->osd_req);
> +	while (!list_empty(&obj_request->osd_reqs)) {
> +		osd_req = list_first_entry(&obj_request->osd_reqs,
> +					struct ceph_osd_request, r_unsafe_item);
> +		list_del_init(&osd_req->r_unsafe_item);
> +		ceph_osdc_put_request(osd_req);
> +	}
>   
>   	switch (obj_request->img_request->data_type) {
>   	case OBJ_REQUEST_NODATA:
> @@ -1796,11 +1798,13 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
>   	return 0;
>   }
>   
> -static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
> +static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
>   {
> +	struct rbd_obj_request *obj_req = osd_req->r_priv;
> +
>   	switch (obj_req->img_request->data_type) {
>   	case OBJ_REQUEST_BIO:
> -		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
> +		osd_req_op_extent_osd_data_bio(osd_req, which,
>   					       &obj_req->bio_pos,
>   					       obj_req->ex.oe_len);
>   		break;
> @@ -1809,7 +1813,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
>   		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
>   							obj_req->ex.oe_len);
>   		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
> -		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
> +		osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
>   						    &obj_req->bvec_pos);
>   		break;
>   	default:
> @@ -1819,21 +1823,22 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
>   
>   static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
>   {
> -	obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
> -	if (!obj_req->osd_req)
> -		return -ENOMEM;
> +	struct ceph_osd_request *osd_req;
>   
> -	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
> +	osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
> +	if (IS_ERR(osd_req))
> +		return PTR_ERR(osd_req);
> +
> +	osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
>   			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
> -	rbd_osd_req_setup_data(obj_req, 0);
> +	rbd_osd_setup_data(osd_req, 0);
>   
> -	rbd_osd_req_format_read(obj_req);
> +	rbd_osd_format_read(osd_req);
>   	obj_req->read_state = RBD_OBJ_READ_START;
>   	return 0;
>   }
>   
> -static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
> -				unsigned int which)
> +static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
>   {
>   	struct page **pages;
>   
> @@ -1849,8 +1854,8 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
>   	if (IS_ERR(pages))
>   		return PTR_ERR(pages);
>   
> -	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
> -	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
> +	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
> +	osd_req_op_raw_data_in_pages(osd_req, which, pages,
>   				     8 + sizeof(struct ceph_timespec),
>   				     0, false, true);
>   	return 0;
> @@ -1861,13 +1866,14 @@ static int count_write_ops(struct rbd_obj_request *obj_req)
>   	return 2; /* setallochint + write/writefull */
>   }
>   
> -static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
> -				  unsigned int which)
> +static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
> +				      int which)
>   {
> +	struct rbd_obj_request *obj_req = osd_req->r_priv;
>   	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>   	u16 opcode;
>   
> -	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
> +	osd_req_op_alloc_hint_init(osd_req, which++,
>   				   rbd_dev->layout.object_size,
>   				   rbd_dev->layout.object_size);
>   
> @@ -1876,16 +1882,16 @@ static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
>   	else
>   		opcode = CEPH_OSD_OP_WRITE;
>   
> -	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
> +	osd_req_op_extent_init(osd_req, which, opcode,
>   			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
> -	rbd_osd_req_setup_data(obj_req, which++);
> +	rbd_osd_setup_data(osd_req, which);
>   
> -	rbd_assert(which == obj_req->osd_req->r_num_ops);
> -	rbd_osd_req_format_write(obj_req);
> +	rbd_osd_format_write(osd_req);
>   }
>   
>   static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
>   {
> +	struct ceph_osd_request *osd_req;
>   	unsigned int num_osd_ops, which = 0;
>   	int ret;
>   
> @@ -1901,18 +1907,18 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
>   	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
>   		num_osd_ops++; /* stat */
>   
> -	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
> -	if (!obj_req->osd_req)
> -		return -ENOMEM;
> +	osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
> +	if (IS_ERR(osd_req))
> +		return PTR_ERR(osd_req);
>   
>   	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
> -		ret = __rbd_obj_setup_stat(obj_req, which++);
> +		ret = rbd_osd_setup_stat(osd_req, which++);
>   		if (ret)
>   			return ret;
>   	}
>   
>   	obj_req->write_state = RBD_OBJ_WRITE_START;
> -	__rbd_obj_setup_write(obj_req, which);
> +	__rbd_osd_setup_write_ops(osd_req, which);
>   	return 0;
>   }
>   
> @@ -1925,6 +1931,7 @@ static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
>   static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
>   {
>   	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +	struct ceph_osd_request *osd_req;
>   	u64 off = obj_req->ex.oe_off;
>   	u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
>   	int ret;
> @@ -1953,24 +1960,24 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
>   	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
>   		obj_req->flags |= RBD_OBJ_FLAG_DELETION;
>   
> -	obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
> -	if (!obj_req->osd_req)
> -		return -ENOMEM;
> +	osd_req = rbd_obj_add_osd_request(obj_req, 1);
> +	if (IS_ERR(osd_req))
> +		return PTR_ERR(osd_req);
>   
>   	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
>   		rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
> -		osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
> +		osd_req_op_init(osd_req, 0, CEPH_OSD_OP_DELETE, 0);
>   	} else {
>   		dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
>   		     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
>   		     off, next_off - off);
> -		osd_req_op_extent_init(obj_req->osd_req, 0,
> +		osd_req_op_extent_init(osd_req, 0,
>   				       truncate_or_zero_opcode(obj_req),
>   				       off, next_off - off, 0, 0);
>   	}
>   
>   	obj_req->write_state = RBD_OBJ_WRITE_START;
> -	rbd_osd_req_format_write(obj_req);
> +	rbd_osd_format_write(osd_req);
>   	return 0;
>   }
>   
> @@ -1987,20 +1994,21 @@ static int count_zeroout_ops(struct rbd_obj_request *obj_req)
>   	return num_osd_ops;
>   }
>   
> -static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
> -				    unsigned int which)
> +static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
> +					int which)
>   {
> +	struct rbd_obj_request *obj_req = osd_req->r_priv;
>   	u16 opcode;
>   
>   	if (rbd_obj_is_entire(obj_req)) {
>   		if (obj_req->num_img_extents) {
>   			if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
> -				osd_req_op_init(obj_req->osd_req, which++,
> +				osd_req_op_init(osd_req, which++,
>   						CEPH_OSD_OP_CREATE, 0);
>   			opcode = CEPH_OSD_OP_TRUNCATE;
>   		} else {
>   			rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
> -			osd_req_op_init(obj_req->osd_req, which++,
> +			osd_req_op_init(osd_req, which++,
>   					CEPH_OSD_OP_DELETE, 0);
>   			opcode = 0;
>   		}
> @@ -2009,16 +2017,16 @@ static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
>   	}
>   
>   	if (opcode)
> -		osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
> +		osd_req_op_extent_init(osd_req, which, opcode,
>   				       obj_req->ex.oe_off, obj_req->ex.oe_len,
>   				       0, 0);
>   
> -	rbd_assert(which == obj_req->osd_req->r_num_ops);
> -	rbd_osd_req_format_write(obj_req);
> +	rbd_osd_format_write(osd_req);
>   }
>   
>   static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
>   {
> +	struct ceph_osd_request *osd_req;
>   	unsigned int num_osd_ops, which = 0;
>   	int ret;
>   
> @@ -2038,18 +2046,18 @@ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
>   	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
>   		num_osd_ops++; /* stat */
>   
> -	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
> -	if (!obj_req->osd_req)
> -		return -ENOMEM;
> +	osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
> +	if (IS_ERR(osd_req))
> +		return PTR_ERR(osd_req);
>   
>   	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
> -		ret = __rbd_obj_setup_stat(obj_req, which++);
> +		ret = rbd_osd_setup_stat(osd_req, which++);
>   		if (ret)
>   			return ret;
>   	}
>   
>   	obj_req->write_state = RBD_OBJ_WRITE_START;
> -	__rbd_obj_setup_zeroout(obj_req, which);
> +	__rbd_osd_setup_zeroout_ops(osd_req, which);
>   	return 0;
>   }
>   
> @@ -2061,6 +2069,7 @@ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
>   static int __rbd_img_fill_request(struct rbd_img_request *img_req)
>   {
>   	struct rbd_obj_request *obj_req, *next_obj_req;
> +	struct ceph_osd_request *osd_req;
>   	int ret;
>   
>   	for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
> @@ -2087,7 +2096,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
>   			continue;
>   		}
>   
> -		ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
> +		osd_req = list_last_entry(&obj_req->osd_reqs,
> +					  struct ceph_osd_request,
> +					  r_unsafe_item);
> +		ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
>   		if (ret)
>   			return ret;
>   	}
> @@ -2538,28 +2550,27 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
>   static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
>   					    u32 bytes)
>   {
> +	struct ceph_osd_request *osd_req;
>   	int ret;
>   
>   	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
> -	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
>   	rbd_assert(bytes > 0 && bytes != MODS_ONLY);
> -	rbd_osd_req_destroy(obj_req->osd_req);
>   
> -	obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
> -	if (!obj_req->osd_req)
> -		return -ENOMEM;
> +	osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
> +	if (IS_ERR(osd_req))
> +		return PTR_ERR(osd_req);
>   
> -	ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
> +	ret = osd_req_op_cls_init(osd_req, 0, "rbd", "copyup");
>   	if (ret)
>   		return ret;
>   
> -	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
> +	osd_req_op_cls_request_data_bvecs(osd_req, 0,
>   					  obj_req->copyup_bvecs,
>   					  obj_req->copyup_bvec_count,
>   					  bytes);
> -	rbd_osd_req_format_write(obj_req);
> +	rbd_osd_format_write(osd_req);
>   
> -	ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
> +	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
>   	if (ret)
>   		return ret;
>   
> @@ -2570,14 +2581,12 @@ static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
>   static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
>   {
>   	struct rbd_img_request *img_req = obj_req->img_request;
> +	struct ceph_osd_request *osd_req;
>   	unsigned int num_osd_ops = (bytes != MODS_ONLY);
>   	unsigned int which = 0;
>   	int ret;
>   
>   	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
> -	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
> -		   obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
> -	rbd_osd_req_destroy(obj_req->osd_req);
>   
>   	switch (img_req->op_type) {
>   	case OBJ_OP_WRITE:
> @@ -2590,17 +2599,17 @@ static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
>   		BUG();
>   	}
>   
> -	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
> -	if (!obj_req->osd_req)
> -		return -ENOMEM;
> +	osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
> +	if (IS_ERR(osd_req))
> +		return PTR_ERR(osd_req);
>   
>   	if (bytes != MODS_ONLY) {
> -		ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
> +		ret = osd_req_op_cls_init(osd_req, which, "rbd",
>   					  "copyup");
>   		if (ret)
>   			return ret;
>   
> -		osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
> +		osd_req_op_cls_request_data_bvecs(osd_req, which++,
>   						  obj_req->copyup_bvecs,
>   						  obj_req->copyup_bvec_count,
>   						  bytes);
> @@ -2608,16 +2617,16 @@ static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
>   
>   	switch (img_req->op_type) {
>   	case OBJ_OP_WRITE:
> -		__rbd_obj_setup_write(obj_req, which);
> +		__rbd_osd_setup_write_ops(osd_req, which);
>   		break;
>   	case OBJ_OP_ZEROOUT:
> -		__rbd_obj_setup_zeroout(obj_req, which);
> +		__rbd_osd_setup_zeroout_ops(osd_req, which);
>   		break;
>   	default:
>   		BUG();
>   	}
>   
> -	ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
> +	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
>   	if (ret)
>   		return ret;
>
Jason Dillaman July 1, 2019, 6:34 p.m. UTC | #2
On Tue, Jun 25, 2019 at 10:44 AM Ilya Dryomov <idryomov@gmail.com> wrote:
>
> Since the dawn of time it had been assumed that a single object request
> spawns a single OSD request.  This is already impacting copyup: instead
> of sending empty and current snapc copyups together, we wait for empty
> snapc OSD request to complete in order to reassign obj_req->osd_req
> with current snapc OSD request.  Looking further, updating potentially
> hundreds of snapshot object maps serially is a non-starter.
>
> Replace obj_req->osd_req pointer with obj_req->osd_reqs list.  Use
> osd_req->r_unsafe_item for linkage -- it's used by the filesystem for
> a similar purpose.

Nit: just curious on the history of "r_unsafe_item"'s name. Since it
would be re-used twice for an osd request list, should (could) it be
renamed?

>
> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
> ---
>  drivers/block/rbd.c | 191 +++++++++++++++++++++++---------------------
>  1 file changed, 100 insertions(+), 91 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 51dd1b99c242..5c34fe215c63 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -289,7 +289,7 @@ struct rbd_obj_request {
>         struct bio_vec          *copyup_bvecs;
>         u32                     copyup_bvec_count;
>
> -       struct ceph_osd_request *osd_req;
> +       struct list_head        osd_reqs;       /* w/ r_unsafe_item */
>
>         struct mutex            state_mutex;
>         struct kref             kref;
> @@ -1410,7 +1410,9 @@ static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
>
>  static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
>  {
> -       struct ceph_osd_request *osd_req = obj_request->osd_req;
> +       struct ceph_osd_request *osd_req =
> +           list_last_entry(&obj_request->osd_reqs, struct ceph_osd_request,
> +                           r_unsafe_item);
>
>         dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
>              obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
> @@ -1497,7 +1499,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
>
>         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
>              osd_req->r_result, obj_req);
> -       rbd_assert(osd_req == obj_req->osd_req);
>
>         /*
>          * Writes aren't allowed to return a data payload.  In some
> @@ -1512,17 +1513,17 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
>         rbd_obj_handle_request(obj_req, result);
>  }
>
> -static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
> +static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
>  {
> -       struct ceph_osd_request *osd_req = obj_request->osd_req;
> +       struct rbd_obj_request *obj_request = osd_req->r_priv;
>
>         osd_req->r_flags = CEPH_OSD_FLAG_READ;
>         osd_req->r_snapid = obj_request->img_request->snap_id;
>  }
>
> -static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
> +static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
>  {
> -       struct ceph_osd_request *osd_req = obj_request->osd_req;
> +       struct rbd_obj_request *obj_request = osd_req->r_priv;
>
>         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
>         ktime_get_real_ts64(&osd_req->r_mtime);
> @@ -1530,19 +1531,21 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
>  }
>
>  static struct ceph_osd_request *
> -__rbd_osd_req_create(struct rbd_obj_request *obj_req,
> -                    struct ceph_snap_context *snapc, unsigned int num_ops)
> +__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
> +                         struct ceph_snap_context *snapc, int num_ops)
>  {
>         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
>         struct ceph_osd_request *req;
>         const char *name_format = rbd_dev->image_format == 1 ?
>                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
> +       int ret;
>
>         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
>         if (!req)
> -               return NULL;
> +               return ERR_PTR(-ENOMEM);
>
> +       list_add_tail(&req->r_unsafe_item, &obj_req->osd_reqs);
>         req->r_callback = rbd_osd_req_callback;
>         req->r_priv = obj_req;
>
> @@ -1553,27 +1556,20 @@ __rbd_osd_req_create(struct rbd_obj_request *obj_req,
>         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
>         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
>
> -       if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
> -                       rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
> -               goto err_req;
> +       ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
> +                              rbd_dev->header.object_prefix,
> +                              obj_req->ex.oe_objno);
> +       if (ret)
> +               return ERR_PTR(ret);
>
>         return req;
> -
> -err_req:
> -       ceph_osdc_put_request(req);
> -       return NULL;
>  }
>
>  static struct ceph_osd_request *
> -rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
> +rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
>  {
> -       return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
> -                                   num_ops);
> -}
> -
> -static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
> -{
> -       ceph_osdc_put_request(osd_req);
> +       return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
> +                                        num_ops);
>  }
>
>  static struct rbd_obj_request *rbd_obj_request_create(void)
> @@ -1585,6 +1581,7 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
>                 return NULL;
>
>         ceph_object_extent_init(&obj_request->ex);
> +       INIT_LIST_HEAD(&obj_request->osd_reqs);
>         mutex_init(&obj_request->state_mutex);
>         kref_init(&obj_request->kref);
>
> @@ -1595,14 +1592,19 @@ static struct rbd_obj_request *rbd_obj_request_create(void)
>  static void rbd_obj_request_destroy(struct kref *kref)
>  {
>         struct rbd_obj_request *obj_request;
> +       struct ceph_osd_request *osd_req;
>         u32 i;
>
>         obj_request = container_of(kref, struct rbd_obj_request, kref);
>
>         dout("%s: obj %p\n", __func__, obj_request);
>
> -       if (obj_request->osd_req)
> -               rbd_osd_req_destroy(obj_request->osd_req);
> +       while (!list_empty(&obj_request->osd_reqs)) {
> +               osd_req = list_first_entry(&obj_request->osd_reqs,
> +                                       struct ceph_osd_request, r_unsafe_item);
> +               list_del_init(&osd_req->r_unsafe_item);
> +               ceph_osdc_put_request(osd_req);
> +       }
>
>         switch (obj_request->img_request->data_type) {
>         case OBJ_REQUEST_NODATA:
> @@ -1796,11 +1798,13 @@ static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
>         return 0;
>  }
>
> -static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
> +static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
>  {
> +       struct rbd_obj_request *obj_req = osd_req->r_priv;
> +
>         switch (obj_req->img_request->data_type) {
>         case OBJ_REQUEST_BIO:
> -               osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
> +               osd_req_op_extent_osd_data_bio(osd_req, which,
>                                                &obj_req->bio_pos,
>                                                obj_req->ex.oe_len);
>                 break;
> @@ -1809,7 +1813,7 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
>                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
>                                                         obj_req->ex.oe_len);
>                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
> -               osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
> +               osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
>                                                     &obj_req->bvec_pos);
>                 break;
>         default:
> @@ -1819,21 +1823,22 @@ static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
>
>  static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
>  {
> -       obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
> -       if (!obj_req->osd_req)
> -               return -ENOMEM;
> +       struct ceph_osd_request *osd_req;
>
> -       osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
> +       osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
> +       if (IS_ERR(osd_req))
> +               return PTR_ERR(osd_req);
> +
> +       osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
>                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
> -       rbd_osd_req_setup_data(obj_req, 0);
> +       rbd_osd_setup_data(osd_req, 0);
>
> -       rbd_osd_req_format_read(obj_req);
> +       rbd_osd_format_read(osd_req);
>         obj_req->read_state = RBD_OBJ_READ_START;
>         return 0;
>  }
>
> -static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
> -                               unsigned int which)
> +static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
>  {
>         struct page **pages;
>
> @@ -1849,8 +1854,8 @@ static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
>         if (IS_ERR(pages))
>                 return PTR_ERR(pages);
>
> -       osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
> -       osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
> +       osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
> +       osd_req_op_raw_data_in_pages(osd_req, which, pages,
>                                      8 + sizeof(struct ceph_timespec),
>                                      0, false, true);
>         return 0;
> @@ -1861,13 +1866,14 @@ static int count_write_ops(struct rbd_obj_request *obj_req)
>         return 2; /* setallochint + write/writefull */
>  }
>
> -static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
> -                                 unsigned int which)
> +static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
> +                                     int which)
>  {
> +       struct rbd_obj_request *obj_req = osd_req->r_priv;
>         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
>         u16 opcode;
>
> -       osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
> +       osd_req_op_alloc_hint_init(osd_req, which++,
>                                    rbd_dev->layout.object_size,
>                                    rbd_dev->layout.object_size);
>
> @@ -1876,16 +1882,16 @@ static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
>         else
>                 opcode = CEPH_OSD_OP_WRITE;
>
> -       osd_req_op_extent_init(obj_req->osd_req, which, opcode,
> +       osd_req_op_extent_init(osd_req, which, opcode,
>                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
> -       rbd_osd_req_setup_data(obj_req, which++);
> +       rbd_osd_setup_data(osd_req, which);
>
> -       rbd_assert(which == obj_req->osd_req->r_num_ops);
> -       rbd_osd_req_format_write(obj_req);
> +       rbd_osd_format_write(osd_req);
>  }
>
>  static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
>  {
> +       struct ceph_osd_request *osd_req;
>         unsigned int num_osd_ops, which = 0;
>         int ret;
>
> @@ -1901,18 +1907,18 @@ static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
>         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
>                 num_osd_ops++; /* stat */
>
> -       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
> -       if (!obj_req->osd_req)
> -               return -ENOMEM;
> +       osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
> +       if (IS_ERR(osd_req))
> +               return PTR_ERR(osd_req);
>
>         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
> -               ret = __rbd_obj_setup_stat(obj_req, which++);
> +               ret = rbd_osd_setup_stat(osd_req, which++);
>                 if (ret)
>                         return ret;
>         }
>
>         obj_req->write_state = RBD_OBJ_WRITE_START;
> -       __rbd_obj_setup_write(obj_req, which);
> +       __rbd_osd_setup_write_ops(osd_req, which);
>         return 0;
>  }
>
> @@ -1925,6 +1931,7 @@ static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
>  static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
>  {
>         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
> +       struct ceph_osd_request *osd_req;
>         u64 off = obj_req->ex.oe_off;
>         u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
>         int ret;
> @@ -1953,24 +1960,24 @@ static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
>         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
>                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
>
> -       obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
> -       if (!obj_req->osd_req)
> -               return -ENOMEM;
> +       osd_req = rbd_obj_add_osd_request(obj_req, 1);
> +       if (IS_ERR(osd_req))
> +               return PTR_ERR(osd_req);
>
>         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
>                 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
> -               osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
> +               osd_req_op_init(osd_req, 0, CEPH_OSD_OP_DELETE, 0);
>         } else {
>                 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
>                      obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
>                      off, next_off - off);
> -               osd_req_op_extent_init(obj_req->osd_req, 0,
> +               osd_req_op_extent_init(osd_req, 0,
>                                        truncate_or_zero_opcode(obj_req),
>                                        off, next_off - off, 0, 0);
>         }
>
>         obj_req->write_state = RBD_OBJ_WRITE_START;
> -       rbd_osd_req_format_write(obj_req);
> +       rbd_osd_format_write(osd_req);
>         return 0;
>  }
>
> @@ -1987,20 +1994,21 @@ static int count_zeroout_ops(struct rbd_obj_request *obj_req)
>         return num_osd_ops;
>  }
>
> -static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
> -                                   unsigned int which)
> +static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
> +                                       int which)
>  {
> +       struct rbd_obj_request *obj_req = osd_req->r_priv;
>         u16 opcode;
>
>         if (rbd_obj_is_entire(obj_req)) {
>                 if (obj_req->num_img_extents) {
>                         if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
> -                               osd_req_op_init(obj_req->osd_req, which++,
> +                               osd_req_op_init(osd_req, which++,
>                                                 CEPH_OSD_OP_CREATE, 0);
>                         opcode = CEPH_OSD_OP_TRUNCATE;
>                 } else {
>                         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
> -                       osd_req_op_init(obj_req->osd_req, which++,
> +                       osd_req_op_init(osd_req, which++,
>                                         CEPH_OSD_OP_DELETE, 0);
>                         opcode = 0;
>                 }
> @@ -2009,16 +2017,16 @@ static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
>         }
>
>         if (opcode)
> -               osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
> +               osd_req_op_extent_init(osd_req, which, opcode,
>                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
>                                        0, 0);
>
> -       rbd_assert(which == obj_req->osd_req->r_num_ops);
> -       rbd_osd_req_format_write(obj_req);
> +       rbd_osd_format_write(osd_req);
>  }
>
>  static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
>  {
> +       struct ceph_osd_request *osd_req;
>         unsigned int num_osd_ops, which = 0;
>         int ret;
>
> @@ -2038,18 +2046,18 @@ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
>         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
>                 num_osd_ops++; /* stat */
>
> -       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
> -       if (!obj_req->osd_req)
> -               return -ENOMEM;
> +       osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
> +       if (IS_ERR(osd_req))
> +               return PTR_ERR(osd_req);
>
>         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
> -               ret = __rbd_obj_setup_stat(obj_req, which++);
> +               ret = rbd_osd_setup_stat(osd_req, which++);
>                 if (ret)
>                         return ret;
>         }
>
>         obj_req->write_state = RBD_OBJ_WRITE_START;
> -       __rbd_obj_setup_zeroout(obj_req, which);
> +       __rbd_osd_setup_zeroout_ops(osd_req, which);
>         return 0;
>  }
>
> @@ -2061,6 +2069,7 @@ static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
>  static int __rbd_img_fill_request(struct rbd_img_request *img_req)
>  {
>         struct rbd_obj_request *obj_req, *next_obj_req;
> +       struct ceph_osd_request *osd_req;
>         int ret;
>
>         for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
> @@ -2087,7 +2096,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
>                         continue;
>                 }
>
> -               ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
> +               osd_req = list_last_entry(&obj_req->osd_reqs,
> +                                         struct ceph_osd_request,
> +                                         r_unsafe_item);
> +               ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
>                 if (ret)
>                         return ret;
>         }
> @@ -2538,28 +2550,27 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
>  static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
>                                             u32 bytes)
>  {
> +       struct ceph_osd_request *osd_req;
>         int ret;
>
>         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
> -       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
>         rbd_assert(bytes > 0 && bytes != MODS_ONLY);
> -       rbd_osd_req_destroy(obj_req->osd_req);
>
> -       obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
> -       if (!obj_req->osd_req)
> -               return -ENOMEM;
> +       osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
> +       if (IS_ERR(osd_req))
> +               return PTR_ERR(osd_req);
>
> -       ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
> +       ret = osd_req_op_cls_init(osd_req, 0, "rbd", "copyup");
>         if (ret)
>                 return ret;
>
> -       osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
> +       osd_req_op_cls_request_data_bvecs(osd_req, 0,
>                                           obj_req->copyup_bvecs,
>                                           obj_req->copyup_bvec_count,
>                                           bytes);
> -       rbd_osd_req_format_write(obj_req);
> +       rbd_osd_format_write(osd_req);
>
> -       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
> +       ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
>         if (ret)
>                 return ret;
>
> @@ -2570,14 +2581,12 @@ static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
>  static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
>  {
>         struct rbd_img_request *img_req = obj_req->img_request;
> +       struct ceph_osd_request *osd_req;
>         unsigned int num_osd_ops = (bytes != MODS_ONLY);
>         unsigned int which = 0;
>         int ret;
>
>         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
> -       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
> -                  obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
> -       rbd_osd_req_destroy(obj_req->osd_req);
>
>         switch (img_req->op_type) {
>         case OBJ_OP_WRITE:
> @@ -2590,17 +2599,17 @@ static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
>                 BUG();
>         }
>
> -       obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
> -       if (!obj_req->osd_req)
> -               return -ENOMEM;
> +       osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
> +       if (IS_ERR(osd_req))
> +               return PTR_ERR(osd_req);
>
>         if (bytes != MODS_ONLY) {
> -               ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
> +               ret = osd_req_op_cls_init(osd_req, which, "rbd",
>                                           "copyup");
>                 if (ret)
>                         return ret;
>
> -               osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
> +               osd_req_op_cls_request_data_bvecs(osd_req, which++,
>                                                   obj_req->copyup_bvecs,
>                                                   obj_req->copyup_bvec_count,
>                                                   bytes);
> @@ -2608,16 +2617,16 @@ static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
>
>         switch (img_req->op_type) {
>         case OBJ_OP_WRITE:
> -               __rbd_obj_setup_write(obj_req, which);
> +               __rbd_osd_setup_write_ops(osd_req, which);
>                 break;
>         case OBJ_OP_ZEROOUT:
> -               __rbd_obj_setup_zeroout(obj_req, which);
> +               __rbd_osd_setup_zeroout_ops(osd_req, which);
>                 break;
>         default:
>                 BUG();
>         }
>
> -       ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
> +       ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
>         if (ret)
>                 return ret;
>
> --
> 2.19.2
>
Ilya Dryomov July 3, 2019, 8:53 a.m. UTC | #3
On Mon, Jul 1, 2019 at 8:34 PM Jason Dillaman <jdillama@redhat.com> wrote:
>
> On Tue, Jun 25, 2019 at 10:44 AM Ilya Dryomov <idryomov@gmail.com> wrote:
> >
> > Since the dawn of time it had been assumed that a single object request
> > spawns a single OSD request.  This is already impacting copyup: instead
> > of sending empty and current snapc copyups together, we wait for empty
> > snapc OSD request to complete in order to reassign obj_req->osd_req
> > with current snapc OSD request.  Looking further, updating potentially
> > hundreds of snapshot object maps serially is a non-starter.
> >
> > Replace obj_req->osd_req pointer with obj_req->osd_reqs list.  Use
> > osd_req->r_unsafe_item for linkage -- it's used by the filesystem for
> > a similar purpose.
>
> Nit: just curious on the history of "r_unsafe_item"'s name. Since it
> would be re-used twice for an osd request list, should (could) it be
> renamed?

This is from when we had safe and unsafe replies (commit vs ack).  It
has since become a private list item for use by libceph clients.  I'll
rename it to r_private_item.

Thanks,

                Ilya
diff mbox series

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 51dd1b99c242..5c34fe215c63 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -289,7 +289,7 @@  struct rbd_obj_request {
 	struct bio_vec		*copyup_bvecs;
 	u32			copyup_bvec_count;
 
-	struct ceph_osd_request	*osd_req;
+	struct list_head	osd_reqs;	/* w/ r_unsafe_item */
 
 	struct mutex		state_mutex;
 	struct kref		kref;
@@ -1410,7 +1410,9 @@  static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
 
 static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
 {
-	struct ceph_osd_request *osd_req = obj_request->osd_req;
+	struct ceph_osd_request *osd_req =
+	    list_last_entry(&obj_request->osd_reqs, struct ceph_osd_request,
+			    r_unsafe_item);
 
 	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
 	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
@@ -1497,7 +1499,6 @@  static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 
 	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
 	     osd_req->r_result, obj_req);
-	rbd_assert(osd_req == obj_req->osd_req);
 
 	/*
 	 * Writes aren't allowed to return a data payload.  In some
@@ -1512,17 +1513,17 @@  static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 	rbd_obj_handle_request(obj_req, result);
 }
 
-static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
 {
-	struct ceph_osd_request *osd_req = obj_request->osd_req;
+	struct rbd_obj_request *obj_request = osd_req->r_priv;
 
 	osd_req->r_flags = CEPH_OSD_FLAG_READ;
 	osd_req->r_snapid = obj_request->img_request->snap_id;
 }
 
-static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
+static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
 {
-	struct ceph_osd_request *osd_req = obj_request->osd_req;
+	struct rbd_obj_request *obj_request = osd_req->r_priv;
 
 	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
 	ktime_get_real_ts64(&osd_req->r_mtime);
@@ -1530,19 +1531,21 @@  static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
 }
 
 static struct ceph_osd_request *
-__rbd_osd_req_create(struct rbd_obj_request *obj_req,
-		     struct ceph_snap_context *snapc, unsigned int num_ops)
+__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
+			  struct ceph_snap_context *snapc, int num_ops)
 {
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_request *req;
 	const char *name_format = rbd_dev->image_format == 1 ?
 				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
+	int ret;
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
 	if (!req)
-		return NULL;
+		return ERR_PTR(-ENOMEM);
 
+	list_add_tail(&req->r_unsafe_item, &obj_req->osd_reqs);
 	req->r_callback = rbd_osd_req_callback;
 	req->r_priv = obj_req;
 
@@ -1553,27 +1556,20 @@  __rbd_osd_req_create(struct rbd_obj_request *obj_req,
 	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
 	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
 
-	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
-			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
-		goto err_req;
+	ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
+			       rbd_dev->header.object_prefix,
+			       obj_req->ex.oe_objno);
+	if (ret)
+		return ERR_PTR(ret);
 
 	return req;
-
-err_req:
-	ceph_osdc_put_request(req);
-	return NULL;
 }
 
 static struct ceph_osd_request *
-rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
+rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
 {
-	return __rbd_osd_req_create(obj_req, obj_req->img_request->snapc,
-				    num_ops);
-}
-
-static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
-{
-	ceph_osdc_put_request(osd_req);
+	return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
+					 num_ops);
 }
 
 static struct rbd_obj_request *rbd_obj_request_create(void)
@@ -1585,6 +1581,7 @@  static struct rbd_obj_request *rbd_obj_request_create(void)
 		return NULL;
 
 	ceph_object_extent_init(&obj_request->ex);
+	INIT_LIST_HEAD(&obj_request->osd_reqs);
 	mutex_init(&obj_request->state_mutex);
 	kref_init(&obj_request->kref);
 
@@ -1595,14 +1592,19 @@  static struct rbd_obj_request *rbd_obj_request_create(void)
 static void rbd_obj_request_destroy(struct kref *kref)
 {
 	struct rbd_obj_request *obj_request;
+	struct ceph_osd_request *osd_req;
 	u32 i;
 
 	obj_request = container_of(kref, struct rbd_obj_request, kref);
 
 	dout("%s: obj %p\n", __func__, obj_request);
 
-	if (obj_request->osd_req)
-		rbd_osd_req_destroy(obj_request->osd_req);
+	while (!list_empty(&obj_request->osd_reqs)) {
+		osd_req = list_first_entry(&obj_request->osd_reqs,
+					struct ceph_osd_request, r_unsafe_item);
+		list_del_init(&osd_req->r_unsafe_item);
+		ceph_osdc_put_request(osd_req);
+	}
 
 	switch (obj_request->img_request->data_type) {
 	case OBJ_REQUEST_NODATA:
@@ -1796,11 +1798,13 @@  static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
 	return 0;
 }
 
-static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
+static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
 {
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
+
 	switch (obj_req->img_request->data_type) {
 	case OBJ_REQUEST_BIO:
-		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
+		osd_req_op_extent_osd_data_bio(osd_req, which,
 					       &obj_req->bio_pos,
 					       obj_req->ex.oe_len);
 		break;
@@ -1809,7 +1813,7 @@  static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
 		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
 							obj_req->ex.oe_len);
 		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
-		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
+		osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
 						    &obj_req->bvec_pos);
 		break;
 	default:
@@ -1819,21 +1823,22 @@  static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
 
 static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
 {
-	obj_req->osd_req = __rbd_osd_req_create(obj_req, NULL, 1);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
+	struct ceph_osd_request *osd_req;
 
-	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
+	osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
+
+	osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
 			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
-	rbd_osd_req_setup_data(obj_req, 0);
+	rbd_osd_setup_data(osd_req, 0);
 
-	rbd_osd_req_format_read(obj_req);
+	rbd_osd_format_read(osd_req);
 	obj_req->read_state = RBD_OBJ_READ_START;
 	return 0;
 }
 
-static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
-				unsigned int which)
+static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
 {
 	struct page **pages;
 
@@ -1849,8 +1854,8 @@  static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
 	if (IS_ERR(pages))
 		return PTR_ERR(pages);
 
-	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
-	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
+	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
+	osd_req_op_raw_data_in_pages(osd_req, which, pages,
 				     8 + sizeof(struct ceph_timespec),
 				     0, false, true);
 	return 0;
@@ -1861,13 +1866,14 @@  static int count_write_ops(struct rbd_obj_request *obj_req)
 	return 2; /* setallochint + write/writefull */
 }
 
-static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
-				  unsigned int which)
+static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
+				      int which)
 {
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
 	u16 opcode;
 
-	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
+	osd_req_op_alloc_hint_init(osd_req, which++,
 				   rbd_dev->layout.object_size,
 				   rbd_dev->layout.object_size);
 
@@ -1876,16 +1882,16 @@  static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
 	else
 		opcode = CEPH_OSD_OP_WRITE;
 
-	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
+	osd_req_op_extent_init(osd_req, which, opcode,
 			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
-	rbd_osd_req_setup_data(obj_req, which++);
+	rbd_osd_setup_data(osd_req, which);
 
-	rbd_assert(which == obj_req->osd_req->r_num_ops);
-	rbd_osd_req_format_write(obj_req);
+	rbd_osd_format_write(osd_req);
 }
 
 static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
 {
+	struct ceph_osd_request *osd_req;
 	unsigned int num_osd_ops, which = 0;
 	int ret;
 
@@ -1901,18 +1907,18 @@  static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
 	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
 		num_osd_ops++; /* stat */
 
-	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
+	osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
 
 	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
-		ret = __rbd_obj_setup_stat(obj_req, which++);
+		ret = rbd_osd_setup_stat(osd_req, which++);
 		if (ret)
 			return ret;
 	}
 
 	obj_req->write_state = RBD_OBJ_WRITE_START;
-	__rbd_obj_setup_write(obj_req, which);
+	__rbd_osd_setup_write_ops(osd_req, which);
 	return 0;
 }
 
@@ -1925,6 +1931,7 @@  static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
 static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
 {
 	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+	struct ceph_osd_request *osd_req;
 	u64 off = obj_req->ex.oe_off;
 	u64 next_off = obj_req->ex.oe_off + obj_req->ex.oe_len;
 	int ret;
@@ -1953,24 +1960,24 @@  static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
 	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
 		obj_req->flags |= RBD_OBJ_FLAG_DELETION;
 
-	obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
+	osd_req = rbd_obj_add_osd_request(obj_req, 1);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
 
 	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
 		rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
-		osd_req_op_init(obj_req->osd_req, 0, CEPH_OSD_OP_DELETE, 0);
+		osd_req_op_init(osd_req, 0, CEPH_OSD_OP_DELETE, 0);
 	} else {
 		dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
 		     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
 		     off, next_off - off);
-		osd_req_op_extent_init(obj_req->osd_req, 0,
+		osd_req_op_extent_init(osd_req, 0,
 				       truncate_or_zero_opcode(obj_req),
 				       off, next_off - off, 0, 0);
 	}
 
 	obj_req->write_state = RBD_OBJ_WRITE_START;
-	rbd_osd_req_format_write(obj_req);
+	rbd_osd_format_write(osd_req);
 	return 0;
 }
 
@@ -1987,20 +1994,21 @@  static int count_zeroout_ops(struct rbd_obj_request *obj_req)
 	return num_osd_ops;
 }
 
-static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
-				    unsigned int which)
+static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
+					int which)
 {
+	struct rbd_obj_request *obj_req = osd_req->r_priv;
 	u16 opcode;
 
 	if (rbd_obj_is_entire(obj_req)) {
 		if (obj_req->num_img_extents) {
 			if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
-				osd_req_op_init(obj_req->osd_req, which++,
+				osd_req_op_init(osd_req, which++,
 						CEPH_OSD_OP_CREATE, 0);
 			opcode = CEPH_OSD_OP_TRUNCATE;
 		} else {
 			rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
-			osd_req_op_init(obj_req->osd_req, which++,
+			osd_req_op_init(osd_req, which++,
 					CEPH_OSD_OP_DELETE, 0);
 			opcode = 0;
 		}
@@ -2009,16 +2017,16 @@  static void __rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req,
 	}
 
 	if (opcode)
-		osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
+		osd_req_op_extent_init(osd_req, which, opcode,
 				       obj_req->ex.oe_off, obj_req->ex.oe_len,
 				       0, 0);
 
-	rbd_assert(which == obj_req->osd_req->r_num_ops);
-	rbd_osd_req_format_write(obj_req);
+	rbd_osd_format_write(osd_req);
 }
 
 static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
 {
+	struct ceph_osd_request *osd_req;
 	unsigned int num_osd_ops, which = 0;
 	int ret;
 
@@ -2038,18 +2046,18 @@  static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
 	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
 		num_osd_ops++; /* stat */
 
-	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
+	osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
 
 	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
-		ret = __rbd_obj_setup_stat(obj_req, which++);
+		ret = rbd_osd_setup_stat(osd_req, which++);
 		if (ret)
 			return ret;
 	}
 
 	obj_req->write_state = RBD_OBJ_WRITE_START;
-	__rbd_obj_setup_zeroout(obj_req, which);
+	__rbd_osd_setup_zeroout_ops(osd_req, which);
 	return 0;
 }
 
@@ -2061,6 +2069,7 @@  static int rbd_obj_setup_zeroout(struct rbd_obj_request *obj_req)
 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 {
 	struct rbd_obj_request *obj_req, *next_obj_req;
+	struct ceph_osd_request *osd_req;
 	int ret;
 
 	for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
@@ -2087,7 +2096,10 @@  static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 			continue;
 		}
 
-		ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+		osd_req = list_last_entry(&obj_req->osd_reqs,
+					  struct ceph_osd_request,
+					  r_unsafe_item);
+		ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
 		if (ret)
 			return ret;
 	}
@@ -2538,28 +2550,27 @@  static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
 static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
 					    u32 bytes)
 {
+	struct ceph_osd_request *osd_req;
 	int ret;
 
 	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
-	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
 	rbd_assert(bytes > 0 && bytes != MODS_ONLY);
-	rbd_osd_req_destroy(obj_req->osd_req);
 
-	obj_req->osd_req = __rbd_osd_req_create(obj_req, &rbd_empty_snapc, 1);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
+	osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
 
-	ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
+	ret = osd_req_op_cls_init(osd_req, 0, "rbd", "copyup");
 	if (ret)
 		return ret;
 
-	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
+	osd_req_op_cls_request_data_bvecs(osd_req, 0,
 					  obj_req->copyup_bvecs,
 					  obj_req->copyup_bvec_count,
 					  bytes);
-	rbd_osd_req_format_write(obj_req);
+	rbd_osd_format_write(osd_req);
 
-	ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
 	if (ret)
 		return ret;
 
@@ -2570,14 +2581,12 @@  static int rbd_obj_issue_copyup_empty_snapc(struct rbd_obj_request *obj_req,
 static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
 {
 	struct rbd_img_request *img_req = obj_req->img_request;
+	struct ceph_osd_request *osd_req;
 	unsigned int num_osd_ops = (bytes != MODS_ONLY);
 	unsigned int which = 0;
 	int ret;
 
 	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
-	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT ||
-		   obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL);
-	rbd_osd_req_destroy(obj_req->osd_req);
 
 	switch (img_req->op_type) {
 	case OBJ_OP_WRITE:
@@ -2590,17 +2599,17 @@  static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
 		BUG();
 	}
 
-	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
-	if (!obj_req->osd_req)
-		return -ENOMEM;
+	osd_req = rbd_obj_add_osd_request(obj_req, num_osd_ops);
+	if (IS_ERR(osd_req))
+		return PTR_ERR(osd_req);
 
 	if (bytes != MODS_ONLY) {
-		ret = osd_req_op_cls_init(obj_req->osd_req, which, "rbd",
+		ret = osd_req_op_cls_init(osd_req, which, "rbd",
 					  "copyup");
 		if (ret)
 			return ret;
 
-		osd_req_op_cls_request_data_bvecs(obj_req->osd_req, which++,
+		osd_req_op_cls_request_data_bvecs(osd_req, which++,
 						  obj_req->copyup_bvecs,
 						  obj_req->copyup_bvec_count,
 						  bytes);
@@ -2608,16 +2617,16 @@  static int rbd_obj_issue_copyup_ops(struct rbd_obj_request *obj_req, u32 bytes)
 
 	switch (img_req->op_type) {
 	case OBJ_OP_WRITE:
-		__rbd_obj_setup_write(obj_req, which);
+		__rbd_osd_setup_write_ops(osd_req, which);
 		break;
 	case OBJ_OP_ZEROOUT:
-		__rbd_obj_setup_zeroout(obj_req, which);
+		__rbd_osd_setup_zeroout_ops(osd_req, which);
 		break;
 	default:
 		BUG();
 	}
 
-	ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+	ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
 	if (ret)
 		return ret;