
[RFC,v3,08/17] fuse: {uring} Handle SQEs - register commands

Message ID: 20240901-b4-fuse-uring-rfcv3-without-mmap-v3-8-9207f7391444@ddn.com
Series: fuse: fuse-over-io-uring

Commit Message

Bernd Schubert Sept. 1, 2024, 1:37 p.m. UTC
This adds basic support for ring SQEs (with opcode=IORING_OP_URING_CMD).
For now only FUSE_URING_REQ_FETCH is handled to register queue entries.
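For illustration, daemon-side registration might look roughly like the
sketch below (not part of this patch; it assumes liburing, the uapi
additions from this series, and an io_uring created with
IORING_SETUP_SQE128, since struct fuse_uring_cmd_req needs more than the
16 bytes of command area a regular 64-byte SQE provides):

#include <liburing.h>
#include <linux/fuse.h>
#include <stdint.h>
#include <string.h>

/* Prepare one FUSE_URING_REQ_FETCH SQE for slot "tag" of queue "qid" */
static int fuse_fetch_prep(struct io_uring *ring, int fuse_fd,
			   uint16_t qid, uint16_t tag,
			   void *buf, uint32_t buf_len)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	struct fuse_uring_cmd_req *req;

	if (!sqe)
		return -EAGAIN;

	io_uring_prep_rw(IORING_OP_URING_CMD, sqe, fuse_fd, NULL, 0, 0);
	sqe->cmd_op = FUSE_URING_REQ_FETCH;

	/* the command payload lives in the SQE command area */
	req = (struct fuse_uring_cmd_req *)sqe->cmd;
	memset(req, 0, sizeof(*req));
	req->buf_ptr = (uint64_t)(uintptr_t)buf;
	req->buf_len = buf_len;
	req->qid = qid;
	req->tag = tag;

	return 0;
}

One such SQE is prepared for every (qid, tag) slot and handed to the
kernel with io_uring_submit(); the kernel keeps them all pending until
fuse requests are assigned to the registered entries.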

Signed-off-by: Bernd Schubert <bschubert@ddn.com>
---
 fs/fuse/dev.c             |   3 +
 fs/fuse/dev_uring.c       | 231 ++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h     |  60 ++++++++++++
 include/uapi/linux/fuse.h |  38 ++++++++
 4 files changed, 332 insertions(+)

Comments

Jens Axboe Sept. 4, 2024, 3:40 p.m. UTC | #1
On 9/1/24 7:37 AM, Bernd Schubert wrote:
> +/**
> + * Entry function from io_uring to handle the given passthrough command
> + * (op code IORING_OP_URING_CMD)
> + */
> +int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
> +{
> +	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
> +	struct fuse_dev *fud;
> +	struct fuse_conn *fc;
> +	struct fuse_ring *ring;
> +	struct fuse_ring_queue *queue;
> +	struct fuse_ring_ent *ring_ent = NULL;
> +	u32 cmd_op = cmd->cmd_op;
> +	int ret = 0;
> +
> +	ret = -ENODEV;
> +	fud = fuse_get_dev(cmd->file);
> +	if (!fud)
> +		goto out;
> +	fc = fud->fc;
> +
> +	ring = fc->ring;
> +	if (!ring)
> +		goto out;
> +
> +	queue = fud->ring_q;
> +	if (!queue)
> +		goto out;
> +
> +	ret = -EINVAL;
> +	if (queue->qid != cmd_req->qid)
> +		goto out;
> +
> +	ret = -ERANGE;
> +	if (cmd_req->tag >= ring->queue_depth)
> +		goto out;
> +
> +	ring_ent = &queue->ring_ent[cmd_req->tag];
> +
> +	pr_devel("%s:%d received: cmd op %d qid %d (%p) tag %d  (%p)\n",
> +		 __func__, __LINE__, cmd_op, cmd_req->qid, queue, cmd_req->tag,
> +		 ring_ent);
> +
> +	spin_lock(&queue->lock);
> +	ret = -ENOTCONN;
> +	if (unlikely(fc->aborted || queue->stopped))
> +		goto err_unlock;
> +
> +	switch (cmd_op) {
> +	case FUSE_URING_REQ_FETCH:
> +		ret = fuse_uring_fetch(ring_ent, cmd, issue_flags);
> +		break;
> +	default:
> +		ret = -EINVAL;
> +		pr_devel("Unknown uring command %d", cmd_op);
> +		goto err_unlock;
> +	}
> +out:
> +	pr_devel("uring cmd op=%d, qid=%d tag=%d ret=%d\n", cmd_op,
> +		 cmd_req->qid, cmd_req->tag, ret);
> +
> +	if (ret < 0) {
> +		if (ring_ent != NULL) {
> +			pr_info_ratelimited("error: uring cmd op=%d, qid=%d tag=%d ret=%d\n",
> +					    cmd_op, cmd_req->qid, cmd_req->tag,
> +					    ret);
> +
> +			/* must not change the entry state, as userspace
> +			 * might have sent random data, but valid requests
> +			 * might be registered already - don't confuse those.
> +			 */
> +		}
> +		io_uring_cmd_done(cmd, ret, 0, issue_flags);
> +	}
> +
> +	return -EIOCBQUEUED;
> +
> +err_unlock:
> +	spin_unlock(&queue->lock);
> +	goto out;
> +}

Just a minor thing, but you should be able to just return an error from
here, at least for commands that don't yet have teardown associated
with them, rather than punting through task_work for that too. Doesn't
really matter and maybe it's cleaner to just keep it the same.
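In code, the direct-return variant suggested above would look roughly
like this (an illustrative sketch only, relying on io_uring completing
the CQE itself when ->uring_cmd() returns a negative error rather than
-EIOCBQUEUED):

	if (ret < 0) {
		if (!ring_ent)
			/* no entry registered yet, nothing to tear
			 * down - let io_uring complete the CQE from
			 * the plain error return
			 */
			return ret;

		pr_info_ratelimited("error: uring cmd op=%d, qid=%d tag=%d ret=%d\n",
				    cmd_op, cmd_req->qid, cmd_req->tag, ret);
		io_uring_cmd_done(cmd, ret, 0, issue_flags);
	}

	return -EIOCBQUEUED;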

Patch

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index fec995818a9e..998027825481 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -2477,6 +2477,9 @@  const struct file_operations fuse_dev_operations = {
 	.fasync		= fuse_dev_fasync,
 	.unlocked_ioctl = fuse_dev_ioctl,
 	.compat_ioctl   = compat_ptr_ioctl,
+#ifdef CONFIG_FUSE_IO_URING
+	.uring_cmd	= fuse_uring_cmd,
+#endif
 };
 EXPORT_SYMBOL_GPL(fuse_dev_operations);
 
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 4dcb4972242e..46c2274193bf 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -29,6 +29,30 @@ 
 #include <linux/topology.h>
 #include <linux/io_uring/cmd.h>
 
+static int fuse_ring_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
+{
+	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
+		return -EIO;
+
+	ent->state = FRRS_COMMIT;
+	list_del_init(&ent->list);
+
+	return 0;
+}
+
+/* Update conn limits according to ring values */
+static void fuse_uring_conn_cfg_limits(struct fuse_ring *ring)
+{
+	struct fuse_conn *fc = ring->fc;
+
+	/*
+	 * This is not ideal, as multiplying by nr_queues assumes the limit
+	 * only gets reached when all queues are used, but even a single
+	 * queue might reach the limit.
+	 */
+	WRITE_ONCE(fc->max_background, ring->nr_queues * ring->max_nr_async);
+}
+
 static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 				 struct fuse_ring *ring)
 {
@@ -37,6 +61,11 @@  static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 	queue->qid = qid;
 	queue->ring = ring;
 
+	spin_lock_init(&queue->lock);
+
+	INIT_LIST_HEAD(&queue->sync_ent_avail_queue);
+	INIT_LIST_HEAD(&queue->async_ent_avail_queue);
+
 	for (tag = 0; tag < ring->queue_depth; tag++) {
 		struct fuse_ring_ent *ent = &queue->ring_ent[tag];
 
@@ -44,6 +73,8 @@  static void fuse_uring_queue_cfg(struct fuse_ring_queue *queue, int qid,
 		ent->tag = tag;
 
 		ent->state = FRRS_INIT;
+
+		INIT_LIST_HEAD(&ent->list);
 	}
 }
 
@@ -141,3 +172,203 @@  int fuse_uring_conn_cfg(struct file *file, void __user *argp)
 	kvfree(ring);
 	return res;
 }
+
+/*
+ * Put a ring request on hold; it is not used for now.
+ */
+static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
+				 struct fuse_ring_queue *queue)
+	__must_hold(&queue->lock)
+{
+	struct fuse_ring *ring = queue->ring;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* unsets all previous flags - basically resets */
+	pr_devel("%s ring=%p qid=%d tag=%d state=%d async=%d\n", __func__,
+		 ring, ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
+		 ring_ent->async);
+
+	if (WARN_ON(ring_ent->state != FRRS_COMMIT)) {
+		pr_warn("%s qid=%d tag=%d state=%d async=%d\n", __func__,
+			ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
+			ring_ent->async);
+		return;
+	}
+
+	WARN_ON_ONCE(!list_empty(&ring_ent->list));
+
+	if (ring_ent->async)
+		list_add(&ring_ent->list, &queue->async_ent_avail_queue);
+	else
+		list_add(&ring_ent->list, &queue->sync_ent_avail_queue);
+
+	ring_ent->state = FRRS_WAIT;
+}
+
+/*
+ * fuse_uring_req_fetch command handling
+ */
+static int _fuse_uring_fetch(struct fuse_ring_ent *ring_ent,
+			    struct io_uring_cmd *cmd, unsigned int issue_flags)
+__must_hold(ring_ent->queue->lock)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct fuse_ring *ring = queue->ring;
+	int nr_ring_sqe;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* register entries for sync (foreground) requests first, then async */
+	if (queue->nr_req_sync >= ring->max_nr_sync) {
+		queue->nr_req_async++;
+		ring_ent->async = 1;
+	} else
+		queue->nr_req_sync++;
+
+	fuse_uring_ent_avail(ring_ent, queue);
+
+	if (WARN_ON_ONCE(queue->nr_req_sync +
+			 queue->nr_req_async > ring->queue_depth)) {
+		/* should have been caught by the earlier ring state
+		 * and queue depth checks
+		 */
+		pr_info("qid=%d tag=%d req count (sync=%d async=%d) exceeds depth=%zu\n",
+			queue->qid, ring_ent->tag, queue->nr_req_sync,
+			queue->nr_req_async, ring->queue_depth);
+		return -ERANGE;
+	}
+
+	WRITE_ONCE(ring_ent->cmd, cmd);
+
+	nr_ring_sqe = ring->queue_depth * ring->nr_queues;
+	if (atomic_inc_return(&ring->nr_sqe_init) == nr_ring_sqe) {
+		fuse_uring_conn_cfg_limits(ring);
+		ring->ready = 1;
+	}
+
+	return 0;
+}
+
+static int fuse_uring_fetch(struct fuse_ring_ent *ring_ent,
+			    struct io_uring_cmd *cmd, unsigned int issue_flags)
+	__releases(ring_ent->queue->lock)
+{
+	struct fuse_ring *ring = ring_ent->queue->ring;
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	int ret;
+
+	/* the entry must still be in its initial state */
+	ret = -EINVAL;
+	if (ring_ent->state != FRRS_INIT)
+		goto err;
+
+	/*
+	 * FUSE_URING_REQ_FETCH is an initialization exception, needs
+	 * state override
+	 */
+	ring_ent->state = FRRS_USERSPACE;
+	ret = fuse_ring_ring_ent_unset_userspace(ring_ent);
+	if (ret != 0) {
+		pr_info_ratelimited(
+			"qid=%d tag=%d register req state %d expected %d",
+			queue->qid, ring_ent->tag, ring_ent->state,
+			FRRS_INIT);
+		goto err;
+	}
+
+	ret = _fuse_uring_fetch(ring_ent, cmd, issue_flags);
+	if (ret)
+		goto err;
+
+	/*
+	 * The ring entry is registered now and needs to be handled
+	 * for shutdown.
+	 */
+	atomic_inc(&ring->queue_refs);
+err:
+	spin_unlock(&queue->lock);
+	return ret;
+}
+
+/**
+ * Entry function from io_uring to handle the given passthrough command
+ * (op code IORING_OP_URING_CMD)
+ */
+int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
+{
+	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+	struct fuse_dev *fud;
+	struct fuse_conn *fc;
+	struct fuse_ring *ring;
+	struct fuse_ring_queue *queue;
+	struct fuse_ring_ent *ring_ent = NULL;
+	u32 cmd_op = cmd->cmd_op;
+	int ret = 0;
+
+	ret = -ENODEV;
+	fud = fuse_get_dev(cmd->file);
+	if (!fud)
+		goto out;
+	fc = fud->fc;
+
+	ring = fc->ring;
+	if (!ring)
+		goto out;
+
+	queue = fud->ring_q;
+	if (!queue)
+		goto out;
+
+	ret = -EINVAL;
+	if (queue->qid != cmd_req->qid)
+		goto out;
+
+	ret = -ERANGE;
+	if (cmd_req->tag >= ring->queue_depth)
+		goto out;
+
+	ring_ent = &queue->ring_ent[cmd_req->tag];
+
+	pr_devel("%s:%d received: cmd op %d qid %d (%p) tag %d  (%p)\n",
+		 __func__, __LINE__, cmd_op, cmd_req->qid, queue, cmd_req->tag,
+		 ring_ent);
+
+	spin_lock(&queue->lock);
+	ret = -ENOTCONN;
+	if (unlikely(fc->aborted || queue->stopped))
+		goto err_unlock;
+
+	switch (cmd_op) {
+	case FUSE_URING_REQ_FETCH:
+		ret = fuse_uring_fetch(ring_ent, cmd, issue_flags);
+		break;
+	default:
+		ret = -EINVAL;
+		pr_devel("Unknown uring command %d", cmd_op);
+		goto err_unlock;
+	}
+out:
+	pr_devel("uring cmd op=%d, qid=%d tag=%d ret=%d\n", cmd_op,
+		 cmd_req->qid, cmd_req->tag, ret);
+
+	if (ret < 0) {
+		if (ring_ent != NULL) {
+			pr_info_ratelimited("error: uring cmd op=%d, qid=%d tag=%d ret=%d\n",
+					    cmd_op, cmd_req->qid, cmd_req->tag,
+					    ret);
+
+			/* must not change the entry state, as userspace
+			 * might have sent random data, but valid requests
+			 * might be registered already - don't confuse those.
+			 */
+		}
+		io_uring_cmd_done(cmd, ret, 0, issue_flags);
+	}
+
+	return -EIOCBQUEUED;
+
+err_unlock:
+	spin_unlock(&queue->lock);
+	goto out;
+}
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 26266f923321..6561f4178cac 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -19,6 +19,15 @@  enum fuse_ring_req_state {
 
 	/* request is basically initialized */
 	FRRS_INIT,
+
+	/* ring entry received from userspace and is being processed */
+	FRRS_COMMIT,
+
+	/* The ring request waits for a new fuse request */
+	FRRS_WAIT,
+
+	/* request is in or on the way to user space */
+	FRRS_USERSPACE,
 };
 
 /* A fuse ring entry, part of the ring queue */
@@ -31,6 +40,13 @@  struct fuse_ring_ent {
 
 	/* state the request is currently in */
 	enum fuse_ring_req_state state;
+
+	/* is this an async or sync entry */
+	unsigned int async : 1;
+
+	struct list_head list;
+
+	struct io_uring_cmd *cmd;
 };
 
 struct fuse_ring_queue {
@@ -43,6 +59,30 @@  struct fuse_ring_queue {
 	/* queue id, typically also corresponds to the cpu core */
 	unsigned int qid;
 
+	/*
+	 * queue lock, taken when any value in the queue changes _and_ also
+	 * when a ring entry state changes.
+	 */
+	spinlock_t lock;
+
+	/* available ring entries (struct fuse_ring_ent) */
+	struct list_head async_ent_avail_queue;
+	struct list_head sync_ent_avail_queue;
+
+	/*
+	 * available number of sync requests,
+	 * loosely bound to fuse foreground requests
+	 */
+	int nr_req_sync;
+
+	/*
+	 * available number of async requests
+	 * loosely bound to fuse background requests
+	 */
+	int nr_req_async;
+
+	unsigned int stopped : 1;
+
 	/* size depends on queue depth */
 	struct fuse_ring_ent ring_ent[] ____cacheline_aligned_in_smp;
 };
@@ -79,11 +119,21 @@  struct fuse_ring {
 	/* numa aware memory allocation */
 	unsigned int numa_aware : 1;
 
+	/* Is the ring ready to take requests */
+	unsigned int ready : 1;
+
+	/* number of SQEs initialized */
+	atomic_t nr_sqe_init;
+
+	/* Used to release the ring on stop */
+	atomic_t queue_refs;
+
 	struct fuse_ring_queue queues[] ____cacheline_aligned_in_smp;
 };
 
 void fuse_uring_abort_end_requests(struct fuse_ring *ring);
 int fuse_uring_conn_cfg(struct file *file, void __user *argp);
+int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
 
 static inline void fuse_uring_conn_destruct(struct fuse_conn *fc)
 {
@@ -113,6 +163,11 @@  static inline bool fuse_uring_configured(struct fuse_conn *fc)
 	return false;
 }
 
+static inline bool fuse_per_core_queue(struct fuse_conn *fc)
+{
+	return fc->ring && fc->ring->per_core_queue;
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -131,6 +186,11 @@  static inline bool fuse_uring_configured(struct fuse_conn *fc)
 	return false;
 }
 
+static inline bool fuse_per_core_queue(struct fuse_conn *fc)
+{
+	return false;
+}
+
 #endif /* CONFIG_FUSE_IO_URING */
 
 #endif /* _FS_FUSE_DEV_URING_I_H */
diff --git a/include/uapi/linux/fuse.h b/include/uapi/linux/fuse.h
index 143ed3c1c7b3..586358e9992c 100644
--- a/include/uapi/linux/fuse.h
+++ b/include/uapi/linux/fuse.h
@@ -1247,6 +1247,12 @@  struct fuse_supp_groups {
 #define FUSE_RING_HEADER_BUF_SIZE 4096
 #define FUSE_RING_MIN_IN_OUT_ARG_SIZE 4096
 
+/*
+ * The request is a background type. The daemon side is free to use this
+ * information to handle foreground/background CQEs with different priorities.
+ */
+#define FUSE_RING_REQ_FLAG_ASYNC (1ull << 0)
+
 /**
  * This structure mapped onto the
  */
@@ -1272,4 +1278,36 @@  struct fuse_ring_req {
 	char in_out_arg[];
 };
 
+/**
+ * SQE commands to the kernel
+ */
+enum fuse_uring_cmd {
+	FUSE_URING_REQ_INVALID = 0,
+
+	/* submit sqe to kernel to get a request */
+	FUSE_URING_REQ_FETCH = 1,
+
+	/* commit result and fetch next request */
+	FUSE_URING_REQ_COMMIT_AND_FETCH = 2,
+};
+
+/**
+ * In the 80-byte command area of the SQE.
+ */
+struct fuse_uring_cmd_req {
+	/* User buffer */
+	uint64_t buf_ptr;
+
+	/* length of the user buffer */
+	uint32_t buf_len;
+
+	/* queue the command is for (queue index) */
+	uint16_t qid;
+
+	/* queue entry (array index) */
+	uint16_t tag;
+
+	uint32_t flags;
+};
+
 #endif /* _LINUX_FUSE_H */