@@ -2202,6 +2202,8 @@ void fuse_abort_conn(struct fuse_conn *fc)
fc->connected = 0;
spin_unlock(&fc->bg_lock);
+ fuse_uring_set_stopped(fc);
+
fuse_set_initialized(fc);
list_for_each_entry(fud, &fc->devices, entry) {
struct fuse_pqueue *fpq = &fud->pq;
@@ -2245,6 +2247,12 @@ void fuse_abort_conn(struct fuse_conn *fc)
spin_unlock(&fc->lock);
fuse_dev_end_requests(&to_end);
+
+ /*
+ * fc->lock must not be taken to avoid conflicts with io-uring
+ * locks
+ */
+ fuse_uring_abort(fc);
} else {
spin_unlock(&fc->lock);
}
@@ -2256,6 +2264,8 @@ void fuse_wait_aborted(struct fuse_conn *fc)
/* matches implicit memory barrier in fuse_drop_waiting() */
smp_mb();
wait_event(fc->blocked_waitq, atomic_read(&fc->num_waiting) == 0);
+
+ fuse_uring_wait_stopped_queues(fc);
}
int fuse_dev_release(struct inode *inode, struct file *file)
@@ -48,6 +48,44 @@ fuse_uring_async_send_to_ring(struct io_uring_cmd *cmd,
io_uring_cmd_done(cmd, 0, 0, issue_flags);
}
+/* Abort all list queued request on the given ring queue */
+static void fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
+{
+ struct fuse_req *req;
+ LIST_HEAD(sync_list);
+ LIST_HEAD(async_list);
+
+ spin_lock(&queue->lock);
+
+ list_for_each_entry(req, &queue->sync_fuse_req_queue, list)
+ clear_bit(FR_PENDING, &req->flags);
+ list_for_each_entry(req, &queue->async_fuse_req_queue, list)
+ clear_bit(FR_PENDING, &req->flags);
+
+ list_splice_init(&queue->async_fuse_req_queue, &sync_list);
+ list_splice_init(&queue->sync_fuse_req_queue, &async_list);
+
+ spin_unlock(&queue->lock);
+
+ /* must not hold queue lock to avoid order issues with fi->lock */
+ fuse_dev_end_requests(&sync_list);
+ fuse_dev_end_requests(&async_list);
+}
+
+void fuse_uring_abort_end_requests(struct fuse_ring *ring)
+{
+ int qid;
+
+ for (qid = 0; qid < ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+ if (!queue->configured)
+ continue;
+
+ fuse_uring_abort_end_queue_requests(queue);
+ }
+}
+
/* Update conn limits according to ring values */
static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
{
@@ -361,6 +399,162 @@ int fuse_uring_queue_cfg(struct fuse_ring *ring,
return 0;
}
+static void fuse_uring_stop_fuse_req_end(struct fuse_ring_ent *ent)
+{
+ struct fuse_req *req = ent->fuse_req;
+
+ ent->fuse_req = NULL;
+ clear_bit(FRRS_FUSE_REQ, &ent->state);
+ clear_bit(FR_SENT, &req->flags);
+ req->out.h.error = -ECONNABORTED;
+ fuse_request_end(req);
+}
+
+/*
+ * Release a request/entry on connection shutdown
+ */
+static bool fuse_uring_try_entry_stop(struct fuse_ring_ent *ent,
+ bool need_cmd_done)
+ __must_hold(ent->queue->lock)
+{
+ struct fuse_ring_queue *queue = ent->queue;
+ bool released = false;
+
+ if (test_bit(FRRS_FREED, &ent->state))
+ goto out; /* no work left, freed before */
+
+ if (ent->state == BIT(FRRS_INIT) || test_bit(FRRS_WAIT, &ent->state) ||
+ test_bit(FRRS_USERSPACE, &ent->state)) {
+ set_bit(FRRS_FREED, &ent->state);
+
+ if (need_cmd_done) {
+ pr_devel("qid=%d tag=%d sending cmd_done\n", queue->qid,
+ ent->tag);
+
+ spin_unlock(&queue->lock);
+ io_uring_cmd_done(ent->cmd, -ENOTCONN, 0,
+ IO_URING_F_UNLOCKED);
+ spin_lock(&queue->lock);
+ }
+
+ if (ent->fuse_req)
+ fuse_uring_stop_fuse_req_end(ent);
+ released = true;
+ }
+out:
+ return released;
+}
+
+static void fuse_uring_stop_list_entries(struct list_head *head,
+ struct fuse_ring_queue *queue,
+ bool need_cmd_done)
+{
+ struct fuse_ring *ring = queue->ring;
+ struct fuse_ring_ent *ent, *next;
+ ssize_t queue_refs = SSIZE_MAX;
+
+ list_for_each_entry_safe(ent, next, head, list) {
+ if (fuse_uring_try_entry_stop(ent, need_cmd_done)) {
+ queue_refs = atomic_dec_return(&ring->queue_refs);
+ list_del_init(&ent->list);
+ }
+
+ if (WARN_ON_ONCE(queue_refs < 0))
+ pr_warn("qid=%d queue_refs=%zd", queue->qid,
+ queue_refs);
+ }
+}
+
+static void fuse_uring_stop_queue(struct fuse_ring_queue *queue)
+ __must_hold(&queue->lock)
+{
+ fuse_uring_stop_list_entries(&queue->ent_in_userspace, queue, false);
+ fuse_uring_stop_list_entries(&queue->async_ent_avail_queue, queue, true);
+ fuse_uring_stop_list_entries(&queue->sync_ent_avail_queue, queue, true);
+}
+
+/*
+ * Log state debug info
+ */
+static void fuse_uring_stop_ent_state(struct fuse_ring *ring)
+{
+ int qid, tag;
+
+ for (qid = 0; qid < ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+ for (tag = 0; tag < ring->queue_depth; tag++) {
+ struct fuse_ring_ent *ent = &queue->ring_ent[tag];
+
+ if (!test_bit(FRRS_FREED, &ent->state))
+ pr_info("ring=%p qid=%d tag=%d state=%lu\n",
+ ring, qid, tag, ent->state);
+ }
+ }
+ ring->stop_debug_log = 1;
+}
+
+static void fuse_uring_async_stop_queues(struct work_struct *work)
+{
+ int qid;
+ struct fuse_ring *ring =
+ container_of(work, struct fuse_ring, stop_work.work);
+
+ for (qid = 0; qid < ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+ if (!queue->configured)
+ continue;
+
+ spin_lock(&queue->lock);
+ fuse_uring_stop_queue(queue);
+ spin_unlock(&queue->lock);
+ }
+
+ if (atomic_read(&ring->queue_refs) > 0) {
+ if (time_after(jiffies,
+ ring->stop_time + FUSE_URING_STOP_WARN_TIMEOUT))
+ fuse_uring_stop_ent_state(ring);
+
+ pr_info("ring=%p scheduling intervalled queue stop\n", ring);
+
+ schedule_delayed_work(&ring->stop_work,
+ FUSE_URING_STOP_INTERVAL);
+ } else {
+ wake_up_all(&ring->stop_waitq);
+ }
+}
+
+/*
+ * Stop the ring queues
+ */
+void fuse_uring_stop_queues(struct fuse_ring *ring)
+{
+ int qid;
+
+ for (qid = 0; qid < ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+ if (!queue->configured)
+ continue;
+
+ spin_lock(&queue->lock);
+ fuse_uring_stop_queue(queue);
+ spin_unlock(&queue->lock);
+ }
+
+ if (atomic_read(&ring->queue_refs) > 0) {
+ pr_info("ring=%p scheduling intervalled queue stop\n", ring);
+ ring->stop_time = jiffies;
+ INIT_DELAYED_WORK(&ring->stop_work,
+ fuse_uring_async_stop_queues);
+ schedule_delayed_work(&ring->stop_work,
+ FUSE_URING_STOP_INTERVAL);
+ } else {
+ wake_up_all(&ring->stop_waitq);
+ }
+}
+
/*
* Checks for errors and stores it into the request
*/
@@ -16,6 +16,9 @@
/* IORING_MAX_ENTRIES */
#define FUSE_URING_MAX_QUEUE_DEPTH 32768
+#define FUSE_URING_STOP_WARN_TIMEOUT (5 * HZ)
+#define FUSE_URING_STOP_INTERVAL (HZ/20)
+
enum fuse_ring_req_state {
/* request is basially initialized */
@@ -203,6 +206,7 @@ int fuse_uring_mmap(struct file *filp, struct vm_area_struct *vma);
int fuse_uring_queue_cfg(struct fuse_ring *ring,
struct fuse_ring_queue_config *qcfg);
void fuse_uring_ring_destruct(struct fuse_ring *ring);
+void fuse_uring_stop_queues(struct fuse_ring *ring);
int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
static inline void fuse_uring_conn_init(struct fuse_ring *ring,
@@ -275,6 +279,58 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
return fc->ring && fc->ring->per_core_queue;
}
+static inline void fuse_uring_set_stopped_queues(struct fuse_ring *ring)
+{
+ int qid;
+
+ for (qid = 0; qid < ring->nr_queues; qid++) {
+ struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
+
+ if (!queue->configured)
+ continue;
+
+ spin_lock(&queue->lock);
+ queue->stopped = 1;
+ spin_unlock(&queue->lock);
+ }
+}
+
+/*
+ * Set per queue aborted flag
+ */
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+ __must_hold(fc->lock)
+{
+ if (fc->ring == NULL)
+ return;
+
+ fc->ring->ready = false;
+
+ fuse_uring_set_stopped_queues(fc->ring);
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+ struct fuse_ring *ring = fc->ring;
+
+ if (ring == NULL)
+ return;
+
+ if (ring->configured && atomic_read(&ring->queue_refs) > 0) {
+ fuse_uring_abort_end_requests(ring);
+ fuse_uring_stop_queues(ring);
+ }
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+ struct fuse_ring *ring = fc->ring;
+
+ if (ring && ring->configured)
+ wait_event(ring->stop_waitq,
+ atomic_read(&ring->queue_refs) == 0);
+}
+
#else /* CONFIG_FUSE_IO_URING */
struct fuse_ring;
@@ -298,6 +354,17 @@ static inline bool fuse_per_core_queue(struct fuse_conn *fc)
return false;
}
+static inline void fuse_uring_set_stopped(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_abort(struct fuse_conn *fc)
+{
+}
+
+static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
+{
+}
#endif /* CONFIG_FUSE_IO_URING */
Signed-off-by: Bernd Schubert <bschubert@ddn.com> --- fs/fuse/dev.c | 10 +++ fs/fuse/dev_uring.c | 194 ++++++++++++++++++++++++++++++++++++++++++++++++++ fs/fuse/dev_uring_i.h | 67 +++++++++++++++++ 3 files changed, 271 insertions(+)