--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -99,6 +99,7 @@ struct ceph_osd_req_op {
struct ceph_osd_data request_info;
struct ceph_osd_data request_data;
struct ceph_osd_data response_data;
+		struct ceph_osd_data chain_data; /* one buffer for all OP_CALL replies */
__u8 class_len;
__u8 method_len;
__u8 argc;
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -907,6 +907,10 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
BUG_ON(!data->pages);
BUG_ON(!data->length);
+	/*
+	 * FIXME: a short read (length < data->length) can corrupt
+	 * the following buffers; see http://tracker.ceph.com/issues/11424
+	 */
cursor->resid = min(length, data->length);
page_count = calc_pages_for(data->alignment, (u64)data->length);
cursor->page_offset = data->alignment & ~PAGE_MASK;
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -301,6 +301,89 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
}
}
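+/*
+ * Sum the response_data lengths of the leading CEPH_OSD_OP_CALL ops and
+ * set up a single page vector (op 0's chain_data) big enough to receive
+ * all of their replies contiguously off the wire.
+ */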
+static int __build_op_cls_chain(struct ceph_osd_request *osd_req)
+{
+ u64 chain_length = 0;
+ u32 chain_pagecount = 0;
+ struct ceph_osd_req_op *op = NULL;
+ struct ceph_osd_data *osd_data;
+ struct ceph_osd_data *chain_data;
+ struct page **pages;
+ int i;
+
+ chain_data = osd_req_op_data(osd_req, 0, cls, chain_data);
+
+	for (i = 0; i < osd_req->r_num_ops; i++) {
+ op = &osd_req->r_ops[i];
+ if (op->op != CEPH_OSD_OP_CALL)
+ break;
+
+ osd_data = osd_req_op_data(osd_req, i, cls, chain_data);
+ osd_data->length = 0;
+
+ osd_data = osd_req_op_data(osd_req, i, cls, response_data);
+ chain_length += osd_data->length;
+ }
+
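+	/* One contiguous buffer will receive every chained OP_CALL reply. */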
+ chain_data->length = chain_length;
+ chain_pagecount = (u32)calc_pages_for(0, chain_data->length);
+ pages = ceph_alloc_page_vector(chain_pagecount, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+	/* own_pages=true: the vector is freed when chain_data is released */
+	ceph_osd_data_pages_init(chain_data, pages, chain_length, 0, false,
+				 true);
+
+ return 0;
+}
+
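+/*
+ * Copy the contiguous reply out of chain_data, release it, and scatter
+ * the bytes back into each OP_CALL op's response_data pages using the
+ * per-op reply lengths decoded from the reply message.
+ */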
+static int __split_cls_op_chain(struct ceph_osd_request *osd_req)
+{
+ int i;
+	void *data;
+	void *p;
+ struct ceph_osd_data *osd_data;
+
+ osd_data = osd_req_op_data(osd_req, 0, cls, chain_data);
+
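+	/* Nothing to do unless a chained reply buffer was set up. */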
+ if (osd_data->length == 0)
+ return 0;
+
+ data = kzalloc(osd_data->length, GFP_KERNEL);
+ if (!data)
+ return -ENOMEM;
+
+ ceph_copy_from_page_vector(osd_data->pages, data, 0, osd_data->length);
+ ceph_osd_data_release(osd_data);
+
+ p = data;
+	for (i = 0; i < osd_req->r_num_ops; i++) {
+		/* Only the leading OP_CALL ops are part of the chain. */
+		if (osd_req->r_ops[i].op != CEPH_OSD_OP_CALL)
+			break;
+
+		osd_data = osd_req_op_data(osd_req, i, cls, response_data);
+		ceph_copy_to_page_vector(osd_data->pages, p, 0,
+					 osd_req->r_reply_op_len[i]);
+		p += osd_req->r_reply_op_len[i];
+	}
+
+ kfree(data);
+ return 0;
+}
+
/*
* requests
*/
@@ -694,8 +777,19 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
src->payload_len += data_length;
request_data_len += data_length;
}
- osd_data = &src->cls.response_data;
- ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		/* The chained reply buffer is built once, while encoding op 0. */
+		if (which == 0) {
+			int err;
+
+			err = __build_op_cls_chain(req);
+			if (err) {
+				pr_err("error allocating memory for op chain\n");
+				return 0;
+			}
+ osd_data = &src->cls.chain_data;
+ if (osd_data->length)
+ ceph_osdc_msg_data_add(req->r_reply, osd_data);
+ }
break;
case CEPH_OSD_OP_STARTSYNC:
break;
@@ -1825,6 +1919,16 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
for (i = 0; i < numops; i++)
req->r_reply_op_result[i] = ceph_decode_32(&p);
+	if (req->r_ops[0].op == CEPH_OSD_OP_CALL &&
+	    req->r_ops[0].cls.chain_data.length) {
+		int err;
+
+		/* Scatter the chained reply back into the per-op buffers. */
+		err = __split_cls_op_chain(req);
+		if (err)
+			goto bad_put;
+	}
+
if (le16_to_cpu(msg->hdr.version) >= 6) {
p += 8 + 4; /* skip replay_version */
p += 8; /* skip user_version */
Messages are received from the wire directly into destination buffers. A
short read could result in corruption of the following destination
buffers. Allocate a single message buffer for all class method calls and
split them apart at the osd_client level.

This only applies to ceph_msgs containing multiple OP_CALL ops, and it
may break support for ceph_msgs containing a mix of class method calls
that return data and other ops.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 include/linux/ceph/osd_client.h |   1 +
 net/ceph/messenger.c            |   4 ++
 net/ceph/osd_client.c           | 108 ++++++++++++++++++++++++++++++++++++++-
 3 files changed, 111 insertions(+), 2 deletions(-)