
[RFC,v2,14/19] fuse: {uring} Allow to queue to the ring

Message ID 20240529-fuse-uring-for-6-9-rfc2-out-v1-14-d149476b1d65@ddn.com (mailing list archive)
State New
Series fuse: fuse-over-io-uring

Commit Message

Bernd Schubert May 29, 2024, 6 p.m. UTC
This enables enqueuing requests through fuse uring queues.

For initial simplicity, requests are always allocated the normal way,
then added to the ring queue lists and only then copied to ring queue
entries. Later on, the allocation and list handling can be avoided by
using a ring entry directly, but that adds some code complexity and is
therefore not done for now.

Signed-off-by: Bernd Schubert <bschubert@ddn.com>
---
 fs/fuse/dev.c         | 80 +++++++++++++++++++++++++++++++++++++++-----
 fs/fuse/dev_uring.c   | 92 ++++++++++++++++++++++++++++++++++++++++++---------
 fs/fuse/dev_uring_i.h | 17 ++++++++++
 3 files changed, 165 insertions(+), 24 deletions(-)
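
Condensed, the per-queue enqueue added by this patch works as follows.
This is only a sketch distilled from fuse_uring_queue_fuse_req() in the
patch below; the wrapper name is made up and the stopped-queue error
handling is left out:

static int fuse_uring_enqueue_sketch(struct fuse_conn *fc,
				     struct fuse_req *req)
{
	struct fuse_ring *ring = fc->ring;
	bool async = test_bit(FR_BACKGROUND, &req->flags);
	/* per-core queues: sync requests stay on the submitting core */
	int qid = ring->per_core_queue ? task_cpu(current) : 0;
	struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);
	struct list_head *ent_queue = async ? &queue->async_ent_avail_queue :
					      &queue->sync_ent_avail_queue;
	struct list_head *req_queue = async ? &queue->async_fuse_req_queue :
					      &queue->sync_fuse_req_queue;
	struct fuse_ring_ent *ent = NULL;

	spin_lock(&queue->lock);
	if (list_empty(ent_queue)) {
		/* no free ring entry, park the request on the queue list */
		list_add_tail(&req->list, req_queue);
	} else {
		/* take a free ring entry and attach the request to it */
		ent = list_first_entry(ent_queue, struct fuse_ring_ent, list);
		list_del(&ent->list);
		fuse_uring_add_req_to_ring_ent(ent, req);
	}
	spin_unlock(&queue->lock);

	/* copy the request to the ring entry and complete the pending cmd */
	if (ent)
		fuse_uring_send_to_ring(ent);

	return 0;
}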

Comments

Josef Bacik May 30, 2024, 8:32 p.m. UTC | #1
On Wed, May 29, 2024 at 08:00:49PM +0200, Bernd Schubert wrote:
> This enables enqueuing requests through fuse uring queues.
> 
> For initial simplicity requests are always allocated the normal way
> then added to ring queues lists and only then copied to ring queue
> entries. Later on the allocation and adding the requests to a list
> can be avoided, by directly using a ring entry. This introduces
> some code complexity and is therefore not done for now.
> 
> Signed-off-by: Bernd Schubert <bschubert@ddn.com>
> ---
>  fs/fuse/dev.c         | 80 +++++++++++++++++++++++++++++++++++++++-----
>  fs/fuse/dev_uring.c   | 92 ++++++++++++++++++++++++++++++++++++++++++---------
>  fs/fuse/dev_uring_i.h | 17 ++++++++++
>  3 files changed, 165 insertions(+), 24 deletions(-)
> 
> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
> index 6ffd216b27c8..c7fd3849a105 100644
> --- a/fs/fuse/dev.c
> +++ b/fs/fuse/dev.c
> @@ -218,13 +218,29 @@ const struct fuse_iqueue_ops fuse_dev_fiq_ops = {
>  };
>  EXPORT_SYMBOL_GPL(fuse_dev_fiq_ops);
>  
> -static void queue_request_and_unlock(struct fuse_iqueue *fiq,
> -				     struct fuse_req *req)
> +
> +static void queue_request_and_unlock(struct fuse_conn *fc,
> +				     struct fuse_req *req, bool allow_uring)
>  __releases(fiq->lock)
>  {
> +	struct fuse_iqueue *fiq = &fc->iq;
> +
>  	req->in.h.len = sizeof(struct fuse_in_header) +
>  		fuse_len_args(req->args->in_numargs,
>  			      (struct fuse_arg *) req->args->in_args);
> +
> +	if (allow_uring && fuse_uring_ready(fc)) {
> +		int res;
> +
> +		/* this lock is not needed at all for ring req handling */
> +		spin_unlock(&fiq->lock);
> +		res = fuse_uring_queue_fuse_req(fc, req);
> +		if (!res)
> +			return;
> +
> +		/* fallthrough, handled through /dev/fuse read/write */

We need the lock here because we're modifying &fiq->pending, this will end in
tears.

> +	}
> +
>  	list_add_tail(&req->list, &fiq->pending);
>  	fiq->ops->wake_pending_and_unlock(fiq);
>  }
> @@ -261,7 +277,7 @@ static void flush_bg_queue(struct fuse_conn *fc)
>  		fc->active_background++;
>  		spin_lock(&fiq->lock);
>  		req->in.h.unique = fuse_get_unique(fiq);
> -		queue_request_and_unlock(fiq, req);
> +		queue_request_and_unlock(fc, req, true);
>  	}
>  }
>  
> @@ -405,7 +421,8 @@ static void request_wait_answer(struct fuse_req *req)
>  
>  static void __fuse_request_send(struct fuse_req *req)
>  {
> -	struct fuse_iqueue *fiq = &req->fm->fc->iq;
> +	struct fuse_conn *fc = req->fm->fc;
> +	struct fuse_iqueue *fiq = &fc->iq;
>  
>  	BUG_ON(test_bit(FR_BACKGROUND, &req->flags));
>  	spin_lock(&fiq->lock);
> @@ -417,7 +434,7 @@ static void __fuse_request_send(struct fuse_req *req)
>  		/* acquire extra reference, since request is still needed
>  		   after fuse_request_end() */
>  		__fuse_get_request(req);
> -		queue_request_and_unlock(fiq, req);
> +		queue_request_and_unlock(fc, req, true);
>  
>  		request_wait_answer(req);
>  		/* Pairs with smp_wmb() in fuse_request_end() */
> @@ -487,6 +504,10 @@ ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
>  	if (args->force) {
>  		atomic_inc(&fc->num_waiting);
>  		req = fuse_request_alloc(fm, GFP_KERNEL | __GFP_NOFAIL);
> +		if (unlikely(!req)) {
> +			ret = -ENOTCONN;
> +			goto err;
> +		}

This is extraneous, and not possible since we're doing __GFP_NOFAIL.

>  
>  		if (!args->nocreds)
>  			fuse_force_creds(req);
> @@ -514,16 +535,55 @@ ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
>  	}
>  	fuse_put_request(req);
>  
> +err:
>  	return ret;
>  }
>  
> -static bool fuse_request_queue_background(struct fuse_req *req)
> +static bool fuse_request_queue_background_uring(struct fuse_conn *fc,
> +					       struct fuse_req *req)
> +{
> +	struct fuse_iqueue *fiq = &fc->iq;
> +	int err;
> +
> +	req->in.h.unique = fuse_get_unique(fiq);
> +	req->in.h.len = sizeof(struct fuse_in_header) +
> +		fuse_len_args(req->args->in_numargs,
> +			      (struct fuse_arg *) req->args->in_args);
> +
> +	err = fuse_uring_queue_fuse_req(fc, req);
> +	if (!err) {

I'd rather

if (err)
	return false;

Then the rest of this code.

Also generally speaking I think you're correct, below isn't needed because the
queues themselves have their own limits, so I think just delete this bit.

> +		/* XXX remove and lets the users of that use per queue values -
> +		 * avoid the shared spin lock...
> +		 * Is this needed at all?
> +		 */
> +		spin_lock(&fc->bg_lock);
> +		fc->num_background++;
> +		fc->active_background++;
> +
> +
> +		/* XXX block when per ring queues get occupied */
> +		if (fc->num_background == fc->max_background)
> +			fc->blocked = 1;
> +		spin_unlock(&fc->bg_lock);
> +	}
> +
> +	return err ? false : true;
> +}
> +
> +/*
> + * @return true if queued
> + */
> +static int fuse_request_queue_background(struct fuse_req *req)
>  {
>  	struct fuse_mount *fm = req->fm;
>  	struct fuse_conn *fc = fm->fc;
>  	bool queued = false;
>  
>  	WARN_ON(!test_bit(FR_BACKGROUND, &req->flags));
> +
> +	if (fuse_uring_ready(fc))
> +		return fuse_request_queue_background_uring(fc, req);
> +
>  	if (!test_bit(FR_WAITING, &req->flags)) {
>  		__set_bit(FR_WAITING, &req->flags);
>  		atomic_inc(&fc->num_waiting);
> @@ -576,7 +636,8 @@ static int fuse_simple_notify_reply(struct fuse_mount *fm,
>  				    struct fuse_args *args, u64 unique)
>  {
>  	struct fuse_req *req;
> -	struct fuse_iqueue *fiq = &fm->fc->iq;
> +	struct fuse_conn *fc = fm->fc;
> +	struct fuse_iqueue *fiq = &fc->iq;
>  	int err = 0;
>  
>  	req = fuse_get_req(fm, false);
> @@ -590,7 +651,8 @@ static int fuse_simple_notify_reply(struct fuse_mount *fm,
>  
>  	spin_lock(&fiq->lock);
>  	if (fiq->connected) {
> -		queue_request_and_unlock(fiq, req);
> +		/* uring for notify not supported yet */
> +		queue_request_and_unlock(fc, req, false);
>  	} else {
>  		err = -ENODEV;
>  		spin_unlock(&fiq->lock);
> @@ -2205,6 +2267,7 @@ void fuse_abort_conn(struct fuse_conn *fc)
>  		fuse_uring_set_stopped(fc);
>  
>  		fuse_set_initialized(fc);
> +

Extraneous newline.

>  		list_for_each_entry(fud, &fc->devices, entry) {
>  			struct fuse_pqueue *fpq = &fud->pq;
>  
> @@ -2478,6 +2541,7 @@ static long fuse_uring_ioctl(struct file *file, __u32 __user *argp)
>  		if (res != 0)
>  			return res;
>  		break;
> +

Extraneous newline.

>  		case FUSE_URING_IOCTL_CMD_QUEUE_CFG:
>  			fud->uring_dev = 1;
>  			res = fuse_uring_queue_cfg(fc->ring, &cfg.qconf);
> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
> index 6001ba4d6e82..fe80e66150c3 100644
> --- a/fs/fuse/dev_uring.c
> +++ b/fs/fuse/dev_uring.c
> @@ -32,8 +32,7 @@
>  #include <linux/io_uring/cmd.h>
>  
>  static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
> -					    bool set_err, int error,
> -					    unsigned int issue_flags);
> +					    bool set_err, int error);
>  
>  static void fuse_ring_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
>  {
> @@ -683,8 +682,7 @@ static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
>   * userspace will read it
>   * This is comparable with classical read(/dev/fuse)
>   */
> -static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
> -				    unsigned int issue_flags, bool send_in_task)
> +static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent)
>  {
>  	struct fuse_ring *ring = ring_ent->queue->ring;
>  	struct fuse_ring_req *rreq = ring_ent->rreq;
> @@ -721,20 +719,17 @@ static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
>  	rreq->in = req->in.h;
>  	set_bit(FR_SENT, &req->flags);
>  
> -	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu issue_flags=%u\n",
> +	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu\n",
>  		 __func__, ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
> -		 rreq->in.opcode, rreq->in.unique, issue_flags);
> +		 rreq->in.opcode, rreq->in.unique);
>  
> -	if (send_in_task)
> -		io_uring_cmd_complete_in_task(ring_ent->cmd,
> -					      fuse_uring_async_send_to_ring);
> -	else
> -		io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
> +	io_uring_cmd_complete_in_task(ring_ent->cmd,
> +				      fuse_uring_async_send_to_ring);
>  
>  	return;
>  
>  err:
> -	fuse_uring_req_end_and_get_next(ring_ent, true, err, issue_flags);
> +	fuse_uring_req_end_and_get_next(ring_ent, true, err);
>  }
>  
>  /*
> @@ -811,8 +806,7 @@ static bool fuse_uring_ent_release_and_fetch(struct fuse_ring_ent *ring_ent)
>   * has lock/unlock/lock to avoid holding the lock on calling fuse_request_end
>   */
>  static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
> -					    bool set_err, int error,
> -					    unsigned int issue_flags)
> +					    bool set_err, int error)
>  {
>  	struct fuse_req *req = ring_ent->fuse_req;
>  	int has_next;
> @@ -828,7 +822,7 @@ static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
>  	has_next = fuse_uring_ent_release_and_fetch(ring_ent);
>  	if (has_next) {
>  		/* called within uring context - use provided flags */
> -		fuse_uring_send_to_ring(ring_ent, issue_flags, false);
> +		fuse_uring_send_to_ring(ring_ent);
>  	}
>  }
>  
> @@ -863,7 +857,7 @@ static void fuse_uring_commit_and_release(struct fuse_dev *fud,
>  out:
>  	pr_devel("%s:%d ret=%zd op=%d req-ret=%d\n", __func__, __LINE__, err,
>  		 req->args->opcode, req->out.h.error);
> -	fuse_uring_req_end_and_get_next(ring_ent, set_err, err, issue_flags);
> +	fuse_uring_req_end_and_get_next(ring_ent, set_err, err);
>  }
>  
>  /*
> @@ -1101,3 +1095,69 @@ int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
>  	goto out;
>  }
>  
> +int fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req)
> +{
> +	struct fuse_ring *ring = fc->ring;
> +	struct fuse_ring_queue *queue;
> +	int qid = 0;
> +	struct fuse_ring_ent *ring_ent = NULL;
> +	int res;
> +	bool async = test_bit(FR_BACKGROUND, &req->flags);
> +	struct list_head *req_queue, *ent_queue;
> +
> +	if (ring->per_core_queue) {
> +		/*
> +		 * async requests are best handled on another core, the current
> +		 * core can do application/page handling, while the async request
> +		 * is handled on another core in userspace.
> +		 * For sync request the application has to wait - no processing, so
> +		 * the request should continue on the current core and avoid context
> +		 * switches.
> +		 * XXX This should be on the same numa node and not busy - is there
> +		 * a scheduler function available  that could make this decision?
> +		 * It should also not persistently switch between cores - makes
> +		 * it hard for the scheduler.
> +		 */
> +		qid = task_cpu(current);
> +
> +		if (unlikely(qid >= ring->nr_queues)) {
> +			WARN_ONCE(1,
> +				  "Core number (%u) exceeds nr ueues (%zu)\n",
> +				  qid, ring->nr_queues);
> +			qid = 0;
> +		}
> +	}
> +
> +	queue = fuse_uring_get_queue(ring, qid);
> +	req_queue = async ? &queue->async_fuse_req_queue :
> +			    &queue->sync_fuse_req_queue;
> +	ent_queue = async ? &queue->async_ent_avail_queue :
> +			    &queue->sync_ent_avail_queue;
> +
> +	spin_lock(&queue->lock);
> +
> +	if (unlikely(queue->stopped)) {
> +		res = -ENOTCONN;
> +		goto err_unlock;

This is the only place we use err_unlock, just do

if (unlikely(queue->stopped)) {
	spin_unlock(&queue->lock);
	return -ENOTCONN;
}

and then you can get rid of res.  Thanks,

Josef
Bernd Schubert May 30, 2024, 9:26 p.m. UTC | #2
On 5/30/24 22:32, Josef Bacik wrote:
> On Wed, May 29, 2024 at 08:00:49PM +0200, Bernd Schubert wrote:
>> This enables enqueuing requests through fuse uring queues.
>>
>> For initial simplicity requests are always allocated the normal way
>> then added to ring queues lists and only then copied to ring queue
>> entries. Later on the allocation and adding the requests to a list
>> can be avoided, by directly using a ring entry. This introduces
>> some code complexity and is therefore not done for now.
>>
>> Signed-off-by: Bernd Schubert <bschubert@ddn.com>
>> ---
>>  fs/fuse/dev.c         | 80 +++++++++++++++++++++++++++++++++++++++-----
>>  fs/fuse/dev_uring.c   | 92 ++++++++++++++++++++++++++++++++++++++++++---------
>>  fs/fuse/dev_uring_i.h | 17 ++++++++++
>>  3 files changed, 165 insertions(+), 24 deletions(-)
>>
>> diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
>> index 6ffd216b27c8..c7fd3849a105 100644
>> --- a/fs/fuse/dev.c
>> +++ b/fs/fuse/dev.c
>> @@ -218,13 +218,29 @@ const struct fuse_iqueue_ops fuse_dev_fiq_ops = {
>>  };
>>  EXPORT_SYMBOL_GPL(fuse_dev_fiq_ops);
>>  
>> -static void queue_request_and_unlock(struct fuse_iqueue *fiq,
>> -				     struct fuse_req *req)
>> +
>> +static void queue_request_and_unlock(struct fuse_conn *fc,
>> +				     struct fuse_req *req, bool allow_uring)
>>  __releases(fiq->lock)
>>  {
>> +	struct fuse_iqueue *fiq = &fc->iq;
>> +
>>  	req->in.h.len = sizeof(struct fuse_in_header) +
>>  		fuse_len_args(req->args->in_numargs,
>>  			      (struct fuse_arg *) req->args->in_args);
>> +
>> +	if (allow_uring && fuse_uring_ready(fc)) {
>> +		int res;
>> +
>> +		/* this lock is not needed at all for ring req handling */
>> +		spin_unlock(&fiq->lock);
>> +		res = fuse_uring_queue_fuse_req(fc, req);
>> +		if (!res)
>> +			return;
>> +
>> +		/* fallthrough, handled through /dev/fuse read/write */
> 
> We need the lock here because we're modifying &fiq->pending, this will end in
> tears.

Ouch right, sorry that I had missed that. I will actually remove the
fallthrough altogether; it is not needed anymore (one possible locking
fix is sketched after the quoted hunk below).

> 
>> +	}
>> +
>>  	list_add_tail(&req->list, &fiq->pending);
>>  	fiq->ops->wake_pending_and_unlock(fiq);
>>  }
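
One way to keep the fallback and still be safe would be to retake
fiq->lock before touching fiq->pending. This is only a sketch of that
alternative; as said above, the fallthrough is going to be removed
instead:

static void queue_request_and_unlock(struct fuse_conn *fc,
				     struct fuse_req *req, bool allow_uring)
__releases(fiq->lock)
{
	struct fuse_iqueue *fiq = &fc->iq;

	req->in.h.len = sizeof(struct fuse_in_header) +
		fuse_len_args(req->args->in_numargs,
			      (struct fuse_arg *) req->args->in_args);

	if (allow_uring && fuse_uring_ready(fc)) {
		/* the fiq lock is not needed for ring request handling */
		spin_unlock(&fiq->lock);
		if (!fuse_uring_queue_fuse_req(fc, req))
			return;

		/* fall back to /dev/fuse read/write, with the lock re-taken */
		spin_lock(&fiq->lock);
	}

	list_add_tail(&req->list, &fiq->pending);
	fiq->ops->wake_pending_and_unlock(fiq);
}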
>> @@ -261,7 +277,7 @@ static void flush_bg_queue(struct fuse_conn *fc)
>>  		fc->active_background++;
>>  		spin_lock(&fiq->lock);
>>  		req->in.h.unique = fuse_get_unique(fiq);
>> -		queue_request_and_unlock(fiq, req);
>> +		queue_request_and_unlock(fc, req, true);
>>  	}
>>  }
>>  
>> @@ -405,7 +421,8 @@ static void request_wait_answer(struct fuse_req *req)
>>  
>>  static void __fuse_request_send(struct fuse_req *req)
>>  {
>> -	struct fuse_iqueue *fiq = &req->fm->fc->iq;
>> +	struct fuse_conn *fc = req->fm->fc;
>> +	struct fuse_iqueue *fiq = &fc->iq;
>>  
>>  	BUG_ON(test_bit(FR_BACKGROUND, &req->flags));
>>  	spin_lock(&fiq->lock);
>> @@ -417,7 +434,7 @@ static void __fuse_request_send(struct fuse_req *req)
>>  		/* acquire extra reference, since request is still needed
>>  		   after fuse_request_end() */
>>  		__fuse_get_request(req);
>> -		queue_request_and_unlock(fiq, req);
>> +		queue_request_and_unlock(fc, req, true);
>>  
>>  		request_wait_answer(req);
>>  		/* Pairs with smp_wmb() in fuse_request_end() */
>> @@ -487,6 +504,10 @@ ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
>>  	if (args->force) {
>>  		atomic_inc(&fc->num_waiting);
>>  		req = fuse_request_alloc(fm, GFP_KERNEL | __GFP_NOFAIL);
>> +		if (unlikely(!req)) {
>> +			ret = -ENOTCONN;
>> +			goto err;
>> +		}
> 
> This is extraneous, and not possible since we're doing __GFP_NOFAIL.
> 
>>  
>>  		if (!args->nocreds)
>>  			fuse_force_creds(req);
>> @@ -514,16 +535,55 @@ ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
>>  	}
>>  	fuse_put_request(req);
>>  
>> +err:
>>  	return ret;
>>  }
>>  
>> -static bool fuse_request_queue_background(struct fuse_req *req)
>> +static bool fuse_request_queue_background_uring(struct fuse_conn *fc,
>> +					       struct fuse_req *req)
>> +{
>> +	struct fuse_iqueue *fiq = &fc->iq;
>> +	int err;
>> +
>> +	req->in.h.unique = fuse_get_unique(fiq);
>> +	req->in.h.len = sizeof(struct fuse_in_header) +
>> +		fuse_len_args(req->args->in_numargs,
>> +			      (struct fuse_arg *) req->args->in_args);
>> +
>> +	err = fuse_uring_queue_fuse_req(fc, req);
>> +	if (!err) {
> 
> I'd rather
> 
> if (err)
> 	return false;
> 
> Then the rest of this code.
> 
> Also generally speaking I think you're correct, below isn't needed because the
> queues themselves have their own limits, so I think just delete this bit.
> 
>> +		/* XXX remove and lets the users of that use per queue values -
>> +		 * avoid the shared spin lock...
>> +		 * Is this needed at all?
>> +		 */
>> +		spin_lock(&fc->bg_lock);
>> +		fc->num_background++;
>> +		fc->active_background++;


I now actually think we still need it, because in the current version it
queues to queue->async_fuse_req_queue / queue->sync_fuse_req_queue.

>> +
>> +
>> +		/* XXX block when per ring queues get occupied */
>> +		if (fc->num_background == fc->max_background)
>> +			fc->blocked = 1;

I need to double check again, but I think I can just remove both XXX.

I also just noticed an issue with fc->active_background: fuse_request_end()
decreases it unconditionally, but with uring we never increase it (and
don't need to). I think I need an FR_URING flag; a rough sketch of how
this could fit together follows after the quoted hunk.


>> +		spin_unlock(&fc->bg_lock);
>> +	}
>> +
>> +	return err ? false : true;
>> +}
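
Putting the comments above together, the helper could end up roughly
like the sketch below: early return as Josef suggested, the XXX comments
dropped, the num_background accounting kept. FR_URING is the flag that
does not exist yet; here it would let fuse_request_end() skip the
active_background decrement:

static bool fuse_request_queue_background_uring(struct fuse_conn *fc,
						struct fuse_req *req)
{
	struct fuse_iqueue *fiq = &fc->iq;

	req->in.h.unique = fuse_get_unique(fiq);
	req->in.h.len = sizeof(struct fuse_in_header) +
		fuse_len_args(req->args->in_numargs,
			      (struct fuse_arg *) req->args->in_args);

	/* hypothetical flag: mark the request as uring-handled before it
	 * can complete, so fuse_request_end() knows active_background was
	 * never incremented for it */
	__set_bit(FR_URING, &req->flags);

	if (fuse_uring_queue_fuse_req(fc, req))
		return false;

	spin_lock(&fc->bg_lock);
	fc->num_background++;
	if (fc->num_background == fc->max_background)
		fc->blocked = 1;
	spin_unlock(&fc->bg_lock);

	return true;
}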
>> +
>> +/*
>> + * @return true if queued
>> + */
>> +static int fuse_request_queue_background(struct fuse_req *req)
>>  {
>>  	struct fuse_mount *fm = req->fm;
>>  	struct fuse_conn *fc = fm->fc;
>>  	bool queued = false;
>>  
>>  	WARN_ON(!test_bit(FR_BACKGROUND, &req->flags));
>> +
>> +	if (fuse_uring_ready(fc))
>> +		return fuse_request_queue_background_uring(fc, req);
>> +
>>  	if (!test_bit(FR_WAITING, &req->flags)) {
>>  		__set_bit(FR_WAITING, &req->flags);
>>  		atomic_inc(&fc->num_waiting);
>> @@ -576,7 +636,8 @@ static int fuse_simple_notify_reply(struct fuse_mount *fm,
>>  				    struct fuse_args *args, u64 unique)
>>  {
>>  	struct fuse_req *req;
>> -	struct fuse_iqueue *fiq = &fm->fc->iq;
>> +	struct fuse_conn *fc = fm->fc;
>> +	struct fuse_iqueue *fiq = &fc->iq;
>>  	int err = 0;
>>  
>>  	req = fuse_get_req(fm, false);
>> @@ -590,7 +651,8 @@ static int fuse_simple_notify_reply(struct fuse_mount *fm,
>>  
>>  	spin_lock(&fiq->lock);
>>  	if (fiq->connected) {
>> -		queue_request_and_unlock(fiq, req);
>> +		/* uring for notify not supported yet */
>> +		queue_request_and_unlock(fc, req, false);
>>  	} else {
>>  		err = -ENODEV;
>>  		spin_unlock(&fiq->lock);
>> @@ -2205,6 +2267,7 @@ void fuse_abort_conn(struct fuse_conn *fc)
>>  		fuse_uring_set_stopped(fc);
>>  
>>  		fuse_set_initialized(fc);
>> +
> 
> Extraneous newline.
> 
>>  		list_for_each_entry(fud, &fc->devices, entry) {
>>  			struct fuse_pqueue *fpq = &fud->pq;
>>  
>> @@ -2478,6 +2541,7 @@ static long fuse_uring_ioctl(struct file *file, __u32 __user *argp)
>>  		if (res != 0)
>>  			return res;
>>  		break;
>> +
> 
> Extraneous newline.
> 

Sorry, these two slipped through.

>>  		case FUSE_URING_IOCTL_CMD_QUEUE_CFG:
>>  			fud->uring_dev = 1;
>>  			res = fuse_uring_queue_cfg(fc->ring, &cfg.qconf);
>> diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
>> index 6001ba4d6e82..fe80e66150c3 100644
>> --- a/fs/fuse/dev_uring.c
>> +++ b/fs/fuse/dev_uring.c
>> @@ -32,8 +32,7 @@
>>  #include <linux/io_uring/cmd.h>
>>  
>>  static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
>> -					    bool set_err, int error,
>> -					    unsigned int issue_flags);
>> +					    bool set_err, int error);
>>  
>>  static void fuse_ring_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
>>  {
>> @@ -683,8 +682,7 @@ static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
>>   * userspace will read it
>>   * This is comparable with classical read(/dev/fuse)
>>   */
>> -static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
>> -				    unsigned int issue_flags, bool send_in_task)
>> +static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent)
>>  {
>>  	struct fuse_ring *ring = ring_ent->queue->ring;
>>  	struct fuse_ring_req *rreq = ring_ent->rreq;
>> @@ -721,20 +719,17 @@ static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
>>  	rreq->in = req->in.h;
>>  	set_bit(FR_SENT, &req->flags);
>>  
>> -	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu issue_flags=%u\n",
>> +	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu\n",
>>  		 __func__, ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
>> -		 rreq->in.opcode, rreq->in.unique, issue_flags);
>> +		 rreq->in.opcode, rreq->in.unique);
>>  
>> -	if (send_in_task)
>> -		io_uring_cmd_complete_in_task(ring_ent->cmd,
>> -					      fuse_uring_async_send_to_ring);
>> -	else
>> -		io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
>> +	io_uring_cmd_complete_in_task(ring_ent->cmd,
>> +				      fuse_uring_async_send_to_ring);

Oops, something went wrong here: the previous patch introduced the
"if (send_in_task)" branch, and this removal actually belongs to a
later patch.

>>  
>>  	return;
>>  
>>  err:
>> -	fuse_uring_req_end_and_get_next(ring_ent, true, err, issue_flags);
>> +	fuse_uring_req_end_and_get_next(ring_ent, true, err);
>>  }
>>  
>>  /*
>> @@ -811,8 +806,7 @@ static bool fuse_uring_ent_release_and_fetch(struct fuse_ring_ent *ring_ent)
>>   * has lock/unlock/lock to avoid holding the lock on calling fuse_request_end
>>   */
>>  static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
>> -					    bool set_err, int error,
>> -					    unsigned int issue_flags)
>> +					    bool set_err, int error)
>>  {
>>  	struct fuse_req *req = ring_ent->fuse_req;
>>  	int has_next;
>> @@ -828,7 +822,7 @@ static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
>>  	has_next = fuse_uring_ent_release_and_fetch(ring_ent);
>>  	if (has_next) {
>>  		/* called within uring context - use provided flags */
>> -		fuse_uring_send_to_ring(ring_ent, issue_flags, false);
>> +		fuse_uring_send_to_ring(ring_ent);
>>  	}
>>  }
>>  
>> @@ -863,7 +857,7 @@ static void fuse_uring_commit_and_release(struct fuse_dev *fud,
>>  out:
>>  	pr_devel("%s:%d ret=%zd op=%d req-ret=%d\n", __func__, __LINE__, err,
>>  		 req->args->opcode, req->out.h.error);
>> -	fuse_uring_req_end_and_get_next(ring_ent, set_err, err, issue_flags);
>> +	fuse_uring_req_end_and_get_next(ring_ent, set_err, err);
>>  }
>>  
>>  /*
>> @@ -1101,3 +1095,69 @@ int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
>>  	goto out;
>>  }
>>  
>> +int fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req)
>> +{
>> +	struct fuse_ring *ring = fc->ring;
>> +	struct fuse_ring_queue *queue;
>> +	int qid = 0;
>> +	struct fuse_ring_ent *ring_ent = NULL;
>> +	int res;
>> +	bool async = test_bit(FR_BACKGROUND, &req->flags);
>> +	struct list_head *req_queue, *ent_queue;
>> +
>> +	if (ring->per_core_queue) {
>> +		/*
>> +		 * async requests are best handled on another core, the current
>> +		 * core can do application/page handling, while the async request
>> +		 * is handled on another core in userspace.
>> +		 * For sync request the application has to wait - no processing, so
>> +		 * the request should continue on the current core and avoid context
>> +		 * switches.
>> +		 * XXX This should be on the same numa node and not busy - is there
>> +		 * a scheduler function available  that could make this decision?
>> +		 * It should also not persistently switch between cores - makes
>> +		 * it hard for the scheduler.
>> +		 */
>> +		qid = task_cpu(current);
>> +
>> +		if (unlikely(qid >= ring->nr_queues)) {
>> +			WARN_ONCE(1,
>> +				  "Core number (%u) exceeds nr ueues (%zu)\n",
>> +				  qid, ring->nr_queues);
>> +			qid = 0;
>> +		}
>> +	}
>> +
>> +	queue = fuse_uring_get_queue(ring, qid);
>> +	req_queue = async ? &queue->async_fuse_req_queue :
>> +			    &queue->sync_fuse_req_queue;
>> +	ent_queue = async ? &queue->async_ent_avail_queue :
>> +			    &queue->sync_ent_avail_queue;
>> +
>> +	spin_lock(&queue->lock);
>> +
>> +	if (unlikely(queue->stopped)) {
>> +		res = -ENOTCONN;
>> +		goto err_unlock;
> 
> This is the only place we use err_unlock, just do
> 
> if (unlikely(queue->stopped)) {
> 	spin_unlock(&queue->lock);
> 	return -ENOTCONN;
> }
> 
> and then you can get rid of res.  Thanks,


Thanks, will do.
(I personally typically avoid unlock/return in the middle of a function
as one can easily miss the unlock with new code additions - I have bad
experience with that).


Thanks,
Bernd

Patch

diff --git a/fs/fuse/dev.c b/fs/fuse/dev.c
index 6ffd216b27c8..c7fd3849a105 100644
--- a/fs/fuse/dev.c
+++ b/fs/fuse/dev.c
@@ -218,13 +218,29 @@  const struct fuse_iqueue_ops fuse_dev_fiq_ops = {
 };
 EXPORT_SYMBOL_GPL(fuse_dev_fiq_ops);
 
-static void queue_request_and_unlock(struct fuse_iqueue *fiq,
-				     struct fuse_req *req)
+
+static void queue_request_and_unlock(struct fuse_conn *fc,
+				     struct fuse_req *req, bool allow_uring)
 __releases(fiq->lock)
 {
+	struct fuse_iqueue *fiq = &fc->iq;
+
 	req->in.h.len = sizeof(struct fuse_in_header) +
 		fuse_len_args(req->args->in_numargs,
 			      (struct fuse_arg *) req->args->in_args);
+
+	if (allow_uring && fuse_uring_ready(fc)) {
+		int res;
+
+		/* this lock is not needed at all for ring req handling */
+		spin_unlock(&fiq->lock);
+		res = fuse_uring_queue_fuse_req(fc, req);
+		if (!res)
+			return;
+
+		/* fallthrough, handled through /dev/fuse read/write */
+	}
+
 	list_add_tail(&req->list, &fiq->pending);
 	fiq->ops->wake_pending_and_unlock(fiq);
 }
@@ -261,7 +277,7 @@  static void flush_bg_queue(struct fuse_conn *fc)
 		fc->active_background++;
 		spin_lock(&fiq->lock);
 		req->in.h.unique = fuse_get_unique(fiq);
-		queue_request_and_unlock(fiq, req);
+		queue_request_and_unlock(fc, req, true);
 	}
 }
 
@@ -405,7 +421,8 @@  static void request_wait_answer(struct fuse_req *req)
 
 static void __fuse_request_send(struct fuse_req *req)
 {
-	struct fuse_iqueue *fiq = &req->fm->fc->iq;
+	struct fuse_conn *fc = req->fm->fc;
+	struct fuse_iqueue *fiq = &fc->iq;
 
 	BUG_ON(test_bit(FR_BACKGROUND, &req->flags));
 	spin_lock(&fiq->lock);
@@ -417,7 +434,7 @@  static void __fuse_request_send(struct fuse_req *req)
 		/* acquire extra reference, since request is still needed
 		   after fuse_request_end() */
 		__fuse_get_request(req);
-		queue_request_and_unlock(fiq, req);
+		queue_request_and_unlock(fc, req, true);
 
 		request_wait_answer(req);
 		/* Pairs with smp_wmb() in fuse_request_end() */
@@ -487,6 +504,10 @@  ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
 	if (args->force) {
 		atomic_inc(&fc->num_waiting);
 		req = fuse_request_alloc(fm, GFP_KERNEL | __GFP_NOFAIL);
+		if (unlikely(!req)) {
+			ret = -ENOTCONN;
+			goto err;
+		}
 
 		if (!args->nocreds)
 			fuse_force_creds(req);
@@ -514,16 +535,55 @@  ssize_t fuse_simple_request(struct fuse_mount *fm, struct fuse_args *args)
 	}
 	fuse_put_request(req);
 
+err:
 	return ret;
 }
 
-static bool fuse_request_queue_background(struct fuse_req *req)
+static bool fuse_request_queue_background_uring(struct fuse_conn *fc,
+					       struct fuse_req *req)
+{
+	struct fuse_iqueue *fiq = &fc->iq;
+	int err;
+
+	req->in.h.unique = fuse_get_unique(fiq);
+	req->in.h.len = sizeof(struct fuse_in_header) +
+		fuse_len_args(req->args->in_numargs,
+			      (struct fuse_arg *) req->args->in_args);
+
+	err = fuse_uring_queue_fuse_req(fc, req);
+	if (!err) {
+		/* XXX remove and lets the users of that use per queue values -
+		 * avoid the shared spin lock...
+		 * Is this needed at all?
+		 */
+		spin_lock(&fc->bg_lock);
+		fc->num_background++;
+		fc->active_background++;
+
+
+		/* XXX block when per ring queues get occupied */
+		if (fc->num_background == fc->max_background)
+			fc->blocked = 1;
+		spin_unlock(&fc->bg_lock);
+	}
+
+	return err ? false : true;
+}
+
+/*
+ * @return true if queued
+ */
+static int fuse_request_queue_background(struct fuse_req *req)
 {
 	struct fuse_mount *fm = req->fm;
 	struct fuse_conn *fc = fm->fc;
 	bool queued = false;
 
 	WARN_ON(!test_bit(FR_BACKGROUND, &req->flags));
+
+	if (fuse_uring_ready(fc))
+		return fuse_request_queue_background_uring(fc, req);
+
 	if (!test_bit(FR_WAITING, &req->flags)) {
 		__set_bit(FR_WAITING, &req->flags);
 		atomic_inc(&fc->num_waiting);
@@ -576,7 +636,8 @@  static int fuse_simple_notify_reply(struct fuse_mount *fm,
 				    struct fuse_args *args, u64 unique)
 {
 	struct fuse_req *req;
-	struct fuse_iqueue *fiq = &fm->fc->iq;
+	struct fuse_conn *fc = fm->fc;
+	struct fuse_iqueue *fiq = &fc->iq;
 	int err = 0;
 
 	req = fuse_get_req(fm, false);
@@ -590,7 +651,8 @@  static int fuse_simple_notify_reply(struct fuse_mount *fm,
 
 	spin_lock(&fiq->lock);
 	if (fiq->connected) {
-		queue_request_and_unlock(fiq, req);
+		/* uring for notify not supported yet */
+		queue_request_and_unlock(fc, req, false);
 	} else {
 		err = -ENODEV;
 		spin_unlock(&fiq->lock);
@@ -2205,6 +2267,7 @@  void fuse_abort_conn(struct fuse_conn *fc)
 		fuse_uring_set_stopped(fc);
 
 		fuse_set_initialized(fc);
+
 		list_for_each_entry(fud, &fc->devices, entry) {
 			struct fuse_pqueue *fpq = &fud->pq;
 
@@ -2478,6 +2541,7 @@  static long fuse_uring_ioctl(struct file *file, __u32 __user *argp)
 		if (res != 0)
 			return res;
 		break;
+
 		case FUSE_URING_IOCTL_CMD_QUEUE_CFG:
 			fud->uring_dev = 1;
 			res = fuse_uring_queue_cfg(fc->ring, &cfg.qconf);
diff --git a/fs/fuse/dev_uring.c b/fs/fuse/dev_uring.c
index 6001ba4d6e82..fe80e66150c3 100644
--- a/fs/fuse/dev_uring.c
+++ b/fs/fuse/dev_uring.c
@@ -32,8 +32,7 @@ 
 #include <linux/io_uring/cmd.h>
 
 static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
-					    bool set_err, int error,
-					    unsigned int issue_flags);
+					    bool set_err, int error);
 
 static void fuse_ring_ring_ent_unset_userspace(struct fuse_ring_ent *ent)
 {
@@ -683,8 +682,7 @@  static int fuse_uring_copy_to_ring(struct fuse_ring *ring, struct fuse_req *req,
  * userspace will read it
  * This is comparable with classical read(/dev/fuse)
  */
-static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
-				    unsigned int issue_flags, bool send_in_task)
+static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent)
 {
 	struct fuse_ring *ring = ring_ent->queue->ring;
 	struct fuse_ring_req *rreq = ring_ent->rreq;
@@ -721,20 +719,17 @@  static void fuse_uring_send_to_ring(struct fuse_ring_ent *ring_ent,
 	rreq->in = req->in.h;
 	set_bit(FR_SENT, &req->flags);
 
-	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu issue_flags=%u\n",
+	pr_devel("%s qid=%d tag=%d state=%lu cmd-done op=%d unique=%llu\n",
 		 __func__, ring_ent->queue->qid, ring_ent->tag, ring_ent->state,
-		 rreq->in.opcode, rreq->in.unique, issue_flags);
+		 rreq->in.opcode, rreq->in.unique);
 
-	if (send_in_task)
-		io_uring_cmd_complete_in_task(ring_ent->cmd,
-					      fuse_uring_async_send_to_ring);
-	else
-		io_uring_cmd_done(ring_ent->cmd, 0, 0, issue_flags);
+	io_uring_cmd_complete_in_task(ring_ent->cmd,
+				      fuse_uring_async_send_to_ring);
 
 	return;
 
 err:
-	fuse_uring_req_end_and_get_next(ring_ent, true, err, issue_flags);
+	fuse_uring_req_end_and_get_next(ring_ent, true, err);
 }
 
 /*
@@ -811,8 +806,7 @@  static bool fuse_uring_ent_release_and_fetch(struct fuse_ring_ent *ring_ent)
  * has lock/unlock/lock to avoid holding the lock on calling fuse_request_end
  */
 static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
-					    bool set_err, int error,
-					    unsigned int issue_flags)
+					    bool set_err, int error)
 {
 	struct fuse_req *req = ring_ent->fuse_req;
 	int has_next;
@@ -828,7 +822,7 @@  static void fuse_uring_req_end_and_get_next(struct fuse_ring_ent *ring_ent,
 	has_next = fuse_uring_ent_release_and_fetch(ring_ent);
 	if (has_next) {
 		/* called within uring context - use provided flags */
-		fuse_uring_send_to_ring(ring_ent, issue_flags, false);
+		fuse_uring_send_to_ring(ring_ent);
 	}
 }
 
@@ -863,7 +857,7 @@  static void fuse_uring_commit_and_release(struct fuse_dev *fud,
 out:
 	pr_devel("%s:%d ret=%zd op=%d req-ret=%d\n", __func__, __LINE__, err,
 		 req->args->opcode, req->out.h.error);
-	fuse_uring_req_end_and_get_next(ring_ent, set_err, err, issue_flags);
+	fuse_uring_req_end_and_get_next(ring_ent, set_err, err);
 }
 
 /*
@@ -1101,3 +1095,69 @@  int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
 	goto out;
 }
 
+int fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req)
+{
+	struct fuse_ring *ring = fc->ring;
+	struct fuse_ring_queue *queue;
+	int qid = 0;
+	struct fuse_ring_ent *ring_ent = NULL;
+	int res;
+	bool async = test_bit(FR_BACKGROUND, &req->flags);
+	struct list_head *req_queue, *ent_queue;
+
+	if (ring->per_core_queue) {
+		/*
+		 * async requests are best handled on another core, the current
+		 * core can do application/page handling, while the async request
+		 * is handled on another core in userspace.
+		 * For sync request the application has to wait - no processing, so
+		 * the request should continue on the current core and avoid context
+		 * switches.
+		 * XXX This should be on the same numa node and not busy - is there
+		 * a scheduler function available  that could make this decision?
+		 * It should also not persistently switch between cores - makes
+		 * it hard for the scheduler.
+		 */
+		qid = task_cpu(current);
+
+		if (unlikely(qid >= ring->nr_queues)) {
+			WARN_ONCE(1,
+				  "Core number (%u) exceeds nr ueues (%zu)\n",
+				  qid, ring->nr_queues);
+			qid = 0;
+		}
+	}
+
+	queue = fuse_uring_get_queue(ring, qid);
+	req_queue = async ? &queue->async_fuse_req_queue :
+			    &queue->sync_fuse_req_queue;
+	ent_queue = async ? &queue->async_ent_avail_queue :
+			    &queue->sync_ent_avail_queue;
+
+	spin_lock(&queue->lock);
+
+	if (unlikely(queue->stopped)) {
+		res = -ENOTCONN;
+		goto err_unlock;
+	}
+
+	if (list_empty(ent_queue)) {
+		list_add_tail(&req->list, req_queue);
+	} else {
+		ring_ent =
+			list_first_entry(ent_queue, struct fuse_ring_ent, list);
+		list_del(&ring_ent->list);
+		fuse_uring_add_req_to_ring_ent(ring_ent, req);
+	}
+	spin_unlock(&queue->lock);
+
+	if (ring_ent != NULL)
+		fuse_uring_send_to_ring(ring_ent);
+
+	return 0;
+
+err_unlock:
+	spin_unlock(&queue->lock);
+	return res;
+}
+
diff --git a/fs/fuse/dev_uring_i.h b/fs/fuse/dev_uring_i.h
index e5fc84e2f3ea..5d7e1e6e7a82 100644
--- a/fs/fuse/dev_uring_i.h
+++ b/fs/fuse/dev_uring_i.h
@@ -208,6 +208,7 @@  int fuse_uring_queue_cfg(struct fuse_ring *ring,
 void fuse_uring_ring_destruct(struct fuse_ring *ring);
 void fuse_uring_stop_queues(struct fuse_ring *ring);
 int fuse_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags);
+int fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req);
 
 static inline void fuse_uring_conn_init(struct fuse_ring *ring,
 					struct fuse_conn *fc)
@@ -331,6 +332,11 @@  static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
 			   atomic_read(&ring->queue_refs) == 0);
 }
 
+static inline bool fuse_uring_ready(struct fuse_conn *fc)
+{
+	return fc->ring && fc->ring->ready;
+}
+
 #else /* CONFIG_FUSE_IO_URING */
 
 struct fuse_ring;
@@ -366,6 +372,17 @@  static inline void fuse_uring_wait_stopped_queues(struct fuse_conn *fc)
 {
 }
 
+static inline bool fuse_uring_ready(struct fuse_conn *fc)
+{
+	return false;
+}
+
+static inline int
+fuse_uring_queue_fuse_req(struct fuse_conn *fc, struct fuse_req *req)
+{
+	return -EPFNOSUPPORT;
+}
+
 #endif /* CONFIG_FUSE_IO_URING */
 
 #endif /* _FS_FUSE_DEV_URING_I_H */