diff mbox

[1/6] ceph/rbd: add support for watch notify payloads

Message ID d6abc610deea16335ce674cbe473131e2577844f.1434124007.git.dfuller@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Douglas Fuller June 12, 2015, 3:56 p.m. UTC
From: Mike Christie <michaelc@cs.wisc.edu>

This patch adds support for proto version 1 of watch-notify,
so drivers like rbd can be sent a buffer with information like
the notify operation being performed.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
---
 drivers/block/rbd.c             |  3 ++-
 include/linux/ceph/osd_client.h |  7 +++++--
 net/ceph/osd_client.c           | 21 ++++++++++++++++-----
 3 files changed, 23 insertions(+), 8 deletions(-)

Comments

Josh Durgin June 16, 2015, 9:22 p.m. UTC | #1
On 06/12/2015 08:56 AM, Douglas Fuller wrote:
> From: Mike Christie <michaelc@cs.wisc.edu>
>
> This patch adds support for proto version 1 of watch-notify,
> so drivers like rbd can be sent a buffer with information like
> the notify operation being performed.
>
> Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>

Reviewed-by: Josh Durgin <jdurgin@redhat.com>

> ---
>   drivers/block/rbd.c             |  3 ++-
>   include/linux/ceph/osd_client.h |  7 +++++--
>   net/ceph/osd_client.c           | 21 ++++++++++++++++-----
>   3 files changed, 23 insertions(+), 8 deletions(-)
>
> diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
> index 89fe8a4..4b9ba9f 100644
> --- a/drivers/block/rbd.c
> +++ b/drivers/block/rbd.c
> @@ -3103,7 +3103,8 @@ out:
>   	return ret;
>   }
>
> -static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
> +static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
> +			 void *payload, int payload_len)
>   {
>   	struct rbd_device *rbd_dev = (struct rbd_device *)data;
>   	int ret;
> diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
> index 7506b48..eab96b5 100644
> --- a/include/linux/ceph/osd_client.h
> +++ b/include/linux/ceph/osd_client.h
> @@ -184,7 +184,7 @@ struct ceph_osd_event {
>   	u64 cookie;
>   	int one_shot;
>   	struct ceph_osd_client *osdc;
> -	void (*cb)(u64, u64, u8, void *);
> +	void (*cb)(u64, u64, u8, void *, void *, int);
>   	void *data;
>   	struct rb_node node;
>   	struct list_head osd_node;
> @@ -197,6 +197,8 @@ struct ceph_osd_event_work {
>           u64 ver;
>           u64 notify_id;
>           u8 opcode;
> +	void *payload;
> +	int payload_len;
>   };
>
>   struct ceph_osd_client {
> @@ -369,7 +371,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
>
>   /* watch/notify events */
>   extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
> -				  void (*event_cb)(u64, u64, u8, void *),
> +				  void (*event_cb)(u64, u64, u8, void *, void *,
> +						   int),
>   				  void *data, struct ceph_osd_event **pevent);
>   extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
>   extern void ceph_osdc_put_event(struct ceph_osd_event *event);
> diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
> index 5003367..aa1c5c46 100644
> --- a/net/ceph/osd_client.c
> +++ b/net/ceph/osd_client.c
> @@ -2277,7 +2277,7 @@ static void __remove_event(struct ceph_osd_event *event)
>   }
>
>   int ceph_osdc_create_event(struct ceph_osd_client *osdc,
> -			   void (*event_cb)(u64, u64, u8, void *),
> +			   void (*event_cb)(u64, u64, u8, void *, void *, int),
>   			   void *data, struct ceph_osd_event **pevent)
>   {
>   	struct ceph_osd_event *event;
> @@ -2329,7 +2329,8 @@ static void do_event_work(struct work_struct *work)
>   	u8 opcode = event_work->opcode;
>
>   	dout("do_event_work completing %p\n", event);
> -	event->cb(ver, notify_id, opcode, event->data);
> +	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
> +		  event_work->payload_len);
>   	dout("do_event_work completed %p\n", event);
>   	ceph_osdc_put_event(event);
>   	kfree(event_work);
> @@ -2342,10 +2343,11 @@ static void do_event_work(struct work_struct *work)
>   static void handle_watch_notify(struct ceph_osd_client *osdc,
>   				struct ceph_msg *msg)
>   {
> -	void *p, *end;
> +	void *p, *end, *payload = NULL;
>   	u8 proto_ver;
>   	u64 cookie, ver, notify_id;
>   	u8 opcode;
> +	u32 payload_len = 0;
>   	struct ceph_osd_event *event;
>   	struct ceph_osd_event_work *event_work;
>
> @@ -2358,6 +2360,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   	ceph_decode_64_safe(&p, end, ver, bad);
>   	ceph_decode_64_safe(&p, end, notify_id, bad);
>
> +	if (proto_ver >= 1) {
> +		ceph_decode_32_safe(&p, end, payload_len, bad);
> +		if (end - p < payload_len)
> +			goto bad;
> +		payload = p;
> +	}
> +
>   	spin_lock(&osdc->event_lock);
>   	event = __find_event(osdc, cookie);
>   	if (event) {
> @@ -2365,8 +2374,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   		get_event(event);
>   	}
>   	spin_unlock(&osdc->event_lock);
> -	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
> -	     cookie, ver, event);
> +	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
> +	     cookie, ver, event, notify_id, payload_len);
>   	if (event) {
>   		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
>   		if (!event_work) {
> @@ -2379,6 +2388,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>   		event_work->ver = ver;
>   		event_work->notify_id = notify_id;
>   		event_work->opcode = opcode;
> +		event_work->payload = payload;
> +		event_work->payload_len = payload_len;
>
>   		queue_work(osdc->notify_wq, &event_work->work);
>   	}
>

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 89fe8a4..4b9ba9f 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3103,7 +3103,8 @@  out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
+			 void *payload, int payload_len)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b48..eab96b5 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -184,7 +184,7 @@  struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *);
+	void (*cb)(u64, u64, u8, void *, void *, int);
 	void *data;
 	struct rb_node node;
 	struct list_head osd_node;
@@ -197,6 +197,8 @@  struct ceph_osd_event_work {
         u64 ver;
         u64 notify_id;
         u8 opcode;
+	void *payload;
+	int payload_len;
 };
 
 struct ceph_osd_client {
@@ -369,7 +371,8 @@  extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *),
+				  void (*event_cb)(u64, u64, u8, void *, void *,
+						   int),
 				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5003367..aa1c5c46 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2277,7 +2277,7 @@  static void __remove_event(struct ceph_osd_event *event)
 }
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *),
+			   void (*event_cb)(u64, u64, u8, void *, void *, int),
 			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
@@ -2329,7 +2329,8 @@  static void do_event_work(struct work_struct *work)
 	u8 opcode = event_work->opcode;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data);
+	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
+		  event_work->payload_len);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -2342,10 +2343,11 @@  static void do_event_work(struct work_struct *work)
 static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
-	void *p, *end;
+	void *p, *end, *payload = NULL;
 	u8 proto_ver;
 	u64 cookie, ver, notify_id;
 	u8 opcode;
+	u32 payload_len = 0;
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
 
@@ -2358,6 +2360,13 @@  static void handle_watch_notify(struct ceph_osd_client *osdc,
 	ceph_decode_64_safe(&p, end, ver, bad);
 	ceph_decode_64_safe(&p, end, notify_id, bad);
 
+	if (proto_ver >= 1) {
+		ceph_decode_32_safe(&p, end, payload_len, bad);
+		if (end - p < payload_len)
+			goto bad;
+		payload = p;
+	}
+
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
@@ -2365,8 +2374,8 @@  static void handle_watch_notify(struct ceph_osd_client *osdc,
 		get_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
-	     cookie, ver, event);
+	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
+	     cookie, ver, event, notify_id, payload_len);
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
@@ -2379,6 +2388,8 @@  static void handle_watch_notify(struct ceph_osd_client *osdc,
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
+		event_work->payload = payload;
+		event_work->payload_len = payload_len;
 
 		queue_work(osdc->notify_wq, &event_work->work);
 	}