All the mail mirrored from lore.kernel.org
 help / color / mirror / Atom feed
* [PATCHv2 0/6] support watch/notify version 2
@ 2015-06-17 14:25 Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
                   ` (5 more replies)
  0 siblings, 6 replies; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 14:25 UTC (permalink / raw
  To: ceph-devel

osd_client, rbd: add support for version 2 of watch/notify

Support watch/notify 2 in osd_client and update rbd for compatibility.
Map ceph_osd_event to expected watch/notify messages and store watch
and watch error callbacks. Implement CEPH_OSD_WATCH_OP_PING and
CEPH_OSD_WATCH_OP_RECONNECT. Handle CEPH_WATCH_EVENT_DISCONNECT.
Handle CEPH_WATCH_EVENT_NOTIFY_COMPLETE and provide a data item for
clients to consume.

v2: incorporated changes from Mike, Josh, and Ilya. Changed notify-ack
data to page vector and size instead of ceph_msg_data and other fixes.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>

Douglas Fuller (3):
  osd_client, rbd: update event interface for watch/notify2
  osd_client: add support for notify payloads via notify event
  osd_client: send watch ping messages

Mike Christie (3):
  ceph/rbd: add support for watch notify payloads
  ceph/rbd: add support for header version 2 and 3
  ceph/rbd: update watch-notify ceph_osd_op

 drivers/block/rbd.c             |  54 ++++--
 include/linux/ceph/ceph_fs.h    |   6 +-
 include/linux/ceph/osd_client.h |  55 ++++--
 include/linux/ceph/rados.h      |  18 +-
 net/ceph/osd_client.c           | 387 +++++++++++++++++++++++++++++++++++-----
 5 files changed, 443 insertions(+), 77 deletions(-)

-- 
1.9.3


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCHv2 1/6] ceph/rbd: add support for watch notify payloads
  2015-06-17 14:25 [PATCHv2 0/6] support watch/notify version 2 Douglas Fuller
@ 2015-06-17 14:25 ` Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 2/6] ceph/rbd: add support for header version 2 and 3 Douglas Fuller
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 14:25 UTC (permalink / raw
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This patch adds support for proto version 1 of watch-notify,
so drivers like rbd can be sent a buffer with information like
the notify operation being performed.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
---
 drivers/block/rbd.c             |  3 ++-
 include/linux/ceph/osd_client.h |  7 +++++--
 net/ceph/osd_client.c           | 21 ++++++++++++++++-----
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 89fe8a4..4b9ba9f 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3103,7 +3103,8 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
+			 void *payload, int payload_len)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 7506b48..eab96b5 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -184,7 +184,7 @@ struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *);
+	void (*cb)(u64, u64, u8, void *, void *, int);
 	void *data;
 	struct rb_node node;
 	struct list_head osd_node;
@@ -197,6 +197,8 @@ struct ceph_osd_event_work {
         u64 ver;
         u64 notify_id;
         u8 opcode;
+	void *payload;
+	int payload_len;
 };
 
 struct ceph_osd_client {
@@ -369,7 +371,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *),
+				  void (*event_cb)(u64, u64, u8, void *, void *,
+						   int),
 				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5003367..aa1c5c46 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2277,7 +2277,7 @@ static void __remove_event(struct ceph_osd_event *event)
 }
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *),
+			   void (*event_cb)(u64, u64, u8, void *, void *, int),
 			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
@@ -2329,7 +2329,8 @@ static void do_event_work(struct work_struct *work)
 	u8 opcode = event_work->opcode;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data);
+	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
+		  event_work->payload_len);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -2342,10 +2343,11 @@ static void do_event_work(struct work_struct *work)
 static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
-	void *p, *end;
+	void *p, *end, *payload = NULL;
 	u8 proto_ver;
 	u64 cookie, ver, notify_id;
 	u8 opcode;
+	u32 payload_len = 0;
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
 
@@ -2358,6 +2360,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	ceph_decode_64_safe(&p, end, ver, bad);
 	ceph_decode_64_safe(&p, end, notify_id, bad);
 
+	if (proto_ver >= 1) {
+		ceph_decode_32_safe(&p, end, payload_len, bad);
+		if (end - p < payload_len)
+			goto bad;
+		payload = p;
+	}
+
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
@@ -2365,8 +2374,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		get_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
-	     cookie, ver, event);
+	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
+	     cookie, ver, event, notify_id, payload_len);
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
@@ -2379,6 +2388,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
+		event_work->payload = payload;
+		event_work->payload_len = payload_len;
 
 		queue_work(osdc->notify_wq, &event_work->work);
 	}
-- 
1.9.3


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCHv2 2/6] ceph/rbd: add support for header version 2 and 3
  2015-06-17 14:25 [PATCHv2 0/6] support watch/notify version 2 Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
