diff mbox

[5/6] osd_client: add support for notify payloads via notify event

Message ID 0010a14dcbe868312b9be6c7add8ec9d43fc0a28.1434124007.git.dfuller@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Douglas Fuller June 12, 2015, 3:56 p.m. UTC
Add support in notify events for receiving data from notify_ack. Notify
events are optional; data is discarded if no event is found.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 net/ceph/osd_client.c | 39 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 36 insertions(+), 3 deletions(-)

Comments

Mike Christie June 16, 2015, 3:26 p.m. UTC | #1
On 06/12/2015 10:56 AM, Douglas Fuller wrote:
> @@ -2533,8 +2548,10 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>  	if (msg->hdr.version >= 2)
>  		ceph_decode_32_safe(&p, end, return_code, bad);
>  
> -	if (msg->hdr.version >= 3)
> +	if (msg->hdr.version >= 3) {
>  		ceph_decode_32_safe(&p, end, notifier_gid, bad);
> +		data = list_first_entry(&msg->data, struct ceph_msg_data, links);

It's not completely clear how this data is meant to be used, and by whom. Would rbd be
calling ceph_osdc_create_notify_event/ceph_osdc_wait_event, or
would it be some libceph code (net/ceph)?

If it's rbd, is it supposed to be digging into ceph_msg_data structs?
Did we want to pass it a pagelist or CEPH_OSD_DATA_TYPE_PAGES type of
pages array?



> +	}
>  
>  	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
>  		return_code, notifier_gid, data);
> @@ -3061,7 +3078,23 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>  	switch (type) {
>  	case CEPH_MSG_OSD_MAP:
>  	case CEPH_MSG_WATCH_NOTIFY:
> -		return ceph_msg_new(type, front, GFP_NOFS, false);
> +		{
> +			struct ceph_msg *m = ceph_msg_new(type, front, GFP_NOFS, false);
> +			size_t len = con->in_hdr.data_len;
> +			if (len > 0) {
> +				struct page **pages;
> +				struct ceph_osd_data osd_data;
> +				pages = ceph_alloc_page_vector(calc_pages_for(0, len), GFP_KERNEL);


You should use GFP_NOIO, or at least GFP_NOFS like above. Anything but
GFP_KERNEL.

Also handle the allocation failure like is done in other places.


> +				osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
> +				osd_data.pages = pages;
> +				osd_data.length = len;
> +				osd_data.alignment = 0;
> +				osd_data.pages_from_pool = false;
> +				osd_data.own_pages = false;
> +				ceph_osdc_msg_data_add(m, &osd_data);
> +			}
> +			return m;
> +		}
>  	case CEPH_MSG_OSD_OPREPLY:
>  		return get_reply(con, hdr, skip);
>  	default:
> 

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Douglas Fuller June 16, 2015, 5:22 p.m. UTC | #2
----- Original Message -----
> From: "Mike Christie" <mchristi@redhat.com>
> To: "Douglas Fuller" <dfuller@redhat.com>, ceph-devel@vger.kernel.org
> Sent: Tuesday, June 16, 2015 11:26:43 AM
> Subject: Re: [PATCH 5/6] osd_client: add support for notify payloads via notify event
> 
> On 06/12/2015 10:56 AM, Douglas Fuller wrote:
> > @@ -2533,8 +2548,10 @@ static void handle_watch_notify(struct
> > ceph_osd_client *osdc,
> >  	if (msg->hdr.version >= 2)
> >  		ceph_decode_32_safe(&p, end, return_code, bad);
> >  
> > -	if (msg->hdr.version >= 3)
> > +	if (msg->hdr.version >= 3) {
> >  		ceph_decode_32_safe(&p, end, notifier_gid, bad);
> > +		data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> 
> It's not completely clear how this data is meant to be used, and by whom. Would rbd be
> calling ceph_osdc_create_notify_event/ceph_osdc_wait_event, or
> would it be some libceph code (net/ceph)?

rbd would be calling ceph_osdc_create_notify_event and then ceph_osdc_wait_event (which
waits for CEPH_WATCH_EVENT_NOTIFY_COMPLETE).

> If it's rbd, is it supposed to be digging into ceph_msg_data structs?
> Did we want to pass it a pagelist or CEPH_OSD_DATA_TYPE_PAGES type of
> pages array?

Yeah, it would be cleaner to just copy the pages pointer and size. I'll change that.

Either way, the decoding gets hairy and cumbersome. I think we should
extend osd_client to have one or two convenience routines like
ceph_osdc_for_each_notifier or something like that.
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d435bf2..d56f7a6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -267,7 +267,7 @@  void osd_req_op_notify_request_data_pagelist(
 {
 	struct ceph_osd_data *osd_data;
 
-	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
+	osd_data = osd_req_op_data(osd_req, which, watch, request_data);
 	ceph_osd_data_pagelist_init(osd_data, pagelist);
 }
 EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
@@ -629,6 +629,13 @@  void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
 	struct ceph_osd_event *notify_event;
 
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
+
+	notify_event = __find_event(osd_req->r_osdc, cookie);
+	/* Only linger if the caller is interested in the notify acks. */
+	if (notify_event) {
+		ceph_osdc_set_request_linger(osd_req->r_osdc, osd_req);
+		notify_event->osd_req = osd_req;
+	}
 	op->watch.cookie = cookie;
 }
 EXPORT_SYMBOL(osd_req_op_notify_init);
@@ -767,6 +774,14 @@  static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		ceph_osdc_msg_data_add(req->r_reply, osd_data);
 		break;
 	case CEPH_OSD_OP_NOTIFY_ACK:
+		osd_data = &src->watch.request_data;
+		data_length = ceph_osd_data_length(osd_data);
+		if (data_length) {
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+			src->payload_len += data_length;
+			request_data_len += data_length;
+		}
+		/* fallthrough */
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 		dst->watch.ver = cpu_to_le64(src->watch.ver);
@@ -2533,8 +2548,10 @@  static void handle_watch_notify(struct ceph_osd_client *osdc,
 	if (msg->hdr.version >= 2)
 		ceph_decode_32_safe(&p, end, return_code, bad);
 
-	if (msg->hdr.version >= 3)
+	if (msg->hdr.version >= 3) {
 		ceph_decode_32_safe(&p, end, notifier_gid, bad);
+		data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+	}
 
 	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
 		return_code, notifier_gid, data);
@@ -3061,7 +3078,23 @@  static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
 	case CEPH_MSG_WATCH_NOTIFY:
-		return ceph_msg_new(type, front, GFP_NOFS, false);
+		{
+			struct ceph_msg *m = ceph_msg_new(type, front, GFP_NOFS, false);
+			size_t len = con->in_hdr.data_len;
+			if (len > 0) {
+				struct page **pages;
+				struct ceph_osd_data osd_data;
+				pages = ceph_alloc_page_vector(calc_pages_for(0, len), GFP_KERNEL);
+				osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
+				osd_data.pages = pages;
+				osd_data.length = len;
+				osd_data.alignment = 0;
+				osd_data.pages_from_pool = false;
+				osd_data.own_pages = false;
+				ceph_osdc_msg_data_add(m, &osd_data);
+			}
+			return m;
+		}
 	case CEPH_MSG_OSD_OPREPLY:
 		return get_reply(con, hdr, skip);
 	default: