
[v9,10/17] fuse: Add io-uring sqe commit and fetch support

Message ID 20250107-fuse-uring-for-6-10-rfc4-v9-10-9c786f9a7a9d@ddn.com (mailing list archive)
State New
Series: fuse: fuse-over-io-uring

Commit Message

Bernd Schubert Jan. 7, 2025, 12:25 a.m. UTC
This adds support for fuse request completion through ring SQEs
(FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing, the
ring entry becomes available for new fuse requests. Handling of
requests through the ring (SQE/CQE handling) is now complete.

Fuse request data is copied through the mmapped ring buffer; there
is no support for zero copy yet.

Signed-off-by: Bernd Schubert <bschubert@ddn.com>
---
 fs/fuse/dev_uring.c   | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/fuse/dev_uring_i.h |  12 ++
 fs/fuse/fuse_i.h      |   4 +
 3 files changed, 466 insertions(+)
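
For orientation, a minimal userspace-side sketch (not part of this patch) of how
a fuse server thread might issue the commit-and-fetch command after writing its
reply into the mmapped ring buffers. It assumes liburing, a ring created with
IORING_SETUP_SQE128, and the struct fuse_uring_cmd_req /
FUSE_IO_URING_CMD_COMMIT_AND_FETCH definitions added earlier in this series;
buffer setup and CQE handling are omitted, and names like fuse_dev_fd are
placeholders.

#include <stdint.h>
#include <string.h>
#include <liburing.h>
#include <linux/fuse.h>

/* Hedged sketch: queue one FUSE_IO_URING_CMD_COMMIT_AND_FETCH SQE.
 * commit_id is the unique id of the fuse request being answered. */
static int queue_commit_and_fetch(struct io_uring *ring, int fuse_dev_fd,
				  unsigned int qid, uint64_t commit_id)
{
	struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
	struct fuse_uring_cmd_req cmd_req = {
		.qid = qid,
		.commit_id = commit_id,
	};

	if (!sqe)
		return -EAGAIN;

	memset(sqe, 0, sizeof(*sqe));		/* zero the base SQE fields */
	sqe->opcode = IORING_OP_URING_CMD;
	sqe->fd = fuse_dev_fd;			/* fuse device fd for this queue */
	sqe->cmd_op = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
	/* the command payload lives in the big-SQE command area */
	memcpy(sqe->cmd, &cmd_req, sizeof(cmd_req));

	/*
	 * The CQE for this SQE only completes once the kernel has copied the
	 * *next* fuse request into this ring entry's buffers - commit and
	 * fetch happen in one step, unlike read()/write() on /dev/fuse.
	 */
	return io_uring_submit(ring);
}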

Comments

Luis Henriques Jan. 7, 2025, 2:42 p.m. UTC | #1
Hi,

On Tue, Jan 07 2025, Bernd Schubert wrote:

> This adds support for fuse request completion through ring SQEs
> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing
> the ring entry it becomes available for new fuse requests.
> Handling of requests through the ring (SQE/CQE handling)
> is complete now.
>
> Fuse request data are copied through the mmaped ring buffer,
> there is no support for any zero copy yet.

Please find below a few more comments.

Also, please note that I'm trying to understand this patchset (and the
whole fuse-over-io-uring thing), so most of my comments are minor nits.
And those that are not may simply be wrong!  I'm just noting them as I
navigate through the code.

(And by the way, thanks for this work!)

> Signed-off-by: Bernd Schubert <bschubert@ddn.com>
> ---
>  fs/fuse/dev_uring.c   | 450 ++++++++++++++++++++++++++++++++++++++++++++++++++
>  fs/fuse/dev_uring_i.h |  12 ++
>  fs/fuse/fuse_i.h      |   4 +
>  3 files changed, 466 insertions(+)
>
> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
> index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644
> --- a/fs/fuse/dev_uring.c
> +++ b/fs/fuse/dev_uring.c
> @@ -26,6 +26,19 @@ bool fuse_uring_enabled(void)
>  	return enable_uring;
>  }
>  
> +static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err,
> +			       int error)
> +{
> +	struct fuse_req *req = ring_ent->fuse_req;
> +
> +	if (set_err)
> +		req->out.h.error = error;
> +
> +	clear_bit(FR_SENT, &req->flags);
> +	fuse_request_end(ring_ent->fuse_req);
> +	ring_ent->fuse_req = NULL;
> +}
> +
>  void fuse_uring_destruct(struct fuse_conn *fc)
>  {
>  	struct fuse_ring *ring = fc->ring;
> @@ -41,8 +54,11 @@ void fuse_uring_destruct(struct fuse_conn *fc)
>  			continue;
>  
>  		WARN_ON(!list_empty(&queue->ent_avail_queue));
> +		WARN_ON(!list_empty(&queue->ent_w_req_queue));
>  		WARN_ON(!list_empty(&queue->ent_commit_queue));
> +		WARN_ON(!list_empty(&queue->ent_in_userspace));
>  
> +		kfree(queue->fpq.processing);
>  		kfree(queue);
>  		ring->queues[qid] = NULL;
>  	}
> @@ -101,20 +117,34 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
>  {
>  	struct fuse_conn *fc = ring->fc;
>  	struct fuse_ring_queue *queue;
> +	struct list_head *pq;
>  
>  	queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
>  	if (!queue)
>  		return NULL;
> +	pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL);
> +	if (!pq) {
> +		kfree(queue);
> +		return NULL;
> +	}
> +
>  	queue->qid = qid;
>  	queue->ring = ring;
>  	spin_lock_init(&queue->lock);
>  
>  	INIT_LIST_HEAD(&queue->ent_avail_queue);
>  	INIT_LIST_HEAD(&queue->ent_commit_queue);
> +	INIT_LIST_HEAD(&queue->ent_w_req_queue);
> +	INIT_LIST_HEAD(&queue->ent_in_userspace);
> +	INIT_LIST_HEAD(&queue->fuse_req_queue);
> +
> +	queue->fpq.processing = pq;
> +	fuse_pqueue_init(&queue->fpq);
>  
>  	spin_lock(&fc->lock);
>  	if (ring->queues[qid]) {
>  		spin_unlock(&fc->lock);
> +		kfree(queue->fpq.processing);
>  		kfree(queue);
>  		return ring->queues[qid];
>  	}
> @@ -128,6 +158,214 @@ static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
>  	return queue;
>  }
>  
> +/*
> + * Checks for errors and stores it into the request
> + */
> +static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
> +					 struct fuse_req *req,
> +					 struct fuse_conn *fc)
> +{
> +	int err;
> +
> +	err = -EINVAL;
> +	if (oh->unique == 0) {
> +		/* Not supportd through io-uring yet */

typo: "supported"

> +		pr_warn_once("notify through fuse-io-uring not supported\n");
> +		goto seterr;
> +	}
> +
> +	err = -EINVAL;

Not really needed, it already has this value.

> +	if (oh->error <= -ERESTARTSYS || oh->error > 0)
> +		goto seterr;
> +
> +	if (oh->error) {
> +		err = oh->error;
> +		goto err;
> +	}
> +
> +	err = -ENOENT;
> +	if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
> +		pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n",
> +				    req->in.h.unique,
> +				    oh->unique & ~FUSE_INT_REQ_BIT);
> +		goto seterr;
> +	}
> +
> +	/*
> +	 * Is it an interrupt reply ID?
> +	 * XXX: Not supported through fuse-io-uring yet, it should not even
> +	 *      find the request - should not happen.
> +	 */
> +	WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
> +
> +	return 0;
> +
> +seterr:
> +	oh->error = err;
> +err:
> +	return err;
> +}
> +
> +static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
> +				     struct fuse_req *req,
> +				     struct fuse_ring_ent *ent)
> +{
> +	struct fuse_copy_state cs;
> +	struct fuse_args *args = req->args;
> +	struct iov_iter iter;
> +	int err, res;

nit: no need for two variables; one of the 'int' variables could be
removed.  There are other functions with a similar pattern, but this was
the first one that caught my attention.

> +	struct fuse_uring_ent_in_out ring_in_out;
> +
> +	res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
> +			     sizeof(ring_in_out));
> +	if (res)
> +		return -EFAULT;
> +
> +	err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz,
> +			  &iter);
> +	if (err)
> +		return err;
> +
> +	fuse_copy_init(&cs, 0, &iter);
> +	cs.is_uring = 1;
> +	cs.req = req;
> +
> +	return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
> +}
> +
> + /*
> +  * Copy data from the req to the ring buffer
> +  */

nit: extra space in comment indentation.

> 
> +static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
> +				   struct fuse_ring_ent *ent)
> +{
> +	struct fuse_copy_state cs;
> +	struct fuse_args *args = req->args;
> +	struct fuse_in_arg *in_args = args->in_args;
> +	int num_args = args->in_numargs;
> +	int err, res;
> +	struct iov_iter iter;
> +	struct fuse_uring_ent_in_out ent_in_out = {
> +		.flags = 0,
> +		.commit_id = ent->commit_id,
> +	};
> +
> +	if (WARN_ON(ent_in_out.commit_id == 0))
> +		return -EINVAL;
> +
> +	err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter);
> +	if (err) {
> +		pr_info_ratelimited("fuse: Import of user buffer failed\n");
> +		return err;
> +	}
> +
> +	fuse_copy_init(&cs, 1, &iter);
> +	cs.is_uring = 1;
> +	cs.req = req;
> +
> +	if (num_args > 0) {
> +		/*
> +		 * Expectation is that the first argument is the per op header.
> +		 * Some op code have that as zero.
> +		 */
> +		if (args->in_args[0].size > 0) {
> +			res = copy_to_user(&ent->headers->op_in, in_args->value,
> +					   in_args->size);
> +			err = res > 0 ? -EFAULT : res;
> +			if (err) {
> +				pr_info_ratelimited(
> +					"Copying the header failed.\n");
> +				return err;
> +			}
> +		}
> +		in_args++;
> +		num_args--;
> +	}
> +
> +	/* copy the payload */
> +	err = fuse_copy_args(&cs, num_args, args->in_pages,
> +			     (struct fuse_arg *)in_args, 0);
> +	if (err) {
> +		pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
> +		return err;
> +	}
> +
> +	ent_in_out.payload_sz = cs.ring.copied_sz;
> +	res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out,
> +			   sizeof(ent_in_out));
> +	err = res > 0 ? -EFAULT : res;
> +	if (err)
> +		return err;

Simply return err? :-)

> +
> +	return 0;
> +}
> +
> +static int
> +fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent)
> +{
> +	struct fuse_ring_queue *queue = ring_ent->queue;
> +	struct fuse_ring *ring = queue->ring;
> +	struct fuse_req *req = ring_ent->fuse_req;
> +	int err, res;
> +
> +	err = -EIO;
> +	if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) {
> +		pr_err("qid=%d ring-req=%p invalid state %d on send\n",
> +		       queue->qid, ring_ent, ring_ent->state);
> +		err = -EIO;

'err' initialized twice.  One of these could be removed.

> +		goto err;
> +	}
> +
> +	/* copy the request */
> +	err = fuse_uring_copy_to_ring(ring, req, ring_ent);
> +	if (unlikely(err)) {
> +		pr_info_ratelimited("Copy to ring failed: %d\n", err);
> +		goto err;
> +	}
> +
> +	/* copy fuse_in_header */
> +	res = copy_to_user(&ring_ent->headers->in_out, &req->in.h,
> +			   sizeof(req->in.h));
> +	err = res > 0 ? -EFAULT : res;
> +	if (err)
> +		goto err;
> +
> +	set_bit(FR_SENT, &req->flags);
> +	return 0;
> +
> +err:
> +	fuse_uring_req_end(ring_ent, true, err);
> +	return err;
> +}
> +
> +/*
> + * Write data to the ring buffer and send the request to userspace,
> + * userspace will read it
> + * This is comparable with classical read(/dev/fuse)
> + */
> +static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent,
> +					unsigned int issue_flags)
> +{
> +	int err = 0;
> +	struct fuse_ring_queue *queue = ring_ent->queue;
> +
> +	err = fuse_uring_prepare_send(ring_ent);
> +	if (err)
> +		goto err;

Since this is the only place where this label is used, it could simply
return 'err' and the label removed.

> +
> +	spin_lock(&queue->lock);
> +	ring_ent->state = FRRS_USERSPACE;
> +	list_move(&ring_ent->list, &queue->ent_in_userspace);
> +	spin_unlock(&queue->lock);
> +
> +	io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
> +	ring_ent->cmd = NULL;
> +	return 0;
> +
> +err:
> +	return err;
> +}
> +
>  /*
>   * Make a ring entry available for fuse_req assignment
>   */
> @@ -138,6 +376,210 @@ static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
>  	ring_ent->state = FRRS_AVAILABLE;
>  }
>  
> +/* Used to find the request on SQE commit */
> +static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent,
> +				 struct fuse_req *req)
> +{
> +	struct fuse_ring_queue *queue = ring_ent->queue;
> +	struct fuse_pqueue *fpq = &queue->fpq;
> +	unsigned int hash;
> +
> +	/* commit_id is the unique id of the request */
> +	ring_ent->commit_id = req->in.h.unique;
> +
> +	req->ring_entry = ring_ent;
> +	hash = fuse_req_hash(ring_ent->commit_id);
> +	list_move_tail(&req->list, &fpq->processing[hash]);
> +}
> +
> +/*
> + * Assign a fuse queue entry to the given entry
> + */
> +static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent,
> +					   struct fuse_req *req)
> +{
> +	struct fuse_ring_queue *queue = ring_ent->queue;
> +
> +	lockdep_assert_held(&queue->lock);
> +
> +	if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE &&
> +			 ring_ent->state != FRRS_COMMIT)) {
> +		pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid,
> +			ring_ent->state);
> +	}
> +	list_del_init(&req->list);
> +	clear_bit(FR_PENDING, &req->flags);
> +	ring_ent->fuse_req = req;
> +	ring_ent->state = FRRS_FUSE_REQ;
> +	list_move(&ring_ent->list, &queue->ent_w_req_queue);
> +	fuse_uring_add_to_pq(ring_ent, req);
> +}
> +
> +/*
> + * Release the ring entry and fetch the next fuse request if available
> + *
> + * @return true if a new request has been fetched
> + */
> +static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent)
> +	__must_hold(&queue->lock)
> +{
> +	struct fuse_req *req;
> +	struct fuse_ring_queue *queue = ring_ent->queue;
> +	struct list_head *req_queue = &queue->fuse_req_queue;
> +
> +	lockdep_assert_held(&queue->lock);
> +
> +	/* get and assign the next entry while it is still holding the lock */
> +	req = list_first_entry_or_null(req_queue, struct fuse_req, list);
> +	if (req) {
> +		fuse_uring_add_req_to_ring_ent(ring_ent, req);
> +		return true;
> +	}
> +
> +	return false;
> +}
> +
> +/*
> + * Read data from the ring buffer, which user space has written to
> + * This is comparible with handling of classical write(/dev/fuse).

nit: "comparable"

> + * Also make the ring request available again for new fuse requests.
> + */
> +static void fuse_uring_commit(struct fuse_ring_ent *ring_ent,
> +			      unsigned int issue_flags)
> +{
> +	struct fuse_ring *ring = ring_ent->queue->ring;
> +	struct fuse_conn *fc = ring->fc;
> +	struct fuse_req *req = ring_ent->fuse_req;
> +	ssize_t err = 0;
> +	bool set_err = false;
> +
> +	err = copy_from_user(&req->out.h, &ring_ent->headers->in_out,
> +			     sizeof(req->out.h));
> +	if (err) {
> +		req->out.h.error = err;
> +		goto out;
> +	}
> +
> +	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
> +	if (err) {
> +		/* req->out.h.error already set */
> +		goto out;
> +	}
> +
> +	err = fuse_uring_copy_from_ring(ring, req, ring_ent);
> +	if (err)
> +		set_err = true;
> +
> +out:
> +	fuse_uring_req_end(ring_ent, set_err, err);
> +}
> +
> +/*
> + * Get the next fuse req and send it
> + */
> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent,
> +				     struct fuse_ring_queue *queue,
> +				     unsigned int issue_flags)
> +{
> +	int err;
> +	bool has_next;
> +
> +retry:
> +	spin_lock(&queue->lock);
> +	fuse_uring_ent_avail(ring_ent, queue);
> +	has_next = fuse_uring_ent_assign_req(ring_ent);
> +	spin_unlock(&queue->lock);
> +
> +	if (has_next) {
> +		err = fuse_uring_send_next_to_ring(ring_ent, issue_flags);
> +		if (err)
> +			goto retry;

I wonder whether this is safe.  Maybe this is *obviously* safe, but I'm
still trying to understand this patchset; so, for me, it is not :-)

Would it be worth it trying to limit the maximum number of retries?

> +	}
> +}
> +
> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
> +{
> +	struct fuse_ring_queue *queue = ent->queue;
> +
> +	lockdep_assert_held(&queue->lock);
> +
> +	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
> +		return -EIO;
> +
> +	ent->state = FRRS_COMMIT;
> +	list_move(&ent->list, &queue->ent_commit_queue);
> +
> +	return 0;
> +}
> +
> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
> +				   struct fuse_conn *fc)
> +{
> +	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
> +	struct fuse_ring_ent *ring_ent;
> +	int err;
> +	struct fuse_ring *ring = fc->ring;
> +	struct fuse_ring_queue *queue;
> +	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
> +	unsigned int qid = READ_ONCE(cmd_req->qid);
> +	struct fuse_pqueue *fpq;
> +	struct fuse_req *req;
> +
> +	err = -ENOTCONN;
> +	if (!ring)
> +		return err;
> +
> +	if (qid >= ring->nr_queues)
> +		return -EINVAL;
> +
> +	queue = ring->queues[qid];
> +	if (!queue)
> +		return err;
> +	fpq = &queue->fpq;
> +
> +	spin_lock(&queue->lock);
> +	/* Find a request based on the unique ID of the fuse request
> +	 * This should get revised, as it needs a hash calculation and list
> +	 * search. And full struct fuse_pqueue is needed (memory overhead).
> +	 * As well as the link from req to ring_ent.
> +	 */
> +	req = fuse_request_find(fpq, commit_id);
> +	err = -ENOENT;
> +	if (!req) {
> +		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
> +			commit_id);
> +		spin_unlock(&queue->lock);
> +		return err;
> +	}
> +	list_del_init(&req->list);
> +	ring_ent = req->ring_entry;
> +	req->ring_entry = NULL;
> +
> +	err = fuse_ring_ent_set_commit(ring_ent);
> +	if (err != 0) {

I'm probably missing something, but because we removed 'req' from the list
above, aren't we leaking it if we get an error here?

Cheers,
Bernd Schubert Jan. 7, 2025, 3:59 p.m. UTC | #2
On 1/7/25 15:42, Luis Henriques wrote:
> Hi,
> 
> On Tue, Jan 07 2025, Bernd Schubert wrote:
> 
>> This adds support for fuse request completion through ring SQEs
>> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing
>> the ring entry it becomes available for new fuse requests.
>> Handling of requests through the ring (SQE/CQE handling)
>> is complete now.
>>
>> Fuse request data are copied through the mmaped ring buffer,
>> there is no support for any zero copy yet.
> 
> Please find below a few more comments.

Thanks, I fixed all comments, except for the retry in fuse_uring_next_fuse_req.


[...]

> 
> Also, please note that I'm trying to understand this patchset (and the
> whole fuse-over-io-uring thing), so most of my comments are minor nits.
> And those that are not may simply be wrong!  I'm just noting them as I
> navigate through the code.
> 
> (And by the way, thanks for this work!)
> 
>> +/*
>> + * Get the next fuse req and send it
>> + */
>> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent,
>> +				     struct fuse_ring_queue *queue,
>> +				     unsigned int issue_flags)
>> +{
>> +	int err;
>> +	bool has_next;
>> +
>> +retry:
>> +	spin_lock(&queue->lock);
>> +	fuse_uring_ent_avail(ring_ent, queue);
>> +	has_next = fuse_uring_ent_assign_req(ring_ent);
>> +	spin_unlock(&queue->lock);
>> +
>> +	if (has_next) {
>> +		err = fuse_uring_send_next_to_ring(ring_ent, issue_flags);
>> +		if (err)
>> +			goto retry;
> 
> I wonder whether this is safe.  Maybe this is *obviously* safe, but I'm
> still trying to understand this patchset; so, for me, it is not :-)
> 
> Would it be worth it trying to limit the maximum number of retries?

No, we cannot limit retries. Let's take a simple example with one ring
entry and just one queue. Multiple applications create fuse requests.
The first application fills the only available ring entry and submits
it; the others just get queued in queue->fuse_req_queue. After that,
each application simply waits in request_wait_answer().

On commit of the first request the ring task has to take the next
request from queue->fuse_req_queue - if something fails with that
request it has to complete it and proceed to the next one.
If we introduced a max-retries limit here, it would put the ring entry
on hold (FRRS_AVAILABLE), and until another application comes along it
would wait there forever. The applications waiting in
request_wait_answer() would never complete either.


> 
>> +	}
>> +}
>> +
>> +static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
>> +{
>> +	struct fuse_ring_queue *queue = ent->queue;
>> +
>> +	lockdep_assert_held(&queue->lock);
>> +
>> +	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
>> +		return -EIO;
>> +
>> +	ent->state = FRRS_COMMIT;
>> +	list_move(&ent->list, &queue->ent_commit_queue);
>> +
>> +	return 0;
>> +}
>> +
>> +/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
>> +static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
>> +				   struct fuse_conn *fc)
>> +{
>> +	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
>> +	struct fuse_ring_ent *ring_ent;
>> +	int err;
>> +	struct fuse_ring *ring = fc->ring;
>> +	struct fuse_ring_queue *queue;
>> +	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
>> +	unsigned int qid = READ_ONCE(cmd_req->qid);
>> +	struct fuse_pqueue *fpq;
>> +	struct fuse_req *req;
>> +
>> +	err = -ENOTCONN;
>> +	if (!ring)
>> +		return err;
>> +
>> +	if (qid >= ring->nr_queues)
>> +		return -EINVAL;
>> +
>> +	queue = ring->queues[qid];
>> +	if (!queue)
>> +		return err;
>> +	fpq = &queue->fpq;
>> +
>> +	spin_lock(&queue->lock);
>> +	/* Find a request based on the unique ID of the fuse request
>> +	 * This should get revised, as it needs a hash calculation and list
>> +	 * search. And full struct fuse_pqueue is needed (memory overhead).
>> +	 * As well as the link from req to ring_ent.
>> +	 */
>> +	req = fuse_request_find(fpq, commit_id);
>> +	err = -ENOENT;
>> +	if (!req) {
>> +		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
>> +			commit_id);
>> +		spin_unlock(&queue->lock);
>> +		return err;
>> +	}
>> +	list_del_init(&req->list);
>> +	ring_ent = req->ring_entry;
>> +	req->ring_entry = NULL;
>> +
>> +	err = fuse_ring_ent_set_commit(ring_ent);
>> +	if (err != 0) {
> 
> I'm probably missing something, but because we removed 'req' from the list
> above, aren't we leaking it if we get an error here?

Hmm, yeah, that is debatable. We basically have a grave error here:
either the kernel or userspace is doing something wrong. But you are
probably right, and we should end the request with EIO.
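
A minimal sketch of what that could look like, against the code quoted
above (assuming ring_ent->fuse_req is still valid at this point; the
actual change may end up different):

	err = fuse_ring_ent_set_commit(ring_ent);
	if (err) {
		pr_info_ratelimited("qid=%d commit_id %llu state %d",
				    queue->qid, commit_id, ring_ent->state);
		spin_unlock(&queue->lock);
		/*
		 * The request was already unhashed above; complete it so the
		 * waiter in request_wait_answer() is not stuck forever.
		 */
		fuse_uring_req_end(ring_ent, true, -EIO);
		return err;
	}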


Thanks,
Bernd
Luis Henriques Jan. 7, 2025, 4:21 p.m. UTC | #3
On Tue, Jan 07 2025, Bernd Schubert wrote:

> On 1/7/25 15:42, Luis Henriques wrote:
>> Hi,
>> On Tue, Jan 07 2025, Bernd Schubert wrote:
>> 
>>> This adds support for fuse request completion through ring SQEs
>>> (FUSE_URING_CMD_COMMIT_AND_FETCH handling). After committing
>>> the ring entry it becomes available for new fuse requests.
>>> Handling of requests through the ring (SQE/CQE handling)
>>> is complete now.
>>>
>>> Fuse request data are copied through the mmaped ring buffer,
>>> there is no support for any zero copy yet.
>> Please find below a few more comments.
>
> Thanks, I fixed all comments, except of retry in fuse_uring_next_fuse_req.

Awesome, thanks for taking those comments into account.

> [...]
>
>> Also, please note that I'm trying to understand this patchset (and the
>> whole fuse-over-io-uring thing), so most of my comments are minor nits.
>> And those that are not may simply be wrong!  I'm just noting them as I
>> navigate through the code.
>> (And by the way, thanks for this work!)
>> 
>>> +/*
>>> + * Get the next fuse req and send it
>>> + */
>>> +static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent,
>>> +				     struct fuse_ring_queue *queue,
>>> +				     unsigned int issue_flags)
>>> +{
>>> +	int err;
>>> +	bool has_next;
>>> +
>>> +retry:
>>> +	spin_lock(&queue->lock);
>>> +	fuse_uring_ent_avail(ring_ent, queue);
>>> +	has_next = fuse_uring_ent_assign_req(ring_ent);
>>> +	spin_unlock(&queue->lock);
>>> +
>>> +	if (has_next) {
>>> +		err = fuse_uring_send_next_to_ring(ring_ent, issue_flags);
>>> +		if (err)
>>> +			goto retry;
>> I wonder whether this is safe.  Maybe this is *obviously* safe, but I'm
>> still trying to understand this patchset; so, for me, it is not :-)
>> Would it be worth it trying to limit the maximum number of retries?
>
> No, we cannot limit retries. Let's do a simple example with one ring
> entry and also just one queue. Multiple applications create fuse
> requests. The first application fills the only available ring entry
> and submits it, the others just get queued in queue->fuse_req_queue.
> After that the application just waits request_wait_answer()
>
> On commit of the first request the ring task has to take the next
> request from queue->fuse_req_queue - if something fails with that
> request it has to complete it and proceed to the next request.
> If we would introduce a max-retries here, it would put the ring entry
> on hold (FRRS_AVAILABLE) and until another application comes, it would
> forever wait there. The applications waiting in request_wait_answer
> would never complete either.

Oh! OK, I see it now.  I totally misunderstood it then.  Thanks for taking
the time to explain it.

Cheers,

Patch

diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index b44ba4033615e01041313c040035b6da6af0ee17..f44e66a7ea577390da87e9ac7d118a9416898c28 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -26,6 +26,19 @@  bool fuse_uring_enabled(void)
 	return enable_uring;
 }
 
+static void fuse_uring_req_end(struct fuse_ring_ent *ring_ent, bool set_err,
+			       int error)
+{
+	struct fuse_req *req = ring_ent->fuse_req;
+
+	if (set_err)
+		req->out.h.error = error;
+
+	clear_bit(FR_SENT, &req->flags);
+	fuse_request_end(ring_ent->fuse_req);
+	ring_ent->fuse_req = NULL;
+}
+
 void fuse_uring_destruct(struct fuse_conn *fc)
 {
 	struct fuse_ring *ring = fc->ring;
@@ -41,8 +54,11 @@  void fuse_uring_destruct(struct fuse_conn *fc)
 			continue;
 
 		WARN_ON(!list_empty(&queue->ent_avail_queue));
+		WARN_ON(!list_empty(&queue->ent_w_req_queue));
 		WARN_ON(!list_empty(&queue->ent_commit_queue));
+		WARN_ON(!list_empty(&queue->ent_in_userspace));
 
+		kfree(queue->fpq.processing);
 		kfree(queue);
 		ring->queues[qid] = NULL;
 	}
@@ -101,20 +117,34 @@  static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
 {
 	struct fuse_conn *fc = ring->fc;
 	struct fuse_ring_queue *queue;
+	struct list_head *pq;
 
 	queue = kzalloc(sizeof(*queue), GFP_KERNEL_ACCOUNT);
 	if (!queue)
 		return NULL;
+	pq = kcalloc(FUSE_PQ_HASH_SIZE, sizeof(struct list_head), GFP_KERNEL);
+	if (!pq) {
+		kfree(queue);
+		return NULL;
+	}
+
 	queue->qid = qid;
 	queue->ring = ring;
 	spin_lock_init(&queue->lock);
 
 	INIT_LIST_HEAD(&queue->ent_avail_queue);
 	INIT_LIST_HEAD(&queue->ent_commit_queue);
+	INIT_LIST_HEAD(&queue->ent_w_req_queue);
+	INIT_LIST_HEAD(&queue->ent_in_userspace);
+	INIT_LIST_HEAD(&queue->fuse_req_queue);
+
+	queue->fpq.processing = pq;
+	fuse_pqueue_init(&queue->fpq);
 
 	spin_lock(&fc->lock);
 	if (ring->queues[qid]) {
 		spin_unlock(&fc->lock);
+		kfree(queue->fpq.processing);
 		kfree(queue);
 		return ring->queues[qid];
 	}
@@ -128,6 +158,214 @@  static struct fuse_ring_queue *fuse_uring_create_queue(struct fuse_ring *ring,
 	return queue;
 }
 
+/*
+ * Checks for errors and stores it into the request
+ */
+static int fuse_uring_out_header_has_err(struct fuse_out_header *oh,
+					 struct fuse_req *req,
+					 struct fuse_conn *fc)
+{
+	int err;
+
+	err = -EINVAL;
+	if (oh->unique == 0) {
+		/* Not supportd through io-uring yet */
+		pr_warn_once("notify through fuse-io-uring not supported\n");
+		goto seterr;
+	}
+
+	err = -EINVAL;
+	if (oh->error <= -ERESTARTSYS || oh->error > 0)
+		goto seterr;
+
+	if (oh->error) {
+		err = oh->error;
+		goto err;
+	}
+
+	err = -ENOENT;
+	if ((oh->unique & ~FUSE_INT_REQ_BIT) != req->in.h.unique) {
+		pr_warn_ratelimited("unique mismatch, expected: %llu got %llu\n",
+				    req->in.h.unique,
+				    oh->unique & ~FUSE_INT_REQ_BIT);
+		goto seterr;
+	}
+
+	/*
+	 * Is it an interrupt reply ID?
+	 * XXX: Not supported through fuse-io-uring yet, it should not even
+	 *      find the request - should not happen.
+	 */
+	WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
+
+	return 0;
+
+seterr:
+	oh->error = err;
+err:
+	return err;
+}
+
+static int fuse_uring_copy_from_ring(struct fuse_ring *ring,
+				     struct fuse_req *req,
+				     struct fuse_ring_ent *ent)
+{
+	struct fuse_copy_state cs;
+	struct fuse_args *args = req->args;
+	struct iov_iter iter;
+	int err, res;
+	struct fuse_uring_ent_in_out ring_in_out;
+
+	res = copy_from_user(&ring_in_out, &ent->headers->ring_ent_in_out,
+			     sizeof(ring_in_out));
+	if (res)
+		return -EFAULT;
+
+	err = import_ubuf(ITER_SOURCE, ent->payload, ring->max_payload_sz,
+			  &iter);
+	if (err)
+		return err;
+
+	fuse_copy_init(&cs, 0, &iter);
+	cs.is_uring = 1;
+	cs.req = req;
+
+	return fuse_copy_out_args(&cs, args, ring_in_out.payload_sz);
+}
+
+ /*
+  * Copy data from the req to the ring buffer
+  */
+static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
+				   struct fuse_ring_ent *ent)
+{
+	struct fuse_copy_state cs;
+	struct fuse_args *args = req->args;
+	struct fuse_in_arg *in_args = args->in_args;
+	int num_args = args->in_numargs;
+	int err, res;
+	struct iov_iter iter;
+	struct fuse_uring_ent_in_out ent_in_out = {
+		.flags = 0,
+		.commit_id = ent->commit_id,
+	};
+
+	if (WARN_ON(ent_in_out.commit_id == 0))
+		return -EINVAL;
+
+	err = import_ubuf(ITER_DEST, ent->payload, ring->max_payload_sz, &iter);
+	if (err) {
+		pr_info_ratelimited("fuse: Import of user buffer failed\n");
+		return err;
+	}
+
+	fuse_copy_init(&cs, 1, &iter);
+	cs.is_uring = 1;
+	cs.req = req;
+
+	if (num_args > 0) {
+		/*
+		 * Expectation is that the first argument is the per op header.
+		 * Some op code have that as zero.
+		 */
+		if (args->in_args[0].size > 0) {
+			res = copy_to_user(&ent->headers->op_in, in_args->value,
+					   in_args->size);
+			err = res > 0 ? -EFAULT : res;
+			if (err) {
+				pr_info_ratelimited(
+					"Copying the header failed.\n");
+				return err;
+			}
+		}
+		in_args++;
+		num_args--;
+	}
+
+	/* copy the payload */
+	err = fuse_copy_args(&cs, num_args, args->in_pages,
+			     (struct fuse_arg *)in_args, 0);
+	if (err) {
+		pr_info_ratelimited("%s fuse_copy_args failed\n", __func__);
+		return err;
+	}
+
+	ent_in_out.payload_sz = cs.ring.copied_sz;
+	res = copy_to_user(&ent->headers->ring_ent_in_out, &ent_in_out,
+			   sizeof(ent_in_out));
+	err = res > 0 ? -EFAULT : res;
+	if (err)
+		return err;
+
+	return 0;
+}
+
+static int
+fuse_uring_prepare_send(struct fuse_ring_ent *ring_ent)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct fuse_ring *ring = queue->ring;
+	struct fuse_req *req = ring_ent->fuse_req;
+	int err, res;
+
+	err = -EIO;
+	if (WARN_ON(ring_ent->state != FRRS_FUSE_REQ)) {
+		pr_err("qid=%d ring-req=%p invalid state %d on send\n",
+		       queue->qid, ring_ent, ring_ent->state);
+		err = -EIO;
+		goto err;
+	}
+
+	/* copy the request */
+	err = fuse_uring_copy_to_ring(ring, req, ring_ent);
+	if (unlikely(err)) {
+		pr_info_ratelimited("Copy to ring failed: %d\n", err);
+		goto err;
+	}
+
+	/* copy fuse_in_header */
+	res = copy_to_user(&ring_ent->headers->in_out, &req->in.h,
+			   sizeof(req->in.h));
+	err = res > 0 ? -EFAULT : res;
+	if (err)
+		goto err;
+
+	set_bit(FR_SENT, &req->flags);
+	return 0;
+
+err:
+	fuse_uring_req_end(ring_ent, true, err);
+	return err;
+}
+
+/*
+ * Write data to the ring buffer and send the request to userspace,
+ * userspace will read it
+ * This is comparable with classical read(/dev/fuse)
+ */
+static int fuse_uring_send_next_to_ring(struct fuse_ring_ent *ring_ent,
+					unsigned int issue_flags)
+{
+	int err = 0;
+	struct fuse_ring_queue *queue = ring_ent->queue;
+
+	err = fuse_uring_prepare_send(ring_ent);
+	if (err)
+		goto err;
+
+	spin_lock(&queue->lock);
+	ring_ent->state = FRRS_USERSPACE;
+	list_move(&ring_ent->list, &queue->ent_in_userspace);
+	spin_unlock(&queue->lock);
+
+	io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
+	ring_ent->cmd = NULL;
+	return 0;
+
+err:
+	return err;
+}
+
 /*
  * Make a ring entry available for fuse_req assignment
  */
@@ -138,6 +376,210 @@  static void fuse_uring_ent_avail(struct fuse_ring_ent *ring_ent,
 	ring_ent->state = FRRS_AVAILABLE;
 }
 
+/* Used to find the request on SQE commit */
+static void fuse_uring_add_to_pq(struct fuse_ring_ent *ring_ent,
+				 struct fuse_req *req)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct fuse_pqueue *fpq = &queue->fpq;
+	unsigned int hash;
+
+	/* commit_id is the unique id of the request */
+	ring_ent->commit_id = req->in.h.unique;
+
+	req->ring_entry = ring_ent;
+	hash = fuse_req_hash(ring_ent->commit_id);
+	list_move_tail(&req->list, &fpq->processing[hash]);
+}
+
+/*
+ * Assign a fuse queue entry to the given entry
+ */
+static void fuse_uring_add_req_to_ring_ent(struct fuse_ring_ent *ring_ent,
+					   struct fuse_req *req)
+{
+	struct fuse_ring_queue *queue = ring_ent->queue;
+
+	lockdep_assert_held(&queue->lock);
+
+	if (WARN_ON_ONCE(ring_ent->state != FRRS_AVAILABLE &&
+			 ring_ent->state != FRRS_COMMIT)) {
+		pr_warn("%s qid=%d state=%d\n", __func__, ring_ent->queue->qid,
+			ring_ent->state);
+	}
+	list_del_init(&req->list);
+	clear_bit(FR_PENDING, &req->flags);
+	ring_ent->fuse_req = req;
+	ring_ent->state = FRRS_FUSE_REQ;
+	list_move(&ring_ent->list, &queue->ent_w_req_queue);
+	fuse_uring_add_to_pq(ring_ent, req);
+}
+
+/*
+ * Release the ring entry and fetch the next fuse request if available
+ *
+ * @return true if a new request has been fetched
+ */
+static bool fuse_uring_ent_assign_req(struct fuse_ring_ent *ring_ent)
+	__must_hold(&queue->lock)
+{
+	struct fuse_req *req;
+	struct fuse_ring_queue *queue = ring_ent->queue;
+	struct list_head *req_queue = &queue->fuse_req_queue;
+
+	lockdep_assert_held(&queue->lock);
+
+	/* get and assign the next entry while it is still holding the lock */
+	req = list_first_entry_or_null(req_queue, struct fuse_req, list);
+	if (req) {
+		fuse_uring_add_req_to_ring_ent(ring_ent, req);
+		return true;
+	}
+
+	return false;
+}
+
+/*
+ * Read data from the ring buffer, which user space has written to
+ * This is comparible with handling of classical write(/dev/fuse).
+ * Also make the ring request available again for new fuse requests.
+ */
+static void fuse_uring_commit(struct fuse_ring_ent *ring_ent,
+			      unsigned int issue_flags)
+{
+	struct fuse_ring *ring = ring_ent->queue->ring;
+	struct fuse_conn *fc = ring->fc;
+	struct fuse_req *req = ring_ent->fuse_req;
+	ssize_t err = 0;
+	bool set_err = false;
+
+	err = copy_from_user(&req->out.h, &ring_ent->headers->in_out,
+			     sizeof(req->out.h));
+	if (err) {
+		req->out.h.error = err;
+		goto out;
+	}
+
+	err = fuse_uring_out_header_has_err(&req->out.h, req, fc);
+	if (err) {
+		/* req->out.h.error already set */
+		goto out;
+	}
+
+	err = fuse_uring_copy_from_ring(ring, req, ring_ent);
+	if (err)
+		set_err = true;
+
+out:
+	fuse_uring_req_end(ring_ent, set_err, err);
+}
+
+/*
+ * Get the next fuse req and send it
+ */
+static void fuse_uring_next_fuse_req(struct fuse_ring_ent *ring_ent,
+				     struct fuse_ring_queue *queue,
+				     unsigned int issue_flags)
+{
+	int err;
+	bool has_next;
+
+retry:
+	spin_lock(&queue->lock);
+	fuse_uring_ent_avail(ring_ent, queue);
+	has_next = fuse_uring_ent_assign_req(ring_ent);
+	spin_unlock(&queue->lock);
+
+	if (has_next) {
+		err = fuse_uring_send_next_to_ring(ring_ent, issue_flags);
+		if (err)
+			goto retry;
+	}
+}
+
+static int fuse_ring_ent_set_commit(struct fuse_ring_ent *ent)
+{
+	struct fuse_ring_queue *queue = ent->queue;
+
+	lockdep_assert_held(&queue->lock);
+
+	if (WARN_ON_ONCE(ent->state != FRRS_USERSPACE))
+		return -EIO;
+
+	ent->state = FRRS_COMMIT;
+	list_move(&ent->list, &queue->ent_commit_queue);
+
+	return 0;
+}
+
+/* FUSE_URING_CMD_COMMIT_AND_FETCH handler */
+static int fuse_uring_commit_fetch(struct io_uring_cmd *cmd, int issue_flags,
+				   struct fuse_conn *fc)
+{
+	const struct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe);
+	struct fuse_ring_ent *ring_ent;
+	int err;
+	struct fuse_ring *ring = fc->ring;
+	struct fuse_ring_queue *queue;
+	uint64_t commit_id = READ_ONCE(cmd_req->commit_id);
+	unsigned int qid = READ_ONCE(cmd_req->qid);
+	struct fuse_pqueue *fpq;
+	struct fuse_req *req;
+
+	err = -ENOTCONN;
+	if (!ring)
+		return err;
+
+	if (qid >= ring->nr_queues)
+		return -EINVAL;
+
+	queue = ring->queues[qid];
+	if (!queue)
+		return err;
+	fpq = &queue->fpq;
+
+	spin_lock(&queue->lock);
+	/* Find a request based on the unique ID of the fuse request
+	 * This should get revised, as it needs a hash calculation and list
+	 * search. And full struct fuse_pqueue is needed (memory overhead).
+	 * As well as the link from req to ring_ent.
+	 */
+	req = fuse_request_find(fpq, commit_id);
+	err = -ENOENT;
+	if (!req) {
+		pr_info("qid=%d commit_id %llu not found\n", queue->qid,
+			commit_id);
+		spin_unlock(&queue->lock);
+		return err;
+	}
+	list_del_init(&req->list);
+	ring_ent = req->ring_entry;
+	req->ring_entry = NULL;
+
+	err = fuse_ring_ent_set_commit(ring_ent);
+	if (err != 0) {
+		pr_info_ratelimited("qid=%d commit_id %llu state %d",
+				    queue->qid, commit_id, ring_ent->state);
+		spin_unlock(&queue->lock);
+		return err;
+	}
+
+	ring_ent->cmd = cmd;
+	spin_unlock(&queue->lock);
+
+	/* without the queue lock, as other locks are taken */
+	fuse_uring_commit(ring_ent, issue_flags);
+
+	/*
+	 * Fetching the next request is absolutely required as queued
+	 * fuse requests would otherwise not get processed - committing
+	 * and fetching is done in one step vs legacy fuse, which has separated
+	 * read (fetch request) and write (commit result).
+	 */
+	fuse_uring_next_fuse_req(ring_ent, queue, issue_flags);
+	return 0;
+}
+
 /*
  * fuse_uring_req_fetch command handling
  */
@@ -325,6 +767,14 @@  int __maybe_unused fuse_uring_cmd(struct io_uring_cmd *cmd,
 			return err;
 		}
 		break;
+	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
+		err = fuse_uring_commit_fetch(cmd, issue_flags, fc);
+		if (err) {
+			pr_info_once("FUSE_IO_URING_COMMIT_AND_FETCH failed err=%d\n",
+				     err);
+			return err;
+		}
+		break;
 	default:
 		return -EINVAL;
 	}
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index 4e46dd65196d26dabc62dada33b17de9aa511c08..80f1c62d4df7f0ca77c4d5179068df6ffdbf7d85 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -20,6 +20,9 @@  enum fuse_ring_req_state {
 	/* The ring entry is waiting for new fuse requests */
 	FRRS_AVAILABLE,
 
+	/* The ring entry got assigned a fuse req */
+	FRRS_FUSE_REQ,
+
 	/* The ring entry is in or on the way to user space */
 	FRRS_USERSPACE,
 };
@@ -70,7 +73,16 @@  struct fuse_ring_queue {
 	 * entries in the process of being committed or in the process
 	 * to be sent to userspace
 	 */
+	struct list_head ent_w_req_queue;
 	struct list_head ent_commit_queue;
+
+	/* entries in userspace */
+	struct list_head ent_in_userspace;
+
+	/* fuse requests waiting for an entry slot */
+	struct list_head fuse_req_queue;
+
+	struct fuse_pqueue fpq;
 };
 
 /**
diff --git a/fs/fuse/fuse_i.h b/fs/fuse/fuse_i.h
index e545b0864dd51e82df61cc39bdf65d3d36a418dc..e71556894bc25808581424ec7bdd4afeebc81f15 100644
--- a/fs/fuse/fuse_i.h
+++ b/fs/fuse/fuse_i.h
@@ -438,6 +438,10 @@  struct fuse_req {
 
 	/** fuse_mount this request belongs to */
 	struct fuse_mount *fm;
+
+#ifdef CONFIG_FUSE_IO_URING
+	void *ring_entry;
+#endif
 };
 
 struct fuse_iqueue;