@ 2015-06-17 14:25 ` Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 3/6] ceph/rbd: update watch-notify ceph_osd_op Douglas Fuller
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 14:25 UTC (permalink / raw
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This adds watch-notify header version 2 and 3 support, so we can
get a return_code from those operations.

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
[djf: fixed decoding bits]
Signed-off-by: Douglas Fuller <dfuller@redhat.com>
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
---
 drivers/block/rbd.c             |  5 +++--
 include/linux/ceph/osd_client.h | 10 ++++++----
 net/ceph/osd_client.c           | 25 +++++++++++++++++++------
 3 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4b9ba9f..65421eb 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3103,8 +3103,9 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data,
-			 void *payload, int payload_len)
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
+			 u64 notifier_gid, void *data, void *payload,
+			 u32 payload_len)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
 	int ret;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index eab96b5..1c4e472 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -184,7 +184,7 @@ struct ceph_osd_event {
 	u64 cookie;
 	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *, void *, int);
+	void (*cb)(u64, u64, u8, s32, u64, void *, void *, u32);
 	void *data;
 	struct rb_node node;
 	struct list_head osd_node;
@@ -197,8 +197,10 @@ struct ceph_osd_event_work {
         u64 ver;
         u64 notify_id;
         u8 opcode;
+	s32 return_code;
+	u64 notifier_gid;
 	void *payload;
-	int payload_len;
+	u32 payload_len;
 };
 
 struct ceph_osd_client {
@@ -371,8 +373,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 
 /* watch/notify events */
 extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *, void *,
-						   int),
+				  void (*event_cb)(u64, u64, u8, s32, u64,
+						   void *, void *, u32),
 				  void *data, struct ceph_osd_event **pevent);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index aa1c5c46..8d96a52 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -2277,7 +2277,8 @@ static void __remove_event(struct ceph_osd_event *event)
 }
 
 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *, void *, int),
+			   void (*event_cb)(u64, u64, u8, s32, u64, void *,
+					    void *, u32),
 			   void *data, struct ceph_osd_event **pevent)
 {
 	struct ceph_osd_event *event;
@@ -2327,10 +2328,12 @@ static void do_event_work(struct work_struct *work)
 	u64 ver = event_work->ver;
 	u64 notify_id = event_work->notify_id;
 	u8 opcode = event_work->opcode;
+	s32 return_code = event_work->return_code;
+	u64 notifier_gid = event_work->notifier_gid;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data, event_work->payload,
-		  event_work->payload_len);
+	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
+		  event->data, event_work->payload, event_work->payload_len);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
@@ -2345,9 +2348,10 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 {
 	void *p, *end, *payload = NULL;
 	u8 proto_ver;
-	u64 cookie, ver, notify_id;
+	u64 cookie, ver, notify_id, notifier_gid = 0;
 	u8 opcode;
 	u32 payload_len = 0;
+	s32 return_code = 0;
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
 
@@ -2365,8 +2369,15 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		if (end - p < payload_len)
 			goto bad;
 		payload = p;
+		p += payload_len;
 	}
 
+	if (msg->hdr.version >= 2)
+		ceph_decode_32_safe(&p, end, return_code, bad);
+
+	if (msg->hdr.version >= 3)
+		ceph_decode_64_safe(&p, end, notifier_gid, bad);
+
 	spin_lock(&osdc->event_lock);
 	event = __find_event(osdc, cookie);
 	if (event) {
@@ -2374,8 +2385,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		get_event(event);
 	}
 	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u\n",
-	     cookie, ver, event, notify_id, payload_len);
+	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u return code %d notifier gid %llu\n",
+	     cookie, ver, event, notify_id, payload_len, return_code, notifier_gid);
 	if (event) {
 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
 		if (!event_work) {
@@ -2388,6 +2399,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 		event_work->ver = ver;
 		event_work->notify_id = notify_id;
 		event_work->opcode = opcode;
+		event_work->return_code = return_code;
+		event_work->notifier_gid = notifier_gid;
 		event_work->payload = payload;
 		event_work->payload_len = payload_len;
 
-- 
1.9.3


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCHv2 3/6] ceph/rbd: update watch-notify ceph_osd_op
  2015-06-17 14:25 [PATCHv2 0/6] support watch/notify version 2 Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 2/6] ceph/rbd: add support for header version 2 and 3 Douglas Fuller
@ 2015-06-17 14:25 ` Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 14:25 UTC (permalink / raw
  To: ceph-devel

From: Mike Christie <michaelc@cs.wisc.edu>

This syncs the ceph_osd_op struct with the current version of ceph
where the watch struct has been updated to support more ops and
the notify-ack support has been broken out of the watch struct.

Ceph commits
1a82cc3926fc7bc4cfbdd2fd4dfee8660d5107a1
2288f318e1b1f6a1c42b185fc1b4c41f23995247
73720130c34424bf1fe36058ebe8da66976f40fb

It still has us use the legacy watch op for now. I will add support
later. It is mostly a preparation patch for more advanced notify support.

Questions:

1. Should linger also be set for CEPH_OSD_WATCH_OP_RECONNECT?
2. Not sure what watch.gen is used for. Is that for our internal
use, or does the osd do something with it?

Signed-off-by: Mike Christie <michaelc@cs.wisc.edu>
[djf: moved watch event definitions to ceph_fs.h]
[djf: removed changes to rbd.c for SCSI]
Signed-off-by: Douglas Fuller <dfuller@redhat.com>
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
---
 drivers/block/rbd.c             | 19 +++++++-----
 include/linux/ceph/ceph_fs.h    |  6 ++--
 include/linux/ceph/osd_client.h | 23 ++++++++++----
 include/linux/ceph/rados.h      | 18 +++++++++--
 net/ceph/osd_client.c           | 66 ++++++++++++++++++++++++++++++++++++-----
 5 files changed, 107 insertions(+), 25 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 65421eb..ed170b1 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -3089,8 +3089,8 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
 	if (!obj_request->osd_req)
 		goto out;
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
-					notify_id, 0, 0);
+	osd_req_op_watch_init(obj_request->osd_req, 0,
+			      CEPH_OSD_OP_NOTIFY_ACK, 0, notify_id);
 	rbd_osd_req_format_read(obj_request);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -3138,7 +3138,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
  */
 static struct rbd_obj_request *rbd_obj_watch_request_helper(
 						struct rbd_device *rbd_dev,
-						bool watch)
+						u8 watch_opcode)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_options *opts = osdc->client->options;
@@ -3158,10 +3158,11 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
 	}
 
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, watch);
+			      watch_opcode, rbd_dev->watch_event->cookie);
 	rbd_osd_req_format_write(obj_request);
 
-	if (watch)
+	if (watch_opcode == CEPH_OSD_WATCH_OP_LEGACY_WATCH ||
+	    watch_opcode == CEPH_OSD_WATCH_OP_WATCH)
 		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -3174,7 +3175,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
 
 	ret = obj_request->result;
 	if (ret) {
-		if (watch)
+		if (watch_opcode != CEPH_OSD_WATCH_OP_UNWATCH)
 			rbd_obj_request_end(obj_request);
 		goto out;
 	}
@@ -3203,7 +3204,8 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	if (ret < 0)
 		return ret;
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+	obj_request = rbd_obj_watch_request_helper(rbd_dev,
+						CEPH_OSD_WATCH_OP_LEGACY_WATCH);
 	if (IS_ERR(obj_request)) {
 		ceph_osdc_cancel_event(rbd_dev->watch_event);
 		rbd_dev->watch_event = NULL;
@@ -3237,7 +3239,8 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 	rbd_obj_request_put(rbd_dev->watch_request);
 	rbd_dev->watch_request = NULL;
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+	obj_request = rbd_obj_watch_request_helper(rbd_dev,
+						   CEPH_OSD_WATCH_OP_UNWATCH);
 	if (!IS_ERR(obj_request))
 		rbd_obj_request_put(obj_request);
 	else
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index d7d072a..06f5ed8 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -153,11 +153,11 @@ struct ceph_dir_layout {
 
 /* watch-notify operations */
 enum {
-  WATCH_NOTIFY				= 1, /* notifying watcher */
-  WATCH_NOTIFY_COMPLETE			= 2, /* notifier notified when done */
+  CEPH_WATCH_EVENT_NOTIFY		= 1, /* notifying watcher */
+  CEPH_WATCH_EVENT_NOTIFY_COMPLETE	= 2, /* notifier notified when done */
+  CEPH_WATCH_EVENT_DISCONNECT		= 3, /* we were disconnected */
 };
 
-
 struct ceph_mon_request_header {
 	__le64 have_version;
 	__le16 session_mon;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 1c4e472..12732d3 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -106,11 +106,15 @@ struct ceph_osd_req_op {
 		struct {
 			u64 cookie;
 			u64 ver;
-			u32 prot_ver;
-			u32 timeout;
-			__u8 flag;
+			__u8 op;
+			u32 gen;
 		} watch;
 		struct {
+			u64 cookie;
+			struct ceph_osd_data request_data;
+			struct ceph_osd_data response_data;
+		} notify;
+		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
@@ -302,7 +306,16 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-
+extern void osd_req_op_notify_request_data_pagelist(struct ceph_osd_request *,
+					unsigned int which,
+					struct ceph_pagelist *pagelist);
+extern void osd_req_op_notify_response_data_pages(struct ceph_osd_request *,
+					unsigned int which,
+					struct page **pages, u64 length,
+					u32 alignment, bool pages_from_pool,
+					bool own_pages);
+extern void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
+				   unsigned int which, u16 opcode, u64 cookie);
 extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					const char *class, const char *method);
@@ -311,7 +324,7 @@ extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int
 				 size_t size, u8 cmp_op, u8 cmp_mode);
 extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
-					u64 cookie, u64 version, int flag);
+					u8 watch_opcode, u64 cookie);
 extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				       unsigned int which,
 				       u64 expected_object_size,
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 2f822dc..bcaabf3 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -417,6 +417,16 @@ enum {
 
 #define RADOS_NOTIFY_VER	1
 
+enum {
+	CEPH_OSD_WATCH_OP_UNWATCH = 0,
+	CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
+	/* note: use only ODD ids to prevent pre-giant code from
+	 * interpreting the op as UNWATCH */
+	CEPH_OSD_WATCH_OP_WATCH = 3,
+	CEPH_OSD_WATCH_OP_RECONNECT = 5,
+	CEPH_OSD_WATCH_OP_PING = 7,
+};
+
 /*
  * an individual object operation.  each may be accompanied by some data
  * payload
@@ -450,10 +460,14 @@ struct ceph_osd_op {
 	        } __attribute__ ((packed)) snap;
 		struct {
 			__le64 cookie;
-			__le64 ver;
-			__u8 flag;	/* 0 = unwatch, 1 = watch */
+			__le64 ver;	/* no longer used */
+			__u8 op;	/* CEPH_OSD_WATCH_OP_* */
+			__u32 gen;	/* registration generation */
 		} __attribute__ ((packed)) watch;
 		struct {
+			__le64 cookie;
+		} __attribute__ ((packed)) notify;
+		struct {
 			__le64 offset, length;
 			__le64 src_offset;
 		} __attribute__ ((packed)) clonerange;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 8d96a52..2725a68 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -18,6 +18,7 @@
 #include <linux/ceph/decode.h>
 #include <linux/ceph/auth.h>
 #include <linux/ceph/pagelist.h>
+#include <linux/ceph/ceph_fs.h>
 
 #define OSD_OP_FRONT_LEN	4096
 #define OSD_OPREPLY_FRONT_LEN	512
@@ -243,6 +244,29 @@ void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
 
+void osd_req_op_notify_response_data_pages(struct ceph_osd_request *osd_req,
+			unsigned int which, struct page **pages, u64 length,
+			u32 alignment, bool pages_from_pool, bool own_pages)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, response_data);
+	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
+				pages_from_pool, own_pages);
+}
+EXPORT_SYMBOL(osd_req_op_notify_response_data_pages);
+
+void osd_req_op_notify_request_data_pagelist(
+			struct ceph_osd_request *osd_req,
+			unsigned int which, struct ceph_pagelist *pagelist)
+{
+	struct ceph_osd_data *osd_data;
+
+	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
+	ceph_osd_data_pagelist_init(osd_data, pagelist);
+}
+EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
+
 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
 {
 	switch (osd_data->type) {
@@ -292,6 +316,10 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 		ceph_osd_data_release(&op->cls.request_data);
 		ceph_osd_data_release(&op->cls.response_data);
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		ceph_osd_data_release(&op->notify.request_data);
+		ceph_osd_data_release(&op->notify.response_data);
+		break;
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
 		ceph_osd_data_release(&op->xattr.osd_data);
@@ -588,9 +616,18 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
-void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
-				unsigned int which, u16 opcode,
-				u64 cookie, u64 version, int flag)
+void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
+			    u16 opcode, u64 cookie)
+{
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+
+	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
+	op->watch.cookie = cookie;
+}
+EXPORT_SYMBOL(osd_req_op_notify_init);
+
+void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which,
+			   u16 opcode, u8 watch_opcode, u64 cookie)
 {
 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 						      opcode, 0);
@@ -598,9 +635,9 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
 
 	op->watch.cookie = cookie;
-	op->watch.ver = version;
-	if (opcode == CEPH_OSD_OP_WATCH && flag)
-		op->watch.flag = (u8)1;
+	op->watch.ver = 0;
+	op->watch.op = watch_opcode;
+	op->watch.gen = 0;
 }
 EXPORT_SYMBOL(osd_req_op_watch_init);
 
@@ -708,11 +745,26 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
+	case CEPH_OSD_OP_NOTIFY:
+		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
+
+		osd_data = &src->notify.request_data;
+		data_length = ceph_osd_data_length(osd_data);
+		if (data_length) {
+			BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE);
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+			src->payload_len += data_length;
+			request_data_len += data_length;
+		}
+		osd_data = &src->notify.response_data;
+		ceph_osdc_msg_data_add(req->r_reply, osd_data);
+		break;
 	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 		dst->watch.ver = cpu_to_le64(src->watch.ver);
-		dst->watch.flag = src->watch.flag;
+		dst->watch.op = src->watch.op;
+		dst->watch.gen = cpu_to_le32(src->watch.gen);
 		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
 		dst->alloc_hint.expected_object_size =
-- 
1.9.3


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCHv2 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-17 14:25 [PATCHv2 0/6] support watch/notify version 2 Douglas Fuller
                   ` (2 preceding siblings ...)
  2015-06-17 14:25 ` [PATCHv2 3/6] ceph/rbd: update watch-notify ceph_osd_op Douglas Fuller
@ 2015-06-17 14:25 ` Douglas Fuller
  2016-02-11 17:51   ` David Disseldorp
  2015-06-17 14:25 ` [PATCHv2 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
  2015-06-17 14:25 ` [PATCHv2 6/6] osd_client: send watch ping messages Douglas Fuller
  5 siblings, 1 reply; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 14:25 UTC (permalink / raw
  To: ceph-devel

Change unused ceph_osd_event structure to refer to pending watch/notify2
messages. Watch events include the separate watch and watch error callbacks
used for watch/notify2. Update rbd to use separate watch and watch error
callbacks via the new watch event.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 drivers/block/rbd.c             |  39 ++++++---
 include/linux/ceph/osd_client.h |  27 +++++--
 net/ceph/osd_client.c           | 173 +++++++++++++++++++++++++++++-----------
 3 files changed, 175 insertions(+), 64 deletions(-)

diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index ed170b1..ffe9eb6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -427,6 +427,8 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
 				       size_t count);
 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
 static void rbd_spec_put(struct rbd_spec *spec);
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
 
 static int rbd_dev_id_to_minor(int dev_id)
 {
@@ -3103,19 +3105,17 @@ out:
 	return ret;
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
-			 u64 notifier_gid, void *data, void *payload,
-			 u32 payload_len)
+static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, u64 notifier_id,
+                        void *data, size_t data_len)
 {
-	struct rbd_device *rbd_dev = (struct rbd_device *)data;
+	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
 	int ret;
 
 	if (!rbd_dev)
 		return;
 
-	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
-		rbd_dev->header_name, (unsigned long long)notify_id,
-		(unsigned int)opcode);
+	dout("%s: \"%s\" notify_id %llu bl len %lu\n", __func__,
+	    rbd_dev->header_name, (unsigned long long)notify_id, data_len);
 
 	/*
 	 * Until adequate refresh error handling is in place, there is
@@ -3132,6 +3132,24 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, s32 return_code,
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
+static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
+{
+	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
+	int ret;
+
+	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
+		err, cookie);
+	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
+	         rbd_dev->header_name, err, cookie);
+
+	/* reset watch */
+	rbd_dev_header_unwatch_sync(rbd_dev);
+	ret = rbd_dev_header_watch_sync(rbd_dev);
+	rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+}
+
 /*
  * Send a (un)watch request and wait for the ack.  Return a request
  * with a ref held on success or error.
@@ -3199,13 +3217,14 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 	rbd_assert(!rbd_dev->watch_event);
 	rbd_assert(!rbd_dev->watch_request);
 
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
+	ret = ceph_osdc_create_watch_event(osdc, rbd_watch_cb,
+	                                  rbd_watch_error_cb,
+	                                  rbd_dev, &rbd_dev->watch_event);
 	if (ret < 0)
 		return ret;
 
 	obj_request = rbd_obj_watch_request_helper(rbd_dev,
-						CEPH_OSD_WATCH_OP_LEGACY_WATCH);
+						CEPH_OSD_WATCH_OP_WATCH);
 	if (IS_ERR(obj_request)) {
 		ceph_osdc_cancel_event(rbd_dev->watch_event);
 		rbd_dev->watch_event = NULL;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 12732d3..c673d75 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -108,6 +108,7 @@ struct ceph_osd_req_op {
 			u64 ver;
 			__u8 op;
 			u32 gen;
+			struct ceph_osd_data request_data;
 		} watch;
 		struct {
 			u64 cookie;
@@ -186,13 +187,21 @@ struct ceph_request_redirect {
 
 struct ceph_osd_event {
 	u64 cookie;
-	int one_shot;
 	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, s32, u64, void *, void *, u32);
+	struct ceph_osd_request *osd_req;
 	void *data;
 	struct rb_node node;
-	struct list_head osd_node;
 	struct kref kref;
+	union {
+		struct {
+			void (*watchcb)(void *, u64, u64, u64, void *, size_t);
+			void (*errcb)(void *, u64, int);
+		} watch;
+		struct {
+			struct ceph_msg_data *notify_data;
+			struct completion complete;
+		} notify;
+	};
 };
 
 struct ceph_osd_event_work {
@@ -385,10 +394,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				struct page **pages, int nr_pages);
 
 /* watch/notify events */
-extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, s32, u64,
-						   void *, void *, u32),
-				  void *data, struct ceph_osd_event **pevent);
+extern int ceph_osdc_create_watch_event(struct ceph_osd_client *osdc,
+                         void (*watchcb)(void *, u64, u64, u64, void *, size_t),
+                         void (*errcb)(void *, u64, int),
+                         void *data, struct ceph_osd_event **pevent);
+extern int ceph_osdc_create_notify_event(struct ceph_osd_client *osdc,
+                                         struct ceph_osd_event **pevent);
+extern void ceph_osdc_wait_event(struct ceph_osd_client *osdc,
+				struct ceph_osd_event *event);
 extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
 extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2725a68..af05c0f 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -37,7 +37,13 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req);
 static void __enqueue_request(struct ceph_osd_request *req);
 static void __send_request(struct ceph_osd_client *osdc,
-			   struct ceph_osd_request *req);
+                           struct ceph_osd_request *req);
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+                                           u64 cookie);
+static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
+                       u64 cookie, u64 notify_id, u32 payload_len,
+                       void *payload, s32 return_code, u64 notifier_gid,
+                       struct ceph_msg_data *data);
 
 /*
  * Implement client access to distributed object storage cluster.
@@ -616,10 +622,12 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
-void osd_req_op_notify_init(struct ceph_osd_request *osd_req, unsigned int which,
+void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
+                            unsigned int which,
 			    u16 opcode, u64 cookie)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode, 0);
+	struct ceph_osd_event *notify_event;
 
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
 	op->watch.cookie = cookie;
@@ -2274,7 +2282,7 @@ void ceph_osdc_put_event(struct ceph_osd_event *event)
 EXPORT_SYMBOL(ceph_osdc_put_event);
 
 static void __insert_event(struct ceph_osd_client *osdc,
-			     struct ceph_osd_event *new)
+                           struct ceph_osd_event *new)
 {
 	struct rb_node **p = &osdc->event_tree.rb_node;
 	struct rb_node *parent = NULL;
@@ -2296,7 +2304,7 @@ static void __insert_event(struct ceph_osd_client *osdc,
 }
 
 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
-					        u64 cookie)
+                                           u64 cookie)
 {
 	struct rb_node **p = &osdc->event_tree.rb_node;
 	struct rb_node *parent = NULL;
@@ -2328,27 +2336,60 @@ static void __remove_event(struct ceph_osd_event *event)
 	}
 }
 
-int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, s32, u64, void *,
-					    void *, u32),
-			   void *data, struct ceph_osd_event **pevent)
+static struct ceph_osd_event *__alloc_event(struct ceph_osd_client *osdc,
+                                            void *data)
 {
 	struct ceph_osd_event *event;
 
 	event = kmalloc(sizeof(*event), GFP_NOIO);
 	if (!event)
-		return -ENOMEM;
+		return NULL;
 
 	dout("create_event %p\n", event);
-	event->cb = event_cb;
-	event->one_shot = 0;
 	event->data = data;
 	event->osdc = osdc;
-	INIT_LIST_HEAD(&event->osd_node);
+	event->osd_req = NULL;
 	RB_CLEAR_NODE(&event->node);
 	kref_init(&event->kref);   /* one ref for us */
 	kref_get(&event->kref);    /* one ref for the caller */
 
+	return event;
+}
+
+int ceph_osdc_create_watch_event(struct ceph_osd_client *osdc,
+                         void (*watchcb)(void *, u64, u64, u64, void *, size_t),
+                         void (*errcb)(void *, u64, int),
+                         void *data, struct ceph_osd_event **pevent)
+{
+	struct ceph_osd_event *event;
+	
+	event = __alloc_event(osdc, data);
+	if (!event)
+		return -ENOMEM;
+
+	event->watch.watchcb = watchcb;
+	event->watch.errcb = errcb;
+
+	spin_lock(&osdc->event_lock);
+	event->cookie = ++osdc->event_count;
+	__insert_event(osdc, event);
+	spin_unlock(&osdc->event_lock);
+	*pevent = event;
+	return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_watch_event);
+
+int ceph_osdc_create_notify_event(struct ceph_osd_client *osdc,
+                                  struct ceph_osd_event **pevent)
+{
+	struct ceph_osd_event *event;
+	
+	event = __alloc_event(osdc, NULL);
+	if (!event)
+		return -ENOMEM;
+
+	init_completion(&event->notify.complete);
+
 	spin_lock(&osdc->event_lock);
 	event->cookie = ++osdc->event_count;
 	__insert_event(osdc, event);
@@ -2357,7 +2398,14 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
 	*pevent = event;
 	return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_create_event);
+EXPORT_SYMBOL(ceph_osdc_create_notify_event);
+
+void ceph_osdc_wait_event(struct ceph_osd_client *osdc,
+                          struct ceph_osd_event *event)
+{
+	wait_for_completion(&event->notify.complete);
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
 
 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
 {
@@ -2377,20 +2425,78 @@ static void do_event_work(struct work_struct *work)
 	struct ceph_osd_event_work *event_work =
 		container_of(work, struct ceph_osd_event_work, work);
 	struct ceph_osd_event *event = event_work->event;
-	u64 ver = event_work->ver;
 	u64 notify_id = event_work->notify_id;
 	u8 opcode = event_work->opcode;
 	s32 return_code = event_work->return_code;
 	u64 notifier_gid = event_work->notifier_gid;
 
 	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, return_code, notifier_gid,
-		  event->data, event_work->payload, event_work->payload_len);
+	if (opcode == CEPH_WATCH_EVENT_NOTIFY)
+		event->watch.watchcb(event->data, notify_id,
+		                     event->cookie, notifier_gid,
+		                     event_work->payload,
+		                     event_work->payload_len);
+	else if (opcode == CEPH_WATCH_EVENT_DISCONNECT && event->watch.errcb)
+		event->watch.errcb(event->data, event->cookie, return_code);
 	dout("do_event_work completed %p\n", event);
 	ceph_osdc_put_event(event);
 	kfree(event_work);
 }
 
