
[RFC,v2,13/19] fuse: {uring} Handle uring shutdown

Message ID: 20240529-fuse-uring-for-6-9-rfc2-out-v1-13-d149476b1d65@ddn.com
State: New
Series: fuse: fuse-over-io-uring

Commit Message

Bernd Schubert May 29, 2024, 6 p.m. UTC
Signed-off-by: Bernd Schubert <bschubert@ddn.com>
---
 fs/fuse/dev.c         |  10 +++
 fs/fuse/dev_uring.c   | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h |  67 +++++++++++++++++
 3 files changed, 271 insertions(+)

Comments

Josef Bacik May 30, 2024, 8:21 p.m. UTC | #1
On Wed, May 29, 2024 at 08:00:48PM +0200, Bernd Schubert wrote:
> Signed-off-by: Bernd Schubert <bschubert@ddn.com>
> ---
>  fs/fuse/dev.c         |  10 +++
>  fs/fuse/dev_uring.c   | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  fs/fuse/dev_uring_i.h |  67 +++++++++++++++++
>  3 files changed, 271 insertions(+)
> 
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index a7d26440de39..6ffd216b27c8 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -2202,6 +2202,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
>  		fc->connected = 0;
>  		spin_unlock(&fc->bg_lock);
>  
> +		fuse_uring_set_stopped(fc);
> +
>  		fuse_set_initialized(fc);
>  		list_for_each_entry(fud, &fc->devices, entry) {
>  			struct fuse_pqueue *fpq = &fud->pq;
> @@ -2245,6 +2247,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
>  		spin_unlock(&fc->lock);
>  
>  		fuse_dev_end_requests(&to_end);
> +
> +		/*
> +		 * fc->lock must not be taken to avoid conflicts with io-uring
> +		 * locks
> +		 */
> +		fuse_uring_abort(fc);

Perhaps a 

lockdep_assert_not_held(&fc->lock)

in fuse_uring_abort() then?
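
E.g., a minimal sketch, the helper body as posted below with only the
assertion added:

static inline void fuse_uring_abort(struct fuse_conn *fc)
{
	struct fuse_ring *ring = fc->ring;

	/* documents the "must not hold fc->lock" rule from the call site */
	lockdep_assert_not_held(&fc->lock);

	if (ring == NULL)
		return;

	if (ring->configured && atomic_read(&ring->queue_refs) > 0) {
		fuse_uring_abort_end_requests(ring);
		fuse_uring_stop_queues(ring);
	}
}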

>  	} else {
>  		spin_unlock(&fc->lock);
>  	}
> @@ -2256,6 +2264,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
>  	/* matches implicit memory barrier in fuse_drop_waiting() */
>  	smp_mb();
>  	wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
> +
> +	fuse_uring_wait_stopped_queues(fc);
>  }
>  
>  int fuse_dev_release(struct inode *inode, struct file *file)
> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
> index 5269b3f8891e..6001ba4d6e82 100644
> --- a/fs/fuse/dev_uring.c
> +++ b/fs/fuse/dev_uring.c
> @@ -48,6 +48,44 @@ fuse_uring_async_send_to_ring(struct io_uring_cmd *cmd,
>  	io_uring_cmd_done(cmd, 0, 0, issue_flags);
>  }
>  
> +/* Abort all list-queued requests on the given ring queue */
> +static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
> +{
> +	struct fuse_req *req;
> +	LIST_HEAD(sync_list);
> +	LIST_HEAD(async_list);
> +
> +	spin_lock(&queue->lock);
> +
> +	list_for_each_entry(req, &queue->sync_fuse_req_queue, list)
> +		clear_bit(FR_PENDING, &req->flags);
> +	list_for_each_entry(req, &queue->async_fuse_req_queue, list)
> +		clear_bit(FR_PENDING, &req->flags);
> +
> +	list_splice_init(&queue->async_fuse_req_queue, &async_list);
> +	list_splice_init(&queue->sync_fuse_req_queue, &sync_list);
> +
> +	spin_unlock(&queue->lock);
> +
> +	/* must not hold queue lock to avoid order issues with fi->lock */
> +	fuse_dev_end_requests(&sync_list);
> +	fuse_dev_end_requests(&async_list);
> +}
> +
> +void fuse_uring_abort_end_requests(struct fuse_ring *ring)
> +{
> +	int qid;
> +
> +	for (qid = 0; qid < ring->nr_queues; qid++) {
> +		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
> +
> +		if (!queue->configured)
> +			continue;
> +
> +		fuse_uring_abort_end_queue_requests(queue);
> +	}
> +}
> +
>  /* Update conn limits according to ring values */
>  static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
>  {
> @@ -361,6 +399,162 @@ int fuse_uring_queue_cfg(struct fuse_ring *ring,
>  	return 0;
>  }
>  
> +static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
> +{
> +	struct fuse_req *req = ent->fuse_req;
> +
> +	ent->fuse_req = NULL;
> +	clear_bit(FRRS_FUSE_REQ, &ent->state);
> +	clear_bit(FR_SENT, &req->flags);
> +	req->out.h.error = -ECONNABORTED;
> +	fuse_request_end(req);
> +}
> +
> +/*
> + * Release a request/entry on connection shutdown
> + */
> +static bool fuse_uring_try_entry_stop(struct fuse_ring_ent *ent,
> +				      bool need_cmd_done)
> +	__must_hold(ent->queue->lock)
> +{
> +	struct fuse_ring_queue *queue = ent->queue;
> +	bool released = false;
> +
> +	if (test_bit(FRRS_FREED, &ent->state))
> +		goto out; /* no work left, freed before */

Just return false;
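
I.e., as a sketch, with the out: label dropped and the tail of the
function returning released directly:

	if (test_bit(FRRS_FREED, &ent->state))
		return false; /* no work left, freed before */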

> +
> +	if (ent->state == BIT(FRRS_INIT) || test_bit(FRRS_WAIT, &ent->state) ||
> +	    test_bit(FRRS_USERSPACE, &ent->state)) {

Again, apologies for just now noticing this, but this is kind of a complicated
state machine.

I think I'd rather you just use ent->state as an actual state machine, so it has
one value and one value only at any given time, which appears to be what happens
except that we have FRRS_INIT set in addition to whatever other bit is set.

Rework this so it's less complicated, because it's quite difficult to follow in
its current form.  Thanks,

Josef
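
For illustration, one possible single-value shape of what Josef suggests,
reusing the FRRS_* names from the series; the helper name is hypothetical
and ent->state would hold exactly one of these values at a time:

enum fuse_ring_ent_state {
	FRRS_INIT,
	FRRS_WAIT,
	FRRS_FUSE_REQ,
	FRRS_USERSPACE,
	FRRS_FREED,
};

/* caller holds queue->lock */
static bool fuse_uring_ent_stoppable(const struct fuse_ring_ent *ent)
{
	switch (ent->state) {
	case FRRS_INIT:
	case FRRS_WAIT:
	case FRRS_USERSPACE:
		return true;  /* idle or in userspace, safe to release */
	case FRRS_FUSE_REQ:
	case FRRS_FREED:
		return false; /* request in flight or already released */
	}
	return false;
}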

Patch

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index a7d26440de39..6ffd216b27c8 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2202,6 +2202,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		fc->connected = 0;
 		spin_unlock(&fc->bg_lock);
 
+		fuse_uring_set_stopped(fc);
+
 		fuse_set_initialized(fc);
 		list_for_each_entry(fud, &fc->devices, entry) {
 			struct fuse_pqueue *fpq = &fud->pq;
@@ -2245,6 +2247,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
 		spin_unlock(&fc->lock);
 
 		fuse_dev_end_requests(&to_end);
+
+		/*
+		 * fc->lock must not be taken to avoid conflicts with io-uring
+		 * locks
+		 */
+		fuse_uring_abort(fc);
 	} else {
 		spin_unlock(&fc->lock);
 	}
@@ -2256,6 +2264,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
 	/* matches implicit memory barrier in fuse_drop_waiting() */
 	smp_mb();
 	wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
+
+	fuse_uring_wait_stopped_queues(fc);
 }
 
 int fuse_dev_release(struct inode *inode, struct file *file)
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 5269b3f8891e..6001ba4d6e82 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -48,6 +48,44 @@ fuse_uring_async_send_to_ring(struct io_uring_cmd *cmd,
 	io_uring_cmd_done(cmd, 0, 0, issue_flags);
 }
 
+/* Abort all list-queued requests on the given ring queue */
+static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
+{
+	struct fuse_req *req;
+	LIST_HEAD(sync_list);
+	LIST_HEAD(async_list);
+
+	spin_lock(&queue->lock);
+
+	list_for_each_entry(req, &queue->sync_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+	list_for_each_entry(req, &queue->async_fuse_req_queue, list)
+		clear_bit(FR_PENDING, &req->flags);
+
+	list_splice_init(&queue->async_fuse_req_queue, &async_list);
+	list_splice_init(&queue->sync_fuse_req_queue, &sync_list);
+
+	spin_unlock(&queue->lock);
+
+	/* must not hold queue lock to avoid order issues with fi->lock */
+	fuse_dev_end_requests(&sync_list);
+	fuse_dev_end_requests(&async_list);
+}
+
+void fuse_uring_abort_end_requests(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		fuse_uring_abort_end_queue_requests(queue);
+	}
+}
+
 /* Update conn limits according to ring values */
 static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
 {
@@ -361,6 +399,162 @@ int fuse_uring_queue_cfg(struct fuse_ring *ring,
 	return 0;
 }
 
+static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
+{
+	struct fuse_req *req = ent->fuse_req;
+
+	ent->fuse_req = NULL;
+	clear_bit(FRRS_FUSE_REQ, &ent->state);
+	clear_bit(FR_SENT, &req->flags);
+	req->out.h.error = -ECONNABORTED;
+	fuse_request_end(req);
+}
+
+/*
+ * Release a request/entry on connection shutdown
+ */
+static bool fuse_uring_try_entry_stop(struct fuse_ring_ent *ent,
+				      bool need_cmd_done)
+	__must_hold(ent->queue->lock)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+	bool released = false;
+
+	if (test_bit(FRRS_FREED, &ent->state))
+		goto out; /* no work left, freed before */
+
+	if (ent->state == BIT(FRRS_INIT) || test_bit(FRRS_WAIT, &ent->state) ||
+	    test_bit(FRRS_USERSPACE, &ent->state)) {
+		set_bit(FRRS_FREED, &ent->state);
+
+		if (need_cmd_done) {
+			pr_devel("qid=%d tag=%d sending cmd_done\n", queue->qid,
+				 ent->tag);
+
+			spin_unlock(&queue->lock);
+			io_uring_cmd_done(ent->cmd, -ENOTCONN, 0,
+					  IO_URING_F_UNLOCKED);
+			spin_lock(&queue->lock);
+		}
+
+		if (ent->fuse_req)
+			fuse_uring_stop_fuse_req_end(ent);
+		released = true;
+	}
+out:
+	return released;
+}
+
+static void fuse_uring_stop_list_entries(struct list_head *head,
+					 struct fuse_ring_queue *queue,
+					 bool need_cmd_done)
+{
+	struct fuse_ring *ring = queue->ring;
+	struct fuse_ring_ent *ent, *next;
+	ssize_t queue_refs = SSIZE_MAX;
+
+	list_for_each_entry_safe(ent, next, head, list) {
+		if (fuse_uring_try_entry_stop(ent, need_cmd_done)) {
+			queue_refs = atomic_dec_return(&ring->queue_refs);
+			list_del_init(&ent->list);
+		}
+
+		if (WARN_ON_ONCE(queue_refs < 0))
+			pr_warn("qid=%d queue_refs=%zd\n", queue->qid,
+				queue_refs);
+	}
+}
+
+static void fuse_uring_stop_queue(struct fuse_ring_queue *queue)
+	__must_hold(&queue->lock)
+{
+	fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue, false);
+	fuse_uring_stop_list_entries(&queue->async_ent_avail_queue, queue, true);
+	fuse_uring_stop_list_entries(&queue->sync_ent_avail_queue, queue, true);
+}
+
+/*
+ * Log state debug info
+ */
+static void fuse_uring_stop_ent_state(struct fuse_ring *ring)
+{
+	int qid, tag;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		for (tag = 0; tag < ring->queue_depth; tag++) {
+			struct fuse_ring_ent *ent = &queue->ring_ent[tag];
+
+			if (!test_bit(FRRS_FREED, &ent->state))
+				pr_info("ring=%p qid=%d tag=%d state=%lu\n",
+					ring, qid, tag, ent->state);
+		}
+	}
+	ring->stop_debug_log = 1;
+}
+
+static void fuse_uring_async_stop_queues(struct work_struct *work)
+{
+	int qid;
+	struct fuse_ring *ring =
+		container_of(work, struct fuse_ring, stop_work.work);
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		fuse_uring_stop_queue(queue);
+		spin_unlock(&queue->lock);
+	}
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		if (time_after(jiffies,
+			       ring->stop_time + FUSE_URING_STOP_WARN_TIMEOUT))
+			fuse_uring_stop_ent_state(ring);
+
+		pr_info("ring=%p scheduling intervalled queue stop\n", ring);
+
+		schedule_delayed_work(&ring->stop_work,
+				      FUSE_URING_STOP_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
+/*
+ * Stop the ring queues
+ */
+void fuse_uring_stop_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		fuse_uring_stop_queue(queue);
+		spin_unlock(&queue->lock);
+	}
+
+	if (atomic_read(&ring->queue_refs) > 0) {
+		pr_info("ring=%p scheduling intervalled queue stop\n", ring);
+		ring->stop_time = jiffies;
+		INIT_DELAYED_WORK(&ring->stop_work,
+				  fuse_uring_async_stop_queues);
+		schedule_delayed_work(&ring->stop_work,
+				      FUSE_URING_STOP_INTERVAL);
+	} else {
+		wake_up_all(&ring->stop_waitq);
+	}
+}
+
 /*
  * Checks for errors and stores it into the request
  */
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index b2be67bb2fa7..e5fc84e2f3ea 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -16,6 +16,9 @@
 /* IORING_MAX_ENTRIES */
 #define FUSE_URING_MAX_QUEUE_DEPTH 32768
 
+#define FUSE_URING_STOP_WARN_TIMEOUT (5 * HZ)
+#define FUSE_URING_STOP_INTERVAL (HZ/20)
+
 enum fuse_ring_req_state {
 
 	/* request is basically initialized */
@@ -203,6 +206,7 @@ int fuse_uring_mmap(struct file *filp, struct vm_area_struct *vma);
 int fuse_uring_queue_cfg(struct fuse_ring *ring,
 			 struct fuse_ring_queue_config *qcfg);
 void fuse_uring_ring_destruct(struct fuse_ring *ring);
+void fuse_uring_stop_queues(struct fuse_ring *ring);
 int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
 static inline void fuse_uring_conn_init(struct fuse_ring *ring,
@@ -275,6 +279,58 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return fc->ring && fc->ring->per_core_queue;
 }
 
+static inline void fuse_uring_set_stopped_queues(struct fuse_ring *ring)
+{
+	int qid;
+
+	for (qid = 0; qid < ring->nr_queues; qid++) {
+		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+		if (!queue->configured)
+			continue;
+
+		spin_lock(&queue->lock);
+		queue->stopped = 1;
+		spin_unlock(&queue->lock);
+	}
+}
+
+/*
+ * Set the per-queue stopped flag
+ */
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+	__must_hold(fc->lock)
+{
+	if (fc->ring == NULL)
+		return;
+
+	fc->ring->ready = false;
+
+	fuse_uring_set_stopped_queues(fc->ring);
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (ring == NULL)
+		return;
+
+	if (ring->configured && atomic_read(&ring->queue_refs) > 0) {
+		fuse_uring_abort_end_requests(ring);
+		fuse_uring_stop_queues(ring);
+	}
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+	struct fuse_ring *ring = fc->ring;
+
+	if (ring && ring->configured)
+		wait_event(ring->stop_waitq,
+			   atomic_read(&ring->queue_refs) == 0);
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -298,6 +354,17 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
 	return false;
 }
 
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+}
 
 #endif /* CONFIG_FUSE_IO_URING */