+static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
+                       u64 cookie, u64 notify_id, u32 payload_len,
+                       void *payload, s32 return_code, u64 notifier_gid,
+                       struct ceph_msg_data *data)
+{
+	struct ceph_osd_event *event;
+	struct ceph_osd_event_work *event_work;
+
+	spin_lock(&osdc->event_lock);
+	event = __find_event(osdc, cookie);
+	if (event)
+		get_event(event);
+	spin_unlock(&osdc->event_lock);
+
+	dout("handle_watch_notify cookie %lld event %p notify id %llu payload "
+	     "len %u return code %d notifier gid %llu\n",
+             cookie, event, notify_id, payload_len, return_code, notifier_gid);
+	switch(opcode) {
+		case CEPH_WATCH_EVENT_NOTIFY:
+		case CEPH_WATCH_EVENT_DISCONNECT:
+			if (event) {
+				event_work = kmalloc(sizeof(*event_work),
+				                     GFP_NOIO);
+				if (!event_work) {
+					pr_err("couldn't allocate event_work\n");
+					ceph_osdc_put_event(event);
+					return;
+				}
+				INIT_WORK(&event_work->work, do_event_work);
+				event_work->event = event;
+				event_work->notify_id = notify_id;
+				event_work->opcode = opcode;
+				event_work->return_code = return_code;
+				event_work->notifier_gid = notifier_gid;
+				event_work->payload = payload;
+				event_work->payload_len = payload_len;
+
+				queue_work(osdc->notify_wq, &event_work->work);
+			}
+			break;
+		case CEPH_WATCH_EVENT_NOTIFY_COMPLETE:
+			if (event) {
+				event->notify.notify_data = data;
+				if (event->osd_req) {
+					ceph_osdc_cancel_request(event->osd_req);
+					event->osd_req = NULL;
+				}
+				complete_all(&event->notify.complete);
+			}
+			break;
+		default:
+			BUG();
+	}
+}
 
 /*
  * Process osd watch notifications
@@ -2399,13 +2505,12 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
 	void *p, *end, *payload = NULL;
+	struct ceph_msg_data *data = NULL;
 	u8 proto_ver;
 	u64 cookie, ver, notify_id, notifier_gid = 0;
 	u8 opcode;
 	u32 payload_len = 0;
 	s32 return_code = 0;
-	struct ceph_osd_event *event;
-	struct ceph_osd_event_work *event_work;
 
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -2430,34 +2535,8 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	if (msg->hdr.version >= 3)
 		ceph_decode_64_safe(&p, end, notifier_gid, bad);
 
-	spin_lock(&osdc->event_lock);
-	event = __find_event(osdc, cookie);
-	if (event) {
-		BUG_ON(event->one_shot);
-		get_event(event);
-	}
-	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p notify id %llu payload len %u return code %d notifier gid %llu\n",
-	     cookie, ver, event, notify_id, payload_len, return_code, notifier_gid);
-	if (event) {
-		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
-		if (!event_work) {
-			pr_err("couldn't allocate event_work\n");
-			ceph_osdc_put_event(event);
-			return;
-		}
-		INIT_WORK(&event_work->work, do_event_work);
-		event_work->event = event;
-		event_work->ver = ver;
-		event_work->notify_id = notify_id;
-		event_work->opcode = opcode;
-		event_work->return_code = return_code;
-		event_work->notifier_gid = notifier_gid;
-		event_work->payload = payload;
-		event_work->payload_len = payload_len;
-
-		queue_work(osdc->notify_wq, &event_work->work);
-	}
+	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
+		return_code, notifier_gid, data);
 
 	return;
 
-- 
1.9.3


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCHv2 5/6] osd_client: add support for notify payloads via notify event
  2015-06-17 14:25 [PATCHv2 0/6] support watch/notify version 2 Douglas Fuller
                   ` (3 preceding siblings ...)
  2015-06-17 14:25 ` [PATCHv2 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
@ 2015-06-17 14:25 ` Douglas Fuller
  2015-06-17 16:11   ` Mike Christie
  2015-06-23  0:20   ` Mike Christie
  2015-06-17 14:25 ` [PATCHv2 6/6] osd_client: send watch ping messages Douglas Fuller
  5 siblings, 2 replies; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 14:25 UTC (permalink / raw
  To: ceph-devel

Add support in notify events for receiving data from notify_ack. Notify
events are optional; data is discarded if no event is found.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 include/linux/ceph/osd_client.h |   3 +-
 net/ceph/osd_client.c           | 135 ++++++++++++++++++++++++++++++++++++++--
 2 files changed, 131 insertions(+), 7 deletions(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index c673d75..31e308b 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -198,7 +198,8 @@ struct ceph_osd_event {
 			void (*errcb)(void *, u64, int);
 		} watch;
 		struct {
-			struct ceph_msg_data *notify_data;
+			struct page **notify_data;
+			size_t notify_data_len;
 			struct completion complete;
 		} notify;
 	};
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index af05c0f..99bac91 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -43,7 +43,7 @@ static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
 static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
                        u64 cookie, u64 notify_id, u32 payload_len,
                        void *payload, s32 return_code, u64 notifier_gid,
-                       struct ceph_msg_data *data);
+                       struct page **data, size_t data_len);
 
 /*
  * Implement client access to distributed object storage cluster.
@@ -268,7 +268,7 @@ void osd_req_op_notify_request_data_pagelist(
 {
 	struct ceph_osd_data *osd_data;
 
-	osd_data = osd_req_op_data(osd_req, which, notify, request_data);
+	osd_data = osd_req_op_data(osd_req, which, watch, request_data);
 	ceph_osd_data_pagelist_init(osd_data, pagelist);
 }
 EXPORT_SYMBOL(osd_req_op_notify_request_data_pagelist);
@@ -630,6 +630,13 @@ void osd_req_op_notify_init(struct ceph_osd_request *osd_req,
 	struct ceph_osd_event *notify_event;
 
 	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY);
+
+	notify_event = __find_event(osd_req->r_osdc, cookie);
+	/* Only linger if the caller is interested in the notify acks. */
+	if (notify_event) {
+		ceph_osdc_set_request_linger(osd_req->r_osdc, osd_req);
+		notify_event->osd_req = osd_req;
+	}
 	op->watch.cookie = cookie;
 }
 EXPORT_SYMBOL(osd_req_op_notify_init);
@@ -768,6 +775,14 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
 		ceph_osdc_msg_data_add(req->r_reply, osd_data);
 		break;
 	case CEPH_OSD_OP_NOTIFY_ACK:
+		osd_data = &src->watch.request_data;
+		data_length = ceph_osd_data_length(osd_data);
+		if (data_length) {
+			ceph_osdc_msg_data_add(req->r_request, osd_data);
+			src->payload_len += data_length;
+			request_data_len += data_length;
+		}
+		/* fallthrough */
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
 		dst->watch.ver = cpu_to_le64(src->watch.ver);
@@ -1693,6 +1708,84 @@ static void handle_osds_timeout(struct work_struct *work)
 			      round_jiffies_relative(delay));
 }
 
+static void __ping_callback(struct ceph_osd_request *osd_req,
+               struct ceph_msg *msg)
+{
+	struct ceph_osd_req_op * info = &osd_req->r_ops[0];
+	struct ceph_osd_request *target = osd_req->r_priv;
+	u64 result = osd_req->r_reply_op_result[0];
+
+	dout("got pong result %llu\n", result);
+
+	if (target->r_ops[0].watch.gen != info->watch.gen) {
+		dout("ignoring pong result out of phase (%u != %u)\n",
+		     target->r_ops[0].watch.gen, info->watch.gen);
+		return;
+	}
+	if (result != 0)
+		__do_event(osd_req->r_osdc, CEPH_WATCH_EVENT_DISCONNECT,
+		           info->watch.cookie, 0, 0, NULL, result, 0, NULL, 0);
+
+	ceph_osdc_put_request(target);
+	ceph_osdc_put_request(osd_req);
+}
+
+static void __send_linger_ping(struct ceph_osd_request *req)
+{
+	struct ceph_osd_request *ping_req;
+	int ret;
+
+	dout("ping for watch %llu\n", req->r_tid);
+
+	ping_req = ceph_osdc_alloc_request(req->r_osdc, NULL, 1, false,
+	                                   GFP_NOIO);
+	if (!ping_req) {
+		WARN(true, "failed to allocate memory to ping, skipping");
+		return;
+	}
+
+	ping_req->r_base_oloc.pool = req->r_base_oloc.pool;
+	ping_req->r_flags = CEPH_OSD_OP_READ;
+	ceph_oid_copy(&ping_req->r_base_oid, &req->r_base_oid);
+	ping_req->r_callback = __ping_callback;
+	osd_req_op_watch_init(ping_req, 0, CEPH_OSD_OP_WATCH,
+	                      CEPH_OSD_WATCH_OP_PING,
+	                      req->r_ops[0].watch.cookie);
+	ping_req->r_ops[0].watch.gen = req->r_ops[0].watch.gen;
+	ping_req->r_priv = req;
+	ceph_osdc_build_request(ping_req, 0, NULL, cpu_to_le64(CEPH_NOSNAP),
+	                        NULL);
+	ceph_osdc_get_request(req);
+	ret = ceph_osdc_start_request(req->r_osdc, ping_req, false);
+	if (ret) {
+		ceph_osdc_put_request(ping_req);
+		ceph_osdc_cancel_request(ping_req);
+	}
+}
+
+static void handle_linger_ping(struct work_struct *work)
+{
+	struct ceph_osd_client *osdc;
+
+	struct ceph_osd_request *req, *nreq;
+
+	osdc = container_of(work, struct ceph_osd_client,
+	                    linger_ping_work.work);
+
+	dout("scanning for watches to ping about\n");
+
+	list_for_each_entry_safe(req, nreq, &osdc->req_linger, r_linger_item) {
+		int i;
+		for (i = 0; i < req->r_num_ops; i++) {
+			if (req->r_ops[i].op == CEPH_OSD_OP_WATCH)
+				__send_linger_ping(req);
+		}
+	}
+	schedule_delayed_work(&osdc->linger_ping_work,
+	                      osdc->client->options->osd_keepalive_timeout);
+}
+
+>>>>>>> a53afe3... changed data return type to page vector
 static int ceph_oloc_decode(void **p, void *end,
 			    struct ceph_object_locator *oloc)
 {
@@ -2446,7 +2539,7 @@ static void do_event_work(struct work_struct *work)
 static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
                        u64 cookie, u64 notify_id, u32 payload_len,
                        void *payload, s32 return_code, u64 notifier_gid,
-                       struct ceph_msg_data *data)
+                       struct page **data, size_t data_len)
 {
 	struct ceph_osd_event *event;
 	struct ceph_osd_event_work *event_work;
@@ -2486,6 +2579,7 @@ static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
 		case CEPH_WATCH_EVENT_NOTIFY_COMPLETE:
 			if (event) {
 				event->notify.notify_data = data;
+				event->notify.notify_data_len = data_len;
 				if (event->osd_req) {
 					ceph_osdc_cancel_request(event->osd_req);
 					event->osd_req = NULL;
@@ -2532,11 +2626,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 	if (msg->hdr.version >= 2)
 		ceph_decode_32_safe(&p, end, return_code, bad);
 
-	if (msg->hdr.version >= 3)
+	if (msg->hdr.version >= 3) {
 		ceph_decode_64_safe(&p, end, notifier_gid, bad);
+		data = list_first_entry(&msg->data, struct ceph_msg_data, links);
+	}
 
 	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
-		return_code, notifier_gid, data);
+		return_code, notifier_gid, data->pages, data->length);
 
 	return;
 
@@ -3055,12 +3151,33 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 	struct ceph_osd *osd = con->private;
 	int type = le16_to_cpu(hdr->type);
 	int front = le32_to_cpu(hdr->front_len);
+	struct ceph_msg *m;
+	size_t len = con->in_hdr.data_len;
 
 	*skip = 0;
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
 	case CEPH_MSG_WATCH_NOTIFY:
-		return ceph_msg_new(type, front, GFP_NOFS, false);
+		m = ceph_msg_new(type, front, GFP_NOFS, false);
+		if (!m)
+			goto out;
+
+		if (len > 0) {
+			struct page **pages;
+			struct ceph_osd_data osd_data;
+			pages = ceph_alloc_page_vector(
+				      calc_pages_for(0, len), GFP_NOFS);
+			if (!pages)
+				goto out2;
+			osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
+			osd_data.pages = pages;
+			osd_data.length = len;
+			osd_data.alignment = 0;
+			osd_data.pages_from_pool = false;
+			osd_data.own_pages = false;
+			ceph_osdc_msg_data_add(m, &osd_data);
+		}
+		return m;
 	case CEPH_MSG_OSD_OPREPLY:
 		return get_reply(con, hdr, skip);
 	default:
@@ -3069,6 +3186,12 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 		*skip = 1;
 		return NULL;
 	}
+out2:
+	ceph_msg_put(m);
+out:
+	pr_err("couldn't allocate reply, will skip\n");
+	*skip = 1;
+	return NULL;
 }
 
 /*
-- 
1.9.3


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* [PATCHv2 6/6] osd_client: send watch ping messages
  2015-06-17 14:25 [PATCHv2 0/6] support watch/notify version 2 Douglas Fuller
                   ` (4 preceding siblings ...)
  2015-06-17 14:25 ` [PATCHv2 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
@ 2015-06-17 14:25 ` Douglas Fuller
  5 siblings, 0 replies; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 14:25 UTC (permalink / raw
  To: ceph-devel

Send CEPH_OSD_WATCH_OP_PING every osd_keepalive_timeout for each watch
event registered. When errors are detected, look up the watch event and
send it CEPH_WATCH_EVENT_DISCONNECT.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
---
 include/linux/ceph/osd_client.h |  1 +
 net/ceph/osd_client.c           | 19 ++++++++++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 31e308b..7fbbd22 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -238,6 +238,7 @@ struct ceph_osd_client {
 	int                    num_requests;
 	struct delayed_work    timeout_work;
 	struct delayed_work    osds_timeout_work;
+	struct delayed_work    linger_ping_work;
 #ifdef CONFIG_DEBUG_FS
 	struct dentry 	       *debugfs_file;
 #endif
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 99bac91..9a5fcae 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -110,6 +110,7 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
 	osd_data->own_pages = own_pages;
 }
 
+
 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 			struct ceph_pagelist *pagelist)
 {
@@ -1363,6 +1364,13 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
 	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
 	WARN_ON(!req->r_linger);
 
+	++req->r_ops[0].watch.gen;
+
+	if (list_empty(&osdc->req_linger))
+		schedule_delayed_work(&osdc->linger_ping_work,
+			       round_jiffies_relative(
+			         osdc->client->options->osd_keepalive_timeout));
+
 	ceph_osdc_get_request(req);
 	list_add_tail(&req->r_linger_item, &osdc->req_linger);
 	if (req->r_osd)
@@ -1383,6 +1391,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 
 	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
 	list_del_init(&req->r_linger_item);
+	if (++req->r_ops[0].watch.gen > 1 &&
+		req->r_ops[0].watch.op == CEPH_OSD_WATCH_OP_WATCH) {
+		struct timespec mtime = CURRENT_TIME;
+		req->r_ops[0].watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
+		ceph_osdc_build_request(req, 0, req->r_snapc, req->r_snapid, &mtime);
+	}
 
 	if (req->r_osd) {
 		list_del_init(&req->r_linger_osd_item);
@@ -1391,6 +1405,9 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 			req->r_osd = NULL;
 	}
 	ceph_osdc_put_request(req);
+
+	if (list_empty(&osdc->req_linger))
+		cancel_delayed_work(&osdc->linger_ping_work);
 }
 
 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
@@ -1785,7 +1802,6 @@ static void handle_linger_ping(struct work_struct *work)
 	                      osdc->client->options->osd_keepalive_timeout);
 }
 
->>>>>>> a53afe3... changed data return type to page vector
 static int ceph_oloc_decode(void **p, void *end,
 			    struct ceph_object_locator *oloc)
 {
@@ -2873,6 +2889,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	osdc->num_requests = 0;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+	INIT_DELAYED_WORK(&osdc->linger_ping_work, handle_linger_ping);
 	spin_lock_init(&osdc->event_lock);
 	osdc->event_tree = RB_ROOT;
 	osdc->event_count = 0;
-- 
1.9.3


^ permalink raw reply related	[flat|nested] 11+ messages in thread

* Re: [PATCHv2 5/6] osd_client: add support for notify payloads via notify event
  2015-06-17 14:25 ` [PATCHv2 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
@ 2015-06-17 16:11   ` Mike Christie
  2015-06-17 17:07     ` Douglas Fuller
  2015-06-23  0:20   ` Mike Christie
  1 sibling, 1 reply; 11+ messages in thread
From: Mike Christie @ 2015-06-17 16:11 UTC (permalink / raw
  To: Douglas Fuller, ceph-devel

On 06/17/2015 09:25 AM, Douglas Fuller wrote:
> @@ -2486,6 +2579,7 @@ static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
>  		case CEPH_WATCH_EVENT_NOTIFY_COMPLETE:
>  			if (event) {
>  				event->notify.notify_data = data;
> +				event->notify.notify_data_len = data_len;
>  				if (event->osd_req) {
>  					ceph_osdc_cancel_request(event->osd_req);
>  					event->osd_req = NULL;
> @@ -2532,11 +2626,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>  	if (msg->hdr.version >= 2)
>  		ceph_decode_32_safe(&p, end, return_code, bad);
>  
> -	if (msg->hdr.version >= 3)
> +	if (msg->hdr.version >= 3) {
>  		ceph_decode_64_safe(&p, end, notifier_gid, bad);
> +		data = list_first_entry(&msg->data, struct ceph_msg_data, links);
> +	}
>  
>  	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
> -		return_code, notifier_gid, data);
> +		return_code, notifier_gid, data->pages, data->length);
>  
>  	return;
>  
> @@ -3055,12 +3151,33 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>  	struct ceph_osd *osd = con->private;
>  	int type = le16_to_cpu(hdr->type);
>  	int front = le32_to_cpu(hdr->front_len);
> +	struct ceph_msg *m;
> +	size_t len = con->in_hdr.data_len;
>  
>  	*skip = 0;
>  	switch (type) {
>  	case CEPH_MSG_OSD_MAP:
>  	case CEPH_MSG_WATCH_NOTIFY:
> -		return ceph_msg_new(type, front, GFP_NOFS, false);
> +		m = ceph_msg_new(type, front, GFP_NOFS, false);
> +		if (!m)
> +			goto out;
> +
> +		if (len > 0) {
> +			struct page **pages;
> +			struct ceph_osd_data osd_data;
> +			pages = ceph_alloc_page_vector(
> +				      calc_pages_for(0, len), GFP_NOFS);
> +			if (!pages)
> +				goto out2;
> +			osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
> +			osd_data.pages = pages;
> +			osd_data.length = len;
> +			osd_data.alignment = 0;
> +			osd_data.pages_from_pool = false;
> +			osd_data.own_pages = false;
> +			ceph_osdc_msg_data_add(m, &osd_data);
> +		}
> +		return m;
>  	case CEPH_MSG_OSD_OPREPLY:
>  		return get_reply(con, hdr, skip);
>  	default:
> @@ -3069,6 +3186,12 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>  		*skip = 1;
>  		return NULL;
>  	}
> +out2:
> +	ceph_msg_put(m);
> +out:
> +	pr_err("couldn't allocate reply, will skip\n");
> +	*skip = 1;
> +	return NULL;
>  }
>  
>  /*
> 

When is the data freed?

Does it get freed when we do dispatch->ceph_msg_put [is this the last
put so we do] -> ceph_msg_release?

If so, then I think when __do_event() does complete_all() the thread
that did the ceph_osdc_wait_event could end up accessing freed memory.


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCHv2 5/6] osd_client: add support for notify payloads via notify event
  2015-06-17 16:11   ` Mike Christie
@ 2015-06-17 17:07     ` Douglas Fuller
  0 siblings, 0 replies; 11+ messages in thread
From: Douglas Fuller @ 2015-06-17 17:07 UTC (permalink / raw
  To: Mike Christie; +Cc: ceph-devel


> On Jun 17, 2015, at 12:11 PM, Mike Christie <mchristi@redhat.com> wrote:
> 
> On 06/17/2015 09:25 AM, Douglas Fuller wrote:
>> @@ -2486,6 +2579,7 @@ static void __do_event(struct ceph_osd_client *osdc, u8 opcode,
>> 		case CEPH_WATCH_EVENT_NOTIFY_COMPLETE:
>> 			if (event) {
>> 				event->notify.notify_data = data;
>> +				event->notify.notify_data_len = data_len;
>> 				if (event->osd_req) {
>> 					ceph_osdc_cancel_request(event->osd_req);
>> 					event->osd_req = NULL;
>> @@ -2532,11 +2626,13 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
>> 	if (msg->hdr.version >= 2)
>> 		ceph_decode_32_safe(&p, end, return_code, bad);
>> 
>> -	if (msg->hdr.version >= 3)
>> +	if (msg->hdr.version >= 3) {
>> 		ceph_decode_64_safe(&p, end, notifier_gid, bad);
>> +		data = list_first_entry(&msg->data, struct ceph_msg_data, links);
>> +	}
>> 
>> 	__do_event(osdc, opcode, cookie, notify_id, payload_len, payload,
>> -		return_code, notifier_gid, data);
>> +		return_code, notifier_gid, data->pages, data->length);
>> 
>> 	return;
>> 
>> @@ -3055,12 +3151,33 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>> 	struct ceph_osd *osd = con->private;
>> 	int type = le16_to_cpu(hdr->type);
>> 	int front = le32_to_cpu(hdr->front_len);
>> +	struct ceph_msg *m;
>> +	size_t len = con->in_hdr.data_len;
>> 
>> 	*skip = 0;
>> 	switch (type) {
>> 	case CEPH_MSG_OSD_MAP:
>> 	case CEPH_MSG_WATCH_NOTIFY:
>> -		return ceph_msg_new(type, front, GFP_NOFS, false);
>> +		m = ceph_msg_new(type, front, GFP_NOFS, false);
>> +		if (!m)
>> +			goto out;
>> +
>> +		if (len > 0) {
>> +			struct page **pages;
>> +			struct ceph_osd_data osd_data;
>> +			pages = ceph_alloc_page_vector(
>> +				      calc_pages_for(0, len), GFP_NOFS);
>> +			if (!pages)
>> +				goto out2;
>> +			osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;
>> +			osd_data.pages = pages;
>> +			osd_data.length = len;
>> +			osd_data.alignment = 0;
>> +			osd_data.pages_from_pool = false;
>> +			osd_data.own_pages = false;
>> +			ceph_osdc_msg_data_add(m, &osd_data);
>> +		}
>> +		return m;
>> 	case CEPH_MSG_OSD_OPREPLY:
>> 		return get_reply(con, hdr, skip);
>> 	default:
>> @@ -3069,6 +3186,12 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>> 		*skip = 1;
>> 		return NULL;
>> 	}
>> +out2:
>> +	ceph_msg_put(m);
>> +out:
>> +	pr_err("couldn't allocate reply, will skip\n");
>> +	*skip = 1;
>> +	return NULL;
>> }
>> 
>> /*
>> 
> 
> When is the data freed?

The data itself is not, the user has to free it.

> Does it get freed when we do dispatch->ceph_msg_put [is this the last
> put so we do] -> ceph_msg_release?

No. For some reason, ceph_msg_release only frees pagelists and does not touch page vectors.

Generally, users calling osd ops expecting replies as page vectors have to free them when they’re finished. I’m not sure why this is the default behavior, but it seems to be. For this case, it might make more sense to free the data in __release_event.

> If so, then I think when __do_event() does complete_all() the thread
> that did the ceph_osdc_wait_event could end up accessing freed memory.

--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCHv2 5/6] osd_client: add support for notify payloads via notify event
  2015-06-17 14:25 ` [PATCHv2 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
  2015-06-17 16:11   ` Mike Christie
@ 2015-06-23  0:20   ` Mike Christie
  1 sibling, 0 replies; 11+ messages in thread
From: Mike Christie @ 2015-06-23  0:20 UTC (permalink / raw
  To: Douglas Fuller, ceph-devel

On 06/17/2015 09:25 AM, Douglas Fuller wrote:
>  
> @@ -3055,12 +3151,33 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
>  	struct ceph_osd *osd = con->private;
>  	int type = le16_to_cpu(hdr->type);
>  	int front = le32_to_cpu(hdr->front_len);
> +	struct ceph_msg *m;
> +	size_t len = con->in_hdr.data_len;
>  
>  	*skip = 0;
>  	switch (type) {
>  	case CEPH_MSG_OSD_MAP:
>  	case CEPH_MSG_WATCH_NOTIFY:
> -		return ceph_msg_new(type, front, GFP_NOFS, false);
> +		m = ceph_msg_new(type, front, GFP_NOFS, false);
> +		if (!m)
> +			goto out;
> +
> +		if (len > 0) {
> +			struct page **pages;
> +			struct ceph_osd_data osd_data;
> +			pages = ceph_alloc_page_vector(
> +				      calc_pages_for(0, len), GFP_NOFS);
> +			if (!pages)
> +				goto out2;
> +			osd_data.type = CEPH_OSD_DATA_TYPE_PAGES;

Sorry for the late comment. ceph_alloc_page_vector uses ERR_PTR, so the
above check should be

if (IS_ERR(pages))
	goto out2;
--
To unsubscribe from this list: send the line "unsubscribe ceph-devel" in

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCHv2 4/6] osd_client, rbd: update event interface for watch/notify2
  2015-06-17 14:25 ` [PATCHv2 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
@ 2016-02-11 17:51   ` David Disseldorp
  0 siblings, 0 replies; 11+ messages in thread
From: David Disseldorp @ 2016-02-11 17:51 UTC (permalink / raw
  To: Douglas Fuller; +Cc: ceph-devel, Mike Christie

Hi Douglas,

Do you still plan on getting these changes upstream? I have one comment
on this patch:

On Wed, 17 Jun 2015 07:25:54 -0700, Douglas Fuller wrote:

> +static void rbd_watch_error_cb(void *arg, u64 cookie, int err)
> +{
> +	struct rbd_device *rbd_dev = (struct rbd_device *)arg;
> +	int ret;
> +
> +	dout("%s: watch error %d on cookie %llu\n", rbd_dev->header_name,
> +		err, cookie);
> +	rbd_warn(rbd_dev, "%s: watch error %d on cookie %llu\n",
> +	         rbd_dev->header_name, err, cookie);
> +
> +	/* reset watch */
> +	rbd_dev_header_unwatch_sync(rbd_dev);
> +	ret = rbd_dev_header_watch_sync(rbd_dev);
> +	rbd_dev_refresh(rbd_dev);
> +	if (ret)
> +		rbd_warn(rbd_dev, "refresh failed: %d", ret);
> +}

The watch reset here could potentially race with the
rbd_dev_header_unwatch_sync() performed on unmap. To handle this,
notify_wq should be stopped prior to unwatch in do_rbd_remove().

Cheers, David

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2016-02-11 17:51 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-06-17 14:25 [PATCHv2 0/6] support watch/notify version 2 Douglas Fuller
2015-06-17 14:25 ` [PATCHv2 1/6] ceph/rbd: add support for watch notify payloads Douglas Fuller
2015-06-17 14:25 ` [PATCHv2 2/6] ceph/rbd: add support for header version 2 and 3 Douglas Fuller
2015-06-17 14:25 ` [PATCHv2 3/6] ceph/rbd: update watch-notify ceph_osd_op Douglas Fuller
2015-06-17 14:25 ` [PATCHv2 4/6] osd_client, rbd: update event interface for watch/notify2 Douglas Fuller
2016-02-11 17:51   ` David Disseldorp
2015-06-17 14:25 ` [PATCHv2 5/6] osd_client: add support for notify payloads via notify event Douglas Fuller
2015-06-17 16:11   ` Mike Christie
2015-06-17 17:07     ` Douglas Fuller
2015-06-23  0:20   ` Mike Christie
2015-06-17 14:25 ` [PATCHv2 6/6] osd_client: send watch ping messages Douglas Fuller

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.