
[RFC,3/3] drivers: spi: spi.c: Don't use the message queue if possible in spi_sync

Message ID 20220525142928.2335378-4-david@protonic.nl (mailing list archive)
State Superseded
Series Optimize spi_sync path

Commit Message

David Jander May 25, 2022, 2:29 p.m. UTC
The interaction with the controller message queue and its corresponding
auxiliary flags and variables requires the use of the queue_lock which is
costly. Since spi_sync will transfer the complete message anyway, and not
return until it is finished, there is no need to put the message into the
queue if the queue is empty. This can save a lot of overhead.

As an example of how significant this is: when using the MCP2518FD SPI CAN
controller on an i.MX8MM SoC, the time during which the interrupt line
stays active (during 3 relatively short spi_sync messages) is reduced
from 98us to 72us by this patch.

Signed-off-by: David Jander <david@protonic.nl>
---
 drivers/spi/spi.c       | 244 ++++++++++++++++++++++++----------------
 include/linux/spi/spi.h |  11 +-
 2 files changed, 157 insertions(+), 98 deletions(-)

Comments

David Jander May 25, 2022, 2:46 p.m. UTC | #1
On Wed, 25 May 2022 16:29:28 +0200
David Jander <david@protonic.nl> wrote:

> The interaction with the controller message queue and its corresponding
> auxiliary flags and variables requires the use of the queue_lock which is
> costly. Since spi_sync will transfer the complete message anyway, and not
> return until it is finished, there is no need to put the message into the
> queue if the queue is empty. This can save a lot of overhead.
> 
> As an example of how significant this is, when using the MCP2518FD SPI CAN
> controller on a i.MX8MM SoC, the time during which the interrupt line
> stays active (during 3 relatively short spi_sync messages), is reduced
> from 98us to 72us by this patch.
> 
> Signed-off-by: David Jander <david@protonic.nl>
> ---
>  drivers/spi/spi.c       | 244 ++++++++++++++++++++++++----------------
>  include/linux/spi/spi.h |  11 +-
>  2 files changed, 157 insertions(+), 98 deletions(-)
> 
> diff --git a/drivers/spi/spi.c b/drivers/spi/spi.c
> index 1d50051f3d57..401ac2f4758e 100644
> --- a/drivers/spi/spi.c
> +++ b/drivers/spi/spi.c
> @@ -1558,6 +1558,80 @@ static void spi_idle_runtime_pm(struct spi_controller *ctlr)
>  	}
>  }
>  
> +static int __spi_pump_transfer_message(struct spi_controller *ctlr,
> +		struct spi_message *msg, bool was_busy)
> +{
> +	struct spi_transfer *xfer;
> +	int ret;
> +
> +	if (!was_busy && ctlr->auto_runtime_pm) {
> +		ret = pm_runtime_get_sync(ctlr->dev.parent);
> +		if (ret < 0) {
> +			pm_runtime_put_noidle(ctlr->dev.parent);
> +			dev_err(&ctlr->dev, "Failed to power device: %d\n",
> +				ret);
> +			return ret;
> +		}
> +	}
> +
> +	if (!was_busy)
> +		trace_spi_controller_busy(ctlr);
> +
> +	if (!was_busy && ctlr->prepare_transfer_hardware) {
> +		ret = ctlr->prepare_transfer_hardware(ctlr);
> +		if (ret) {
> +			dev_err(&ctlr->dev,
> +				"failed to prepare transfer hardware: %d\n",
> +				ret);
> +
> +			if (ctlr->auto_runtime_pm)
> +				pm_runtime_put(ctlr->dev.parent);
> +
> +			msg->status = ret;
> +			spi_finalize_message(msg);
> +
> +			return ret;
> +		}
> +	}
> +
> +	trace_spi_message_start(msg);
> +
> +	if (ctlr->prepare_message) {
> +		ret = ctlr->prepare_message(ctlr, msg);
> +		if (ret) {
> +			dev_err(&ctlr->dev, "failed to prepare message: %d\n",
> +				ret);
> +			msg->status = ret;
> +			spi_finalize_message(msg);
> +			return ret;
> +		}
> +		msg->prepared = true;
> +	}
> +
> +	ret = spi_map_msg(ctlr, msg);
> +	if (ret) {
> +		msg->status = ret;
> +		spi_finalize_message(msg);
> +		return ret;
> +	}
> +
> +	if (!ctlr->ptp_sts_supported && !ctlr->transfer_one) {
> +		list_for_each_entry(xfer, &msg->transfers, transfer_list) {
> +			xfer->ptp_sts_word_pre = 0;
> +			ptp_read_system_prets(xfer->ptp_sts);
> +		}
> +	}
> +
> +	ret = ctlr->transfer_one_message(ctlr, msg);
> +	if (ret) {
> +		dev_err(&ctlr->dev,
> +			"failed to transfer one message from queue\n");
> +		return ret;
> +	}
> +
> +	return 0;
> +}
> +
>  /**
>   * __spi_pump_messages - function which processes spi message queue
>   * @ctlr: controller to process queue for
> @@ -1573,7 +1647,6 @@ static void spi_idle_runtime_pm(struct spi_controller *ctlr)
>   */
>  static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
>  {
> -	struct spi_transfer *xfer;
>  	struct spi_message *msg;
>  	bool was_busy = false;
>  	unsigned long flags;
> @@ -1608,6 +1681,7 @@ static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
>  			    !ctlr->unprepare_transfer_hardware) {
>  				spi_idle_runtime_pm(ctlr);
>  				ctlr->busy = false;
> +				ctlr->queue_empty = true;
>  				trace_spi_controller_idle(ctlr);
>  			} else {
>  				kthread_queue_work(ctlr->kworker,
> @@ -1634,6 +1708,7 @@ static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
>  
>  		spin_lock_irqsave(&ctlr->queue_lock, flags);
>  		ctlr->idling = false;
> +		ctlr->queue_empty = true;
>  		spin_unlock_irqrestore(&ctlr->queue_lock, flags);
>  		return;
>  	}
> @@ -1650,75 +1725,7 @@ static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
>  	spin_unlock_irqrestore(&ctlr->queue_lock, flags);
>  
>  	mutex_lock(&ctlr->io_mutex);
> -
> -	if (!was_busy && ctlr->auto_runtime_pm) {
> -		ret = pm_runtime_get_sync(ctlr->dev.parent);
> -		if (ret < 0) {
> -			pm_runtime_put_noidle(ctlr->dev.parent);
> -			dev_err(&ctlr->dev, "Failed to power device: %d\n",
> -				ret);
> -			mutex_unlock(&ctlr->io_mutex);
> -			return;
> -		}
> -	}
> -
> -	if (!was_busy)
> -		trace_spi_controller_busy(ctlr);
> -
> -	if (!was_busy && ctlr->prepare_transfer_hardware) {
> -		ret = ctlr->prepare_transfer_hardware(ctlr);
> -		if (ret) {
> -			dev_err(&ctlr->dev,
> -				"failed to prepare transfer hardware: %d\n",
> -				ret);
> -
> -			if (ctlr->auto_runtime_pm)
> -				pm_runtime_put(ctlr->dev.parent);
> -
> -			msg->status = ret;
> -			spi_finalize_message(msg);
> -
> -			mutex_unlock(&ctlr->io_mutex);
> -			return;
> -		}
> -	}
> -
> -	trace_spi_message_start(msg);
> -
> -	if (ctlr->prepare_message) {
> -		ret = ctlr->prepare_message(ctlr, msg);
> -		if (ret) {
> -			dev_err(&ctlr->dev, "failed to prepare message: %d\n",
> -				ret);
> -			msg->status = ret;
> -			spi_finalize_message(msg);
> -			goto out;
> -		}
> -		msg->prepared = true;
> -	}
> -
> -	ret = spi_map_msg(ctlr, msg);
> -	if (ret) {
> -		msg->status = ret;
> -		spi_finalize_message(msg);
> -		goto out;
> -	}
> -
> -	if (!ctlr->ptp_sts_supported && !ctlr->transfer_one) {
> -		list_for_each_entry(xfer, &msg->transfers, transfer_list) {
> -			xfer->ptp_sts_word_pre = 0;
> -			ptp_read_system_prets(xfer->ptp_sts);
> -		}
> -	}
> -
> -	ret = ctlr->transfer_one_message(ctlr, msg);
> -	if (ret) {
> -		dev_err(&ctlr->dev,
> -			"failed to transfer one message from queue\n");
> -		goto out;
> -	}
> -
> -out:
> +	ret = __spi_pump_transfer_message(ctlr, msg, was_busy);
>  	mutex_unlock(&ctlr->io_mutex);
>  
>  	/* Prod the scheduler in case transfer_one() was busy waiting */
> @@ -1848,6 +1855,7 @@ static int spi_init_queue(struct spi_controller *ctlr)
>  {
>  	ctlr->running = false;
>  	ctlr->busy = false;
> +	ctlr->queue_empty = true;
>  
>  	ctlr->kworker = kthread_create_worker(0, dev_name(&ctlr->dev));
>  	if (IS_ERR(ctlr->kworker)) {
> @@ -1941,11 +1949,15 @@ void spi_finalize_message(struct spi_message *mesg)
>  
>  	mesg->prepared = false;
>  
> -	spin_lock_irqsave(&ctlr->queue_lock, flags);
> -	ctlr->cur_msg = NULL;
> -	ctlr->fallback = false;
> -	kthread_queue_work(ctlr->kworker, &ctlr->pump_messages);
> -	spin_unlock_irqrestore(&ctlr->queue_lock, flags);
> +	if (!mesg->sync) {
> +		spin_lock_irqsave(&ctlr->queue_lock, flags);
> +		WARN(ctlr->cur_msg != mesg,
> +			"Finalizing queued message that is not the current head of queue!");
> +		ctlr->cur_msg = NULL;
> +		ctlr->fallback = false;
> +		kthread_queue_work(ctlr->kworker, &ctlr->pump_messages);
> +		spin_unlock_irqrestore(&ctlr->queue_lock, flags);
> +	}
>  
>  	trace_spi_message_done(mesg);
>  
> @@ -2048,6 +2060,7 @@ static int __spi_queued_transfer(struct spi_device *spi,
>  	msg->status = -EINPROGRESS;
>  
>  	list_add_tail(&msg->queue, &ctlr->queue);
> +	ctlr->queue_empty = false;
>  	if (!ctlr->busy && need_pump)
>  		kthread_queue_work(ctlr->kworker, &ctlr->pump_messages);
>  
> @@ -3937,6 +3950,42 @@ static int spi_async_locked(struct spi_device *spi, struct spi_message *message)
>  
>  }
>  
> +static void __spi_transfer_message_noqueue(struct spi_controller *ctlr, struct spi_message *msg)
> +{
> +	bool was_busy;
> +	int ret;
> +
> +	mutex_lock(&ctlr->io_mutex);
> +
> +	/* If another context is idling the device then wait */
> +	while (ctlr->idling) {
> +		printk(KERN_INFO "spi sync message processing: controller is idling!\n");
> +		usleep_range(10000, 11000);
> +	}

This is dead ugly of course, and it needs to be removed. Not yet sure how,
hence the RFC. Maybe the idle -> not busy transition can be included inside
the io_mutex? That way this while loop will never be hit and can be removed...

Best regards,

> +	was_busy = READ_ONCE(ctlr->busy);
> +
> +	ret = __spi_pump_transfer_message(ctlr, msg, was_busy);
> +	if (ret)
> +		goto out;
> +
> +	if (!was_busy) {
> +		kfree(ctlr->dummy_rx);
> +		ctlr->dummy_rx = NULL;
> +		kfree(ctlr->dummy_tx);
> +		ctlr->dummy_tx = NULL;
> +		if (ctlr->unprepare_transfer_hardware &&
> +		    ctlr->unprepare_transfer_hardware(ctlr))
> +			dev_err(&ctlr->dev,
> +				"failed to unprepare transfer hardware\n");
> +		spi_idle_runtime_pm(ctlr);
> +	}
> +
> +out:
> +	mutex_unlock(&ctlr->io_mutex);
> +	return;
> +}
> +
>  /*-------------------------------------------------------------------------*/
>  
>  /*
> @@ -3955,51 +4004,52 @@ static int __spi_sync(struct spi_device *spi, struct spi_message *message)
>  	DECLARE_COMPLETION_ONSTACK(done);
>  	int status;
>  	struct spi_controller *ctlr = spi->controller;
> -	unsigned long flags;
>  
>  	status = __spi_validate(spi, message);
>  	if (status != 0)
>  		return status;
>  
> -	message->complete = spi_complete;
> -	message->context = &done;
>  	message->spi = spi;
>  
>  	SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics, spi_sync);
>  	SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics, spi_sync);
>  
>  	/*
> -	 * If we're not using the legacy transfer method then we will
> -	 * try to transfer in the calling context so special case.
> -	 * This code would be less tricky if we could remove the
> -	 * support for driver implemented message queues.
> +	 * Checking queue_empty here only guarantees async/sync message
> +	 * ordering when coming from the same context. It does not need to
> +	 * guard against reentrancy from a different context. The io_mutex
> +	 * will catch those cases.
>  	 */
> -	if (ctlr->transfer == spi_queued_transfer) {
> -		spin_lock_irqsave(&ctlr->bus_lock_spinlock, flags);
> +	if (READ_ONCE(ctlr->queue_empty)) {
> +		message->sync = true;
> +		message->actual_length = 0;
> +		message->status = -EINPROGRESS;
>  
>  		trace_spi_message_submit(message);
>  
> -		status = __spi_queued_transfer(spi, message, false);
> +		SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics, spi_sync_immediate);
> +		SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics, spi_sync_immediate);
>  
> -		spin_unlock_irqrestore(&ctlr->bus_lock_spinlock, flags);
> -	} else {
> -		status = spi_async_locked(spi, message);
> +		__spi_transfer_message_noqueue(ctlr, message);
> +
> +		return message->status;
>  	}
>  
> +	/*
> +	 * There are messages in the async queue that could have originated
> +	 * from the same context, so we need to preserve ordering.
> +	 * Therefor we send the message to the async queue and wait until they
> +	 * are completed.
> +	 */
> +	message->complete = spi_complete;
> +	message->context = &done;
> +	status = spi_async_locked(spi, message);
>  	if (status == 0) {
> -		/* Push out the messages in the calling context if we can */
> -		if (ctlr->transfer == spi_queued_transfer) {
> -			SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics,
> -						       spi_sync_immediate);
> -			SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics,
> -						       spi_sync_immediate);
> -			__spi_pump_messages(ctlr, false);
> -		}
> -
>  		wait_for_completion(&done);
>  		status = message->status;
>  	}
>  	message->context = NULL;
> +
>  	return status;
>  }
>  
> diff --git a/include/linux/spi/spi.h b/include/linux/spi/spi.h
> index 43ec1e262913..9caed8537755 100644
> --- a/include/linux/spi/spi.h
> +++ b/include/linux/spi/spi.h
> @@ -449,6 +449,8 @@ extern struct spi_device *spi_new_ancillary_device(struct spi_device *spi, u8 ch
>   * @irq_flags: Interrupt enable state during PTP system timestamping
>   * @fallback: fallback to pio if dma transfer return failure with
>   *	SPI_TRANS_FAIL_NO_START.
> + * @queue_empty: signal green light for opportunistically skipping the queue
> + * 	for spi_sync transfers.
>   *
>   * Each SPI controller can communicate with one or more @spi_device
>   * children.  These make a small bus, sharing MOSI, MISO and SCK signals
> @@ -665,6 +667,9 @@ struct spi_controller {
>  
>  	/* Interrupt enable state during PTP system timestamping */
>  	unsigned long		irq_flags;
> +
> +	/* Flag for enabling opportunistic skipping of the queue in spi_sync */
> +	bool			queue_empty;
>  };
>  
>  static inline void *spi_controller_get_devdata(struct spi_controller *ctlr)
> @@ -974,6 +979,7 @@ struct spi_transfer {
>   * @state: for use by whichever driver currently owns the message
>   * @resources: for resource management when the spi message is processed
>   * @prepared: spi_prepare_message was called for the this message
> + * @sync: this message took the direct sync path skipping the async queue
>   *
>   * A @spi_message is used to execute an atomic sequence of data transfers,
>   * each represented by a struct spi_transfer.  The sequence is "atomic"
> @@ -1025,7 +1031,10 @@ struct spi_message {
>  	struct list_head        resources;
>  
>  	/* spi_prepare_message was called for this message */
> -	bool                    prepared;
> +	bool			prepared;
> +
> +	/* this message is skipping the async queue */
> +	bool			sync;
>  };
>  
>  static inline void spi_message_init_no_memset(struct spi_message *m)
Mark Brown June 7, 2022, 6:30 p.m. UTC | #2
On Wed, May 25, 2022 at 04:46:03PM +0200, David Jander wrote:
> David Jander <david@protonic.nl> wrote:

> > +static void __spi_transfer_message_noqueue(struct spi_controller *ctlr, struct spi_message *msg)
> > +{
> > +	bool was_busy;
> > +	int ret;
> > +
> > +	mutex_lock(&ctlr->io_mutex);
> > +
> > +	/* If another context is idling the device then wait */
> > +	while (ctlr->idling) {
> > +		printk(KERN_INFO "spi sync message processing: controller is idling!\n");
> > +		usleep_range(10000, 11000);
> > +	}

> This is dead ugly of course, and it needs to be removed. Not yet sure how,
> hence the RFC. Maybe the idle -> not busy transition can be included inside
> the io_mutex? That way this while will never be hit and can be removed...

I'm not sure it's even quite right from a safety point of view - idling
is protected by queue_lock but this now only takes io_mutex.  Moving
idling (and all the was_busy stuff) within the io_mutex would definitely
resolve the issue; the async submission context is the only one that
really needs the spinlock and it doesn't care about idling.  I can't
think what you could do with the io_mutex when idling, so it seems to
fit.
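
Roughly something like this at the top of __spi_pump_messages(), just to
illustrate (untested sketch; queue_lock handling simplified, and the
in_kthread/unprepare handling elided):

	mutex_lock(&ctlr->io_mutex);

	spin_lock_irqsave(&ctlr->queue_lock, flags);
	if (list_empty(&ctlr->queue) || !ctlr->running) {
		bool was_busy = ctlr->busy;

		ctlr->busy = false;
		ctlr->queue_empty = true;
		spin_unlock_irqrestore(&ctlr->queue_lock, flags);

		if (was_busy) {
			/*
			 * The busy -> idle transition now happens with the
			 * io_mutex held, so a concurrent sync transfer just
			 * blocks on the mutex instead of polling ->idling.
			 */
			kfree(ctlr->dummy_rx);
			ctlr->dummy_rx = NULL;
			kfree(ctlr->dummy_tx);
			ctlr->dummy_tx = NULL;
			spi_idle_runtime_pm(ctlr);
			trace_spi_controller_idle(ctlr);
		}

		mutex_unlock(&ctlr->io_mutex);
		return;
	}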
David Jander June 8, 2022, 7:54 a.m. UTC | #3
On Tue, 7 Jun 2022 19:30:55 +0100
Mark Brown <broonie@kernel.org> wrote:

> On Wed, May 25, 2022 at 04:46:03PM +0200, David Jander wrote:
> > David Jander <david@protonic.nl> wrote:  
> 
> > > +static void __spi_transfer_message_noqueue(struct spi_controller *ctlr, struct spi_message *msg)
> > > +{
> > > +	bool was_busy;
> > > +	int ret;
> > > +
> > > +	mutex_lock(&ctlr->io_mutex);
> > > +
> > > +	/* If another context is idling the device then wait */
> > > +	while (ctlr->idling) {
> > > +		printk(KERN_INFO "spi sync message processing: controller is idling!\n");
> > > +		usleep_range(10000, 11000);
> > > +	}  
> 
> > This is dead ugly of course, and it needs to be removed. Not yet sure how,
> > hence the RFC. Maybe the idle -> not busy transition can be included inside
> > the io_mutex? That way this while will never be hit and can be removed...  
> 
> I'm not sure it's even quite right from a safety point of view - idling
> is protected by queue_lock but this now only takes io_mutex.  

True. This is broken.

> Moving idling (and all the was_busy stuff) within the io_mutex would
> definitely resolve the issue, the async submission context is the only one
> that really needs the spinlock and it doesn't care about idling.  I can't
> think what you could do with the io_mutex when idling so it seems to
> fit.

Ok, so we could agree on a way to fix this particular issue: put the idling
transition into the io_mutex. Thanks.

Looking forward to reading comments on the rest of the code, and on the general
idea of what I am trying to accomplish.

Best regards,
Mark Brown June 8, 2022, 11:29 a.m. UTC | #4
On Wed, Jun 08, 2022 at 09:54:09AM +0200, David Jander wrote:
> Mark Brown <broonie@kernel.org> wrote:

> > Moving idling (and all the was_busy stuff) within the io_mutex would
> > definitely resolve the issue, the async submission context is the only one
> > that really needs the spinlock and it doesn't care about idling.  I can't
> > think what you could do with the io_mutex when idling so it seems to
> > fit.

> Ok, so we could agree on a way to fix this particular issue: put the idling
> transition into the io_mutex. Thanks.

> Looking forward to read comments on the rest of the code, and the general idea
> of what I am trying to accomplish.

I think the rest of it is fine, or at least I'm finding it difficult to
see anything beyond the concurrency issues.  I think we need to do an
audit to find any users that are doing a spi_sync() to complete a
sequence of spi_async() operations, but I'm not aware of any, and if it
delivers the performance benefits it's probably worth changing that
aspect of the driver API.
Andy Shevchenko June 8, 2022, 1:43 p.m. UTC | #5
On Thu, May 26, 2022 at 2:46 AM David Jander <david@protonic.nl> wrote:
>
> The interaction with the controller message queue and its corresponding
> auxiliary flags and variables requires the use of the queue_lock which is
> costly. Since spi_sync will transfer the complete message anyway, and not
> return until it is finished, there is no need to put the message into the
> queue if the queue is empty. This can save a lot of overhead.
>
> As an example of how significant this is, when using the MCP2518FD SPI CAN
> controller on a i.MX8MM SoC, the time during which the interrupt line
> stays active (during 3 relatively short spi_sync messages), is reduced
> from 98us to 72us by this patch.

...

> +       /* If another context is idling the device then wait */
> +       while (ctlr->idling) {
> +               printk(KERN_INFO "spi sync message processing: controller is idling!\n");

printk() when we have a device pointer, why (despite the existence of pr_info())?

> +               usleep_range(10000, 11000);
> +       }
> +
> +       was_busy = READ_ONCE(ctlr->busy);
> +
> +       ret = __spi_pump_transfer_message(ctlr, msg, was_busy);
> +       if (ret)
> +               goto out;
> +
> +       if (!was_busy) {
> +               kfree(ctlr->dummy_rx);
> +               ctlr->dummy_rx = NULL;
> +               kfree(ctlr->dummy_tx);
> +               ctlr->dummy_tx = NULL;
> +               if (ctlr->unprepare_transfer_hardware &&
> +                   ctlr->unprepare_transfer_hardware(ctlr))
> +                       dev_err(&ctlr->dev,
> +                               "failed to unprepare transfer hardware\n");
> +               spi_idle_runtime_pm(ctlr);
> +       }
David Jander June 8, 2022, 2:55 p.m. UTC | #6
On Wed, 8 Jun 2022 15:43:22 +0200
Andy Shevchenko <andy.shevchenko@gmail.com> wrote:

> On Thu, May 26, 2022 at 2:46 AM David Jander <david@protonic.nl> wrote:
> >
> > The interaction with the controller message queue and its corresponding
> > auxiliary flags and variables requires the use of the queue_lock which is
> > costly. Since spi_sync will transfer the complete message anyway, and not
> > return until it is finished, there is no need to put the message into the
> > queue if the queue is empty. This can save a lot of overhead.
> >
> > As an example of how significant this is, when using the MCP2518FD SPI CAN
> > controller on a i.MX8MM SoC, the time during which the interrupt line
> > stays active (during 3 relatively short spi_sync messages), is reduced
> > from 98us to 72us by this patch.  
> 
> ...
> 
> > +       /* If another context is idling the device then wait */
> > +       while (ctlr->idling) {
> > +               printk(KERN_INFO "spi sync message processing: controller is idling!\n");  
> 
> printk() when we have a device pointer, why (despite of pr_info() existence)?

Sorry for that. I often use printk() explicitly to remind me of things that
need to get removed, but in this case I left this broken piece of code on
purpose for the discussion and immediately addressed it in a reply to this
patch (hence the RFC tag in the subject).
Thanks for being vigilant, and sorry for the noise.
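
(For reference, the dev_* equivalent of that line would be something like:

	dev_info(&ctlr->dev,
		 "spi sync message processing: controller is idling!\n");

but as said, this whole piece of code is meant to go away.)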

> > +               usleep_range(10000, 11000);
> > +       }
> > +
> > +       was_busy = READ_ONCE(ctlr->busy);
> > +
> > +       ret = __spi_pump_transfer_message(ctlr, msg, was_busy);
> > +       if (ret)
> > +               goto out;
> > +
> > +       if (!was_busy) {
> > +               kfree(ctlr->dummy_rx);
> > +               ctlr->dummy_rx = NULL;
> > +               kfree(ctlr->dummy_tx);
> > +               ctlr->dummy_tx = NULL;
> > +               if (ctlr->unprepare_transfer_hardware &&
> > +                   ctlr->unprepare_transfer_hardware(ctlr))
> > +                       dev_err(&ctlr->dev,
> > +                               "failed to unprepare transfer hardware\n");
> > +               spi_idle_runtime_pm(ctlr);
> > +       }  
> 

Best regards,
David Jander June 9, 2022, 3:34 p.m. UTC | #7
On Wed, 8 Jun 2022 12:29:16 +0100
Mark Brown <broonie@kernel.org> wrote:

> On Wed, Jun 08, 2022 at 09:54:09AM +0200, David Jander wrote:
> > Mark Brown <broonie@kernel.org> wrote:  
> 
> > > Moving idling (and all the was_busy stuff) within the io_mutex would
> > > definitely resolve the issue, the async submission context is the only one
> > > that really needs the spinlock and it doesn't care about idling.  I can't
> > > think what you could do with the io_mutex when idling so it seems to
> > > fit.  
> 
> > Ok, so we could agree on a way to fix this particular issue: put the idling
> > transition into the io_mutex. Thanks.  
> 
> > Looking forward to read comments on the rest of the code, and the general idea
> > of what I am trying to accomplish.  
> 
> I think the rest of it is fine or at least I'm finding it difficult to
> see anything beyond the concurrency issues.  I think we need to do an
> audit to find any users that are doing a spi_sync() to complete a
> sequence of spi_async() operations but I'm not aware of any and if it
> delivers the performance benefits it's probably worth changing that
> aspect of the driver API.

I just discovered a different issue (hit upon by Oleksij Rempel while
assisting with testing):

Apparently some drivers tend to rely on the fact that master->cur_msg is not
NULL and always points to the message being transferred.
This could be a show-stopper for this patch set, if it cannot be solved.
I am currently analyzing the different cases, to see if and how they could
eventually get fixed. The crux of the issue is the fact that there are two
different APIs towards the driver:

 1. transfer_one(): This call does not provide a reference to the message that
 contains the transfers. So any information stored only in the underlying
 spi_message is not accessible to the driver. Apparently some drivers work
 around this by accessing master->cur_msg.

 2. transfer_one_message(): I suspect this is a newer API. It takes the
 spi_message as an argument, thus giving the driver access to all the
 information it needs (like the return status and the complete list of
 transfers).

One driver in particular, spi-atmel.c, still uses the deprecated
master->cur_msg->is_dma_mapped flag. This is trivially fixed by replacing it
with master->cur_msg_mapped, which is still available even in the sync path. It
is still kinda ugly though, since it is a rather indirect way of obtaining
information about the current transfer buffers being handled.

Some drivers look at master->cur_msg in interrupt mode if there was an error
interrupt, to decide whether there is an ongoing transfer and sometimes also
to store the error code in master->cur_msg->status. This could be solved by
storing a reference to the current message in the private struct, but like in
the other cases, there is no way to obtain that spi_message pointer from the
transfer_one() call.

In other words, there seem to be both a spi_transfer based API and a
spi_message based API, but problems occur when the spi_transfer based calls
need to know things about the underlying spi_message, which is sometimes
artificially generated in functions like spi_sync_transfer(), so it always
exists.
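
For reference, the two callbacks in struct spi_controller look roughly like
this in include/linux/spi/spi.h:

	/* Message based: the driver gets the whole spi_message. */
	int (*transfer_one_message)(struct spi_controller *ctlr,
				    struct spi_message *mesg);

	/* Transfer based: the driver only sees one spi_transfer at a time;
	 * the containing spi_message is never passed in. */
	int (*transfer_one)(struct spi_controller *ctlr, struct spi_device *spi,
			    struct spi_transfer *transfer);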

Any suggestion as to the most desirable course of action? Try to fix the API
inconsistency of wanting to access the spi_message when all you asked for is a
spi_transfer, try to work around it, or just toss this patch, give up and call
it a failed attempt because the impact is too big?

Best regards,
Mark Brown June 9, 2022, 4:31 p.m. UTC | #8
On Thu, Jun 09, 2022 at 05:34:21PM +0200, David Jander wrote:
> Mark Brown <broonie@kernel.org> wrote:
> > On Wed, Jun 08, 2022 at 09:54:09AM +0200, David Jander wrote:
> > > Mark Brown <broonie@kernel.org> wrote:  

> > I think the rest of it is fine or at least I'm finding it difficult to
> > see anything beyond the concurrency issues.  I think we need to do an
> > audit to find any users that are doing a spi_sync() to complete a
> > sequence of spi_async() operations but I'm not aware of any and if it
> > delivers the performance benefits it's probably worth changing that
> > aspect of the driver API.

> I just discovered a different issue (hit upon by Oleksij Rempel while
> assisting with testing):

> Apparently some drivers tend to rely on the fact that master->cur_msg is not
> NULL and always points to the message being transferred.
> This could be a show-stopper to this patch set, if it cannot be solved.
> I am currently analyzing the different cases, to see if and how they could
> eventually get fixed. The crux of the issue is the fact that there are two
> different API's towards the driver:

That seems resolvable?  If we have two things actually handling a
message at once then we're in for a bad time, so we should be able to
arrange for cur_msg to be set in the sync path - the usage in the
message pump between popping off the queue and getting to actually
starting the transfer could be a local variable with the changes to the
sync path, I think?
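
Something like this in __spi_transfer_message_noqueue(), roughly (untested
sketch against this series, just to illustrate the idea):

	mutex_lock(&ctlr->io_mutex);

	was_busy = READ_ONCE(ctlr->busy);

	/*
	 * Make cur_msg valid for drivers that peek at it; it is only ever
	 * touched with the io_mutex held, so no queue_lock needed here.
	 * This assumes the message is finished by the time
	 * transfer_one_message() returns in the sync path.
	 */
	ctlr->cur_msg = msg;
	ret = __spi_pump_transfer_message(ctlr, msg, was_busy);
	ctlr->cur_msg = NULL;

	/* dummy buffer cleanup and idling as in the patch */

	mutex_unlock(&ctlr->io_mutex);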

>  1. transfer_one(): This call does not provide a reference to the message that
>  contains the transfers. So all information stored only in the underlying
>  spi_message are not accessible to the driver. Apparently some work around
>  this by accessing master->cur_msg.

>  2. transfer_one_message(): I suspect this is a newer API. It takes the
>  spi_message as argument, thus giving the driver access to all information it
>  needs (like return status, and the complete list of transfers).

It's the other way around - transfer_one() is the result of providing a
transfer_one_message() which factors out more of the code given that a
huge proportion of drivers are for hardware which works at the transfer
level and doesn't understand messages, just as transfer_one_message()
and the message queue are factoring out code which was originally open
coded in drivers.
David Jander June 10, 2022, 7:27 a.m. UTC | #9
On Thu, 9 Jun 2022 17:31:24 +0100
Mark Brown <broonie@kernel.org> wrote:

> On Thu, Jun 09, 2022 at 05:34:21PM +0200, David Jander wrote:
> > Mark Brown <broonie@kernel.org> wrote:  
> > > On Wed, Jun 08, 2022 at 09:54:09AM +0200, David Jander wrote:  
> > > > Mark Brown <broonie@kernel.org> wrote:    
> 
> > > I think the rest of it is fine or at least I'm finding it difficult to
> > > see anything beyond the concurrency issues.  I think we need to do an
> > > audit to find any users that are doing a spi_sync() to complete a
> > > sequence of spi_async() operations but I'm not aware of any and if it
> > > delivers the performance benefits it's probably worth changing that
> > > aspect of the driver API.  
> 
> > I just discovered a different issue (hit upon by Oleksij Rempel while
> > assisting with testing):  
> 
> > Apparently some drivers tend to rely on the fact that master->cur_msg is not
> > NULL and always points to the message being transferred.
> > This could be a show-stopper to this patch set, if it cannot be solved.
> > I am currently analyzing the different cases, to see if and how they could
> > eventually get fixed. The crux of the issue is the fact that there are two
> > different API's towards the driver:  
> 
> That seems resolvable?  If we have two things actually handling a
> message at once then we're in for a bad time so we should be able to
> arrange for cur_msg to be set in the sync path - the usage in the
> message pump between popping off the queue and getting to actually
> starting the transfer could be a local variable with the changes to the
> sync path I think?

Ok, I first thought that this wouldn't be possible without taking the
necessary spinlock, but looking a little closer, I think I understand now.
One question to confirm I understand the code correctly:
An SPI driver that implements its own transfer_one_message() is required to
_always_ call spi_finalize_current_message() _before_ returning, right?
If this is a guarantee and we take the io_mutex at the beginning of
__spi_pump_messages(), then ctlr->cur_msg is only manipulated with the
io_mutex held, and that would make it safe to be used in the sync path, which
is also behind the io_mutex.
I would appreciate it if you could confirm this, just to be sure I understand
the code correctly.
The fact that spi_finalize_current_message() is a separate API function, and
not called directly from __spi_pump_messages(), had me confused into thinking
it might be called in a different context (from an IRQ thread or something like
that), possibly after __spi_pump_messages() had already returned. But that
doesn't make much sense... right?

> >  1. transfer_one(): This call does not provide a reference to the message that
> >  contains the transfers. So all information stored only in the underlying
> >  spi_message are not accessible to the driver. Apparently some work around
> >  this by accessing master->cur_msg.
> 
> >  2. transfer_one_message(): I suspect this is a newer API. It takes the
> >  spi_message as argument, thus giving the driver access to all information it
> >  needs (like return status, and the complete list of transfers).
> 
> It's the other way around - transfer_one() is the result of providing a
> transfer_one_message() which factors out more of the code given that a
> huge proportion of drivers are for hardware which works at the transfer
> level and doesn't understand messages, just as transfer_one_message()
> and the message queue are factoring out code which was originally open
> coded in drivers.

Ah, thanks for the context. This makes sense or course.

Best regards,
Mark Brown June 10, 2022, 1:41 p.m. UTC | #10
On Fri, Jun 10, 2022 at 09:27:53AM +0200, David Jander wrote:
> Mark Brown <broonie@kernel.org> wrote:
> > On Thu, Jun 09, 2022 at 05:34:21PM +0200, David Jander wrote:

> > > Apparently some drivers tend to rely on the fact that master->cur_msg is not
> > > NULL and always points to the message being transferred.
> > > This could be a show-stopper to this patch set, if it cannot be solved.

> > That seems resolvable?  If we have two things actually handling a

> Ok, I first thought that this wouldn't be possible without taking the
> necessary spinlock, but looking a little closer, I think I understand now.
> One question to confirm I understand the code correctly:
> An SPI driver that implements its own transfer_one_message() is required to
> _always_ call spi_finalize_current_message() _before_ returning, right?

Yes, it should.

> If this is a guarantee and we take the io_mutex at the beginning of
> __spi_pump_messages(), then ctlr->cur_msg is only manipulated with the
> io_mutex held, and that would make it safe to be used in the sync path, which
> is also behind the io_mutex.
> Would appreciate if you could confirm this, just to be sure I understand the
> code correctly.

I think that should work.  If there's something missed it should be
possible to make things work that way.

> The fact that spi_finalize_current_message() is a separate API function, and
> not called directly from __spi_pump_messages() had me confused that it might
> be called in a different context (from IRQ thread or something like that)
> possibly after __spi_pump_messages() had already returned. But that doesn't
> make much sense... right?

It *can* be called from interrupt context: the driver can signal the
caller that the message completed without having to bounce up to the
message pump first, which helps minimise context switching in the async
case; whatever was waiting for the message can get on with things
without us having to switch through the SPI thread.  However, something
should still be holding the io_mutex when the completion happens at
present.  A brief audit of drivers suggests that this is not in fact
reliably the case though, and we've got drivers that were never tested
with any queued transfers AFAICT.  I'll look at that.

Notionally the idea was that we'd be able to start the next transfer
directly from interrupt context on suitable hardware, however nothing I
had access to at the time I was working on this was actually capable of
doing that so I never implemented anything.
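
For illustration only (hypothetical driver, everything except the spi_* calls
is made up), finalizing from the completion interrupt looks something like:

	static irqreturn_t foo_spi_isr(int irq, void *dev_id)
	{
		struct spi_controller *ctlr = dev_id;

		/* ... ack the interrupt, drain the FIFO, etc. ... */

		if (foo_spi_message_done(ctlr)) {
			ctlr->cur_msg->status = 0;
			/* completes the message right here, no bounce back
			 * through the message pump kthread */
			spi_finalize_current_message(ctlr);
		}

		return IRQ_HANDLED;
	}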

Actually, on that point - are you aware of any boards (ideally not
requiring some non-standard DT) that are readily and affordably
available and have useful CAN controllers?  I'm looking to improve my
test setup and they keep coming up.
Mark Brown June 10, 2022, 6:17 p.m. UTC | #11
On Fri, Jun 10, 2022 at 02:41:34PM +0100, Mark Brown wrote:
> On Fri, Jun 10, 2022 at 09:27:53AM +0200, David Jander wrote:
> > Mark Brown <broonie@kernel.org> wrote:

> > Ok, I first thought that this wouldn't be possible without taking the
> > necessary spinlock, but looking a little closer, I think I understand now.
> > One question to confirm I understand the code correctly:
> > An SPI driver that implements its own transfer_one_message() is required to
> > _always_ call spi_finalize_current_message() _before_ returning, right?

> Yes, it should.

Sorry, my mistake - I misremembered how that worked.  They might return
without having completed the message since the message pump is a
workqueue so it'll just go idle, spi_sync() will work because the caller
will block on the completion in the message.  It's cur_msg that's
stopping any new work being scheduled.  I was confusing this with the
handling of individual transfers, it's been a while since I got deep
into this.

The bit about allowing us to finalise in any context still applies - the
goal with that interface is to avoid the need to context switch back to
the message pump to report that the message completed, and ideally
immediately start work on the next message if the controller can cope
with that.

> > If this is a guarantee and we take the io_mutex at the beginning of
> > __spi_pump_messages(), then ctlr->cur_msg is only manipulated with the
> > io_mutex held, and that would make it safe to be used in the sync path, which
> > is also behind the io_mutex.
> > Would appreciate if you could confirm this, just to be sure I understand the
> > code correctly.

> I think that should work.  If there's something missed it should be
> possible to make things work that way.

Can we move the cleanup of cur_msg out of spi_finalize_current_message()
and into the context holding the io_mutex, with that blocking on a
completion?  Don't know what the overhead of that is like; I think it
should be fine for the case where we don't actually block on the
completion, and it shouldn't be any worse in the case where we're
completing in the interrupt.  Also, the kthread_queue_work() that's in
there could be moved out to only the queued case, since with your new
approach for spi_sync() we'll idle the hardware in the calling context
and don't need to schedule the thread at all; that should save some
overhead.

Sorry about misremembering this bit.
David Jander June 13, 2022, 9:05 a.m. UTC | #12
On Fri, 10 Jun 2022 19:17:22 +0100
Mark Brown <broonie@kernel.org> wrote:

> On Fri, Jun 10, 2022 at 02:41:34PM +0100, Mark Brown wrote:
> > On Fri, Jun 10, 2022 at 09:27:53AM +0200, David Jander wrote:  
> > > Mark Brown <broonie@kernel.org> wrote:  
> 
> > > Ok, I first thought that this wouldn't be possible without taking the
> > > necessary spinlock, but looking a little closer, I think I understand now.
> > > One question to confirm I understand the code correctly:
> > > An SPI driver that implements its own transfer_one_message() is required to
> > > _always_ call spi_finalize_current_message() _before_ returning, right?  
> 
> > Yes, it should.  
> 
> Sorry, my mistake - I misremembered how that worked.  They might return
> without having completed the message since the message pump is a
> workqueue so it'll just go idle, spi_sync() will work because the caller
> will block on the completion in the message.  It's cur_msg that's
> stopping any new work being scheduled.  I was confusing this with the
> handling of individual transfers, it's been a while since I got deep
> into this.
>
> The bit about allowing us to finalise in any context still applies - the
> goal with that interface is to avoid the need to context switch back to
> the message pump to report that the message completed, and ideally
> immediately start work on the next message if the controller can cope
> with that.

Thinking out loud here, so correct me if I am wrong.
Basically what is happening in that scenario, if we had several async
messages queued up, is this:

 1. __pump_messages() gets first message off queue, stores in cur_msg.
 2. __pump_messages() calls ->transfer_one_message().
 3. transfer_one_message() triggers an IRQ or other context *A*.
 4. transfer_one_message() returns before message is finished.
 5. work queue idles. If someone calls spi_async() now, it will not queue
 work, since ctlr->busy is still true.
 6. *A*: IRQ or other context calls spi_finalize_current_message()
 7. spi_finalize_current_message() schedules new work.
 8. Goto 1.

 Total ctx switches message->message: 2 (at steps 3->6 and 7->8).

Right now, the io_mutex is taken at step 1 and released after step 4. If we
wanted to get an unqueued sync message in between that makes use of cur_msg
safely, then we'd have to change this to take the io_mutex in step 1 and only
release it in step 7. The first obvious problem with that is that the locking
looks unclean. It will be hard to follow, which is probably sub-optimal.
Another potential issue would be if transfer_one_message() calls
spi_finalize_current_message() before it is actually done accessing the
hardware for some reason, leading to the io_mutex being released too early?
Another solution is what you propose below...

> > > If this is a guarantee and we take the io_mutex at the beginning of
> > > __spi_pump_messages(), then ctlr->cur_msg is only manipulated with the
> > > io_mutex held, and that would make it safe to be used in the sync path, which
> > > is also behind the io_mutex.
> > > Would appreciate if you could confirm this, just to be sure I understand the
> > > code correctly.  
> 
> > I think that should work.  If there's something missed it should be
> > possible to make things work that way.  
> 
> Can we move the cleanup of cur_msg out of spi_finalize_current_message()
> and into the context holding the io_mutex with that blocking on a
> completion?  Don't know what the overhead of that is like, I think it
> should be fine for the case where we don't actually block on the
> completion and it shouldn't be any worse in the case where we're
> completing in the interrupt.  Also the kthread_queue_work() that's in
> there could be moved out to only the queued case since with your new
> approach for spi_sync() we'll idle the hardware in the calling context
> and don't need to schedule the thread at all, that should save some
> overhead.

Ack.
To compare this to my listing above, the difference is that your idea has
cleaner locking:

 1. __pump_messages() gets first message off queue, stores in cur_msg.
 2. __pump_messages() calls ->transfer_one_message().
 3. transfer_one_message() triggers an IRQ or other context *A*.
 4. transfer_one_message() returns before message is finished.
 5. __pump_messages() waits for completion *B*.
 6. If someone calls spi_async() now, it will not queue work, because
 ctlr->busy is still true, and the io_mutex is locked anyway.
 7. *A*: IRQ or other context calls spi_finalize_current_message()
 8. spi_finalize_current_message() completes the completion *B*
 9. *B*: __pump_messages() wakes up on completion, queues next work.
 10. Goto 1.

 Total ctx switches message->message: 2 (at steps 3->7 and 8->9).

So, AFAICS, it looks the same in terms of overhead. On an SMP system, there are
only 2 context switches from message to message in both cases... although each
call to wait_for_completion() and complete() is internally spin-locked, so
that's that. It won't affect the sync path though.

In this scenario, the io_mutex is taken at step 1 and released at step 9,
inside the same function. The io_mutex is released either after
transfer_one_message() returns, or after spi_finalize_current_message() is
called, whichever comes _LAST_. I think I like this. Any potential problems
I am missing?
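
In code, the io_mutex holder would then look roughly like this (sketch only;
cur_msg_completion is a made-up name for the new completion *B*):

	/* in __spi_pump_messages(), io_mutex held, cur_msg already set */
	reinit_completion(&ctlr->cur_msg_completion);
	ret = ctlr->transfer_one_message(ctlr, msg);
	/* block until spi_finalize_current_message(), possibly called from
	 * IRQ context, signals that the message is really done (steps 7/8) */
	wait_for_completion(&ctlr->cur_msg_completion);
	ctlr->cur_msg = NULL;
	ctlr->fallback = false;
	mutex_unlock(&ctlr->io_mutex);
	/* only the queued path needs to kick the pump for the next message */
	kthread_queue_work(ctlr->kworker, &ctlr->pump_messages);

	/* ...and spi_finalize_current_message() would end with: */
	complete(&ctlr->cur_msg_completion);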

> Sorry about misremembering this bit.

Never mind ;-)

Best regards,
Mark Brown June 13, 2022, 11:56 a.m. UTC | #13
On Mon, Jun 13, 2022 at 11:05:50AM +0200, David Jander wrote:

> Thinking out loud here, so correct me if I am wrong.
> Basically what is happening in that scenario, if we had several async
> messages queued up, is this:

>  1. __pump_messages() gets first message off queue, stores in cur_msg.
>  2. __pump_messages() calls ->transfer_one_message().
>  3. transfer_one_message() triggers an IRQ or other context *A*.
>  4. transfer_one_message() returns before message is finished.
>  5. work queue idles. If someone calls spi_async() now, it will not queue
>  work, since ctlr->busy is still true.
>  6. *A*: IRQ or other context calls spi_finalize_current_message()
>  7. spi_finalize_current_message() schedules new work.
>  8. Goto 1.

>  Total ctx switches message->message: 2 (at steps 3->6 and 7->8).

Yes, plus also any transition to the context that submitted the message
when completing the message.

> In this scenario, the io_mutex is taken at step 1 and released at step 9,
> inside the same function. The io_mutex is released either after
> transfer_one_message() returns, or after spi_finalize_current_message() is
> called, whatever comes _LAST_. I think I like this. Any potential problems
> I am missing?

I think that about covers it, I'm not seeing any issues right now.
Thomas Kopp July 15, 2022, 7:47 a.m. UTC | #14
Hi David, All,
 
I'd like to test this patch series but am not sure what the current state is.
Is there an updated patch series with the changes discussed after the initial RFC, or should I just test that?

Thanks,
Thomas
Thomas Kopp July 15, 2022, 9:02 a.m. UTC | #15
Hi All,
 
Oops, found the V2 - sorry for the noise.

Best Regards,
Thomas

Patch

diff --git a/drivers/spi/spi.c b/drivers/spi/spi.c
index 1d50051f3d57..401ac2f4758e 100644
--- a/drivers/spi/spi.c
+++ b/drivers/spi/spi.c
@@ -1558,6 +1558,80 @@  static void spi_idle_runtime_pm(struct spi_controller *ctlr)
 	}
 }
 
+static int __spi_pump_transfer_message(struct spi_controller *ctlr,
+		struct spi_message *msg, bool was_busy)
+{
+	struct spi_transfer *xfer;
+	int ret;
+
+	if (!was_busy && ctlr->auto_runtime_pm) {
+		ret = pm_runtime_get_sync(ctlr->dev.parent);
+		if (ret < 0) {
+			pm_runtime_put_noidle(ctlr->dev.parent);
+			dev_err(&ctlr->dev, "Failed to power device: %d\n",
+				ret);
+			return ret;
+		}
+	}
+
+	if (!was_busy)
+		trace_spi_controller_busy(ctlr);
+
+	if (!was_busy && ctlr->prepare_transfer_hardware) {
+		ret = ctlr->prepare_transfer_hardware(ctlr);
+		if (ret) {
+			dev_err(&ctlr->dev,
+				"failed to prepare transfer hardware: %d\n",
+				ret);
+
+			if (ctlr->auto_runtime_pm)
+				pm_runtime_put(ctlr->dev.parent);
+
+			msg->status = ret;
+			spi_finalize_message(msg);
+
+			return ret;
+		}
+	}
+
+	trace_spi_message_start(msg);
+
+	if (ctlr->prepare_message) {
+		ret = ctlr->prepare_message(ctlr, msg);
+		if (ret) {
+			dev_err(&ctlr->dev, "failed to prepare message: %d\n",
+				ret);
+			msg->status = ret;
+			spi_finalize_message(msg);
+			return ret;
+		}
+		msg->prepared = true;
+	}
+
+	ret = spi_map_msg(ctlr, msg);
+	if (ret) {
+		msg->status = ret;
+		spi_finalize_message(msg);
+		return ret;
+	}
+
+	if (!ctlr->ptp_sts_supported && !ctlr->transfer_one) {
+		list_for_each_entry(xfer, &msg->transfers, transfer_list) {
+			xfer->ptp_sts_word_pre = 0;
+			ptp_read_system_prets(xfer->ptp_sts);
+		}
+	}
+
+	ret = ctlr->transfer_one_message(ctlr, msg);
+	if (ret) {
+		dev_err(&ctlr->dev,
+			"failed to transfer one message from queue\n");
+		return ret;
+	}
+
+	return 0;
+}
+
 /**
  * __spi_pump_messages - function which processes spi message queue
  * @ctlr: controller to process queue for
@@ -1573,7 +1647,6 @@  static void spi_idle_runtime_pm(struct spi_controller *ctlr)
  */
 static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
 {
-	struct spi_transfer *xfer;
 	struct spi_message *msg;
 	bool was_busy = false;
 	unsigned long flags;
@@ -1608,6 +1681,7 @@  static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
 			    !ctlr->unprepare_transfer_hardware) {
 				spi_idle_runtime_pm(ctlr);
 				ctlr->busy = false;
+				ctlr->queue_empty = true;
 				trace_spi_controller_idle(ctlr);
 			} else {
 				kthread_queue_work(ctlr->kworker,
@@ -1634,6 +1708,7 @@  static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
 
 		spin_lock_irqsave(&ctlr->queue_lock, flags);
 		ctlr->idling = false;
+		ctlr->queue_empty = true;
 		spin_unlock_irqrestore(&ctlr->queue_lock, flags);
 		return;
 	}
@@ -1650,75 +1725,7 @@  static void __spi_pump_messages(struct spi_controller *ctlr, bool in_kthread)
 	spin_unlock_irqrestore(&ctlr->queue_lock, flags);
 
 	mutex_lock(&ctlr->io_mutex);
-
-	if (!was_busy && ctlr->auto_runtime_pm) {
-		ret = pm_runtime_get_sync(ctlr->dev.parent);
-		if (ret < 0) {
-			pm_runtime_put_noidle(ctlr->dev.parent);
-			dev_err(&ctlr->dev, "Failed to power device: %d\n",
-				ret);
-			mutex_unlock(&ctlr->io_mutex);
-			return;
-		}
-	}
-
-	if (!was_busy)
-		trace_spi_controller_busy(ctlr);
-
-	if (!was_busy && ctlr->prepare_transfer_hardware) {
-		ret = ctlr->prepare_transfer_hardware(ctlr);
-		if (ret) {
-			dev_err(&ctlr->dev,
-				"failed to prepare transfer hardware: %d\n",
-				ret);
-
-			if (ctlr->auto_runtime_pm)
-				pm_runtime_put(ctlr->dev.parent);
-
-			msg->status = ret;
-			spi_finalize_message(msg);
-
-			mutex_unlock(&ctlr->io_mutex);
-			return;
-		}
-	}
-
-	trace_spi_message_start(msg);
-
-	if (ctlr->prepare_message) {
-		ret = ctlr->prepare_message(ctlr, msg);
-		if (ret) {
-			dev_err(&ctlr->dev, "failed to prepare message: %d\n",
-				ret);
-			msg->status = ret;
-			spi_finalize_message(msg);
-			goto out;
-		}
-		msg->prepared = true;
-	}
-
-	ret = spi_map_msg(ctlr, msg);
-	if (ret) {
-		msg->status = ret;
-		spi_finalize_message(msg);
-		goto out;
-	}
-
-	if (!ctlr->ptp_sts_supported && !ctlr->transfer_one) {
-		list_for_each_entry(xfer, &msg->transfers, transfer_list) {
-			xfer->ptp_sts_word_pre = 0;
-			ptp_read_system_prets(xfer->ptp_sts);
-		}
-	}
-
-	ret = ctlr->transfer_one_message(ctlr, msg);
-	if (ret) {
-		dev_err(&ctlr->dev,
-			"failed to transfer one message from queue\n");
-		goto out;
-	}
-
-out:
+	ret = __spi_pump_transfer_message(ctlr, msg, was_busy);
 	mutex_unlock(&ctlr->io_mutex);
 
 	/* Prod the scheduler in case transfer_one() was busy waiting */
@@ -1848,6 +1855,7 @@  static int spi_init_queue(struct spi_controller *ctlr)
 {
 	ctlr->running = false;
 	ctlr->busy = false;
+	ctlr->queue_empty = true;
 
 	ctlr->kworker = kthread_create_worker(0, dev_name(&ctlr->dev));
 	if (IS_ERR(ctlr->kworker)) {
@@ -1941,11 +1949,15 @@  void spi_finalize_message(struct spi_message *mesg)
 
 	mesg->prepared = false;
 
-	spin_lock_irqsave(&ctlr->queue_lock, flags);
-	ctlr->cur_msg = NULL;
-	ctlr->fallback = false;
-	kthread_queue_work(ctlr->kworker, &ctlr->pump_messages);
-	spin_unlock_irqrestore(&ctlr->queue_lock, flags);
+	if (!mesg->sync) {
+		spin_lock_irqsave(&ctlr->queue_lock, flags);
+		WARN(ctlr->cur_msg != mesg,
+			"Finalizing queued message that is not the current head of queue!");
+		ctlr->cur_msg = NULL;
+		ctlr->fallback = false;
+		kthread_queue_work(ctlr->kworker, &ctlr->pump_messages);
+		spin_unlock_irqrestore(&ctlr->queue_lock, flags);
+	}
 
 	trace_spi_message_done(mesg);
 
@@ -2048,6 +2060,7 @@  static int __spi_queued_transfer(struct spi_device *spi,
 	msg->status = -EINPROGRESS;
 
 	list_add_tail(&msg->queue, &ctlr->queue);
+	ctlr->queue_empty = false;
 	if (!ctlr->busy && need_pump)
 		kthread_queue_work(ctlr->kworker, &ctlr->pump_messages);
 
@@ -3937,6 +3950,42 @@  static int spi_async_locked(struct spi_device *spi, struct spi_message *message)
 
 }
 
+static void __spi_transfer_message_noqueue(struct spi_controller *ctlr, struct spi_message *msg)
+{
+	bool was_busy;
+	int ret;
+
+	mutex_lock(&ctlr->io_mutex);
+
+	/* If another context is idling the device then wait */
+	while (ctlr->idling) {
+		printk(KERN_INFO "spi sync message processing: controller is idling!\n");
+		usleep_range(10000, 11000);
+	}
+
+	was_busy = READ_ONCE(ctlr->busy);
+
+	ret = __spi_pump_transfer_message(ctlr, msg, was_busy);
+	if (ret)
+		goto out;
+
+	if (!was_busy) {
+		kfree(ctlr->dummy_rx);
+		ctlr->dummy_rx = NULL;
+		kfree(ctlr->dummy_tx);
+		ctlr->dummy_tx = NULL;
+		if (ctlr->unprepare_transfer_hardware &&
+		    ctlr->unprepare_transfer_hardware(ctlr))
+			dev_err(&ctlr->dev,
+				"failed to unprepare transfer hardware\n");
+		spi_idle_runtime_pm(ctlr);
+	}
+
+out:
+	mutex_unlock(&ctlr->io_mutex);
+	return;
+}
+
 /*-------------------------------------------------------------------------*/
 
 /*
@@ -3955,51 +4004,52 @@  static int __spi_sync(struct spi_device *spi, struct spi_message *message)
 	DECLARE_COMPLETION_ONSTACK(done);
 	int status;
 	struct spi_controller *ctlr = spi->controller;
-	unsigned long flags;
 
 	status = __spi_validate(spi, message);
 	if (status != 0)
 		return status;
 
-	message->complete = spi_complete;
-	message->context = &done;
 	message->spi = spi;
 
 	SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics, spi_sync);
 	SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics, spi_sync);
 
 	/*
-	 * If we're not using the legacy transfer method then we will
-	 * try to transfer in the calling context so special case.
-	 * This code would be less tricky if we could remove the
-	 * support for driver implemented message queues.
+	 * Checking queue_empty here only guarantees async/sync message
+	 * ordering when coming from the same context. It does not need to
+	 * guard against reentrancy from a different context. The io_mutex
+	 * will catch those cases.
 	 */
-	if (ctlr->transfer == spi_queued_transfer) {
-		spin_lock_irqsave(&ctlr->bus_lock_spinlock, flags);
+	if (READ_ONCE(ctlr->queue_empty)) {
+		message->sync = true;
+		message->actual_length = 0;
+		message->status = -EINPROGRESS;
 
 		trace_spi_message_submit(message);
 
-		status = __spi_queued_transfer(spi, message, false);
+		SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics, spi_sync_immediate);
+		SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics, spi_sync_immediate);
 
-		spin_unlock_irqrestore(&ctlr->bus_lock_spinlock, flags);
-	} else {
-		status = spi_async_locked(spi, message);
+		__spi_transfer_message_noqueue(ctlr, message);
+
+		return message->status;
 	}
 
+	/*
+	 * There are messages in the async queue that could have originated
+	 * from the same context, so we need to preserve ordering.
+	 * Therefor we send the message to the async queue and wait until they
+	 * are completed.
+	 */
+	message->complete = spi_complete;
+	message->context = &done;
+	status = spi_async_locked(spi, message);
 	if (status == 0) {
-		/* Push out the messages in the calling context if we can */
-		if (ctlr->transfer == spi_queued_transfer) {
-			SPI_STATISTICS_INCREMENT_FIELD(ctlr->pcpu_statistics,
-						       spi_sync_immediate);
-			SPI_STATISTICS_INCREMENT_FIELD(spi->pcpu_statistics,
-						       spi_sync_immediate);
-			__spi_pump_messages(ctlr, false);
-		}
-
 		wait_for_completion(&done);
 		status = message->status;
 	}
 	message->context = NULL;
+
 	return status;
 }
 
diff --git a/include/linux/spi/spi.h b/include/linux/spi/spi.h
index 43ec1e262913..9caed8537755 100644
--- a/include/linux/spi/spi.h
+++ b/include/linux/spi/spi.h
@@ -449,6 +449,8 @@  extern struct spi_device *spi_new_ancillary_device(struct spi_device *spi, u8 ch
  * @irq_flags: Interrupt enable state during PTP system timestamping
  * @fallback: fallback to pio if dma transfer return failure with
  *	SPI_TRANS_FAIL_NO_START.
+ * @queue_empty: signal green light for opportunistically skipping the queue
+ * 	for spi_sync transfers.
  *
  * Each SPI controller can communicate with one or more @spi_device
  * children.  These make a small bus, sharing MOSI, MISO and SCK signals
@@ -665,6 +667,9 @@  struct spi_controller {
 
 	/* Interrupt enable state during PTP system timestamping */
 	unsigned long		irq_flags;
+
+	/* Flag for enabling opportunistic skipping of the queue in spi_sync */
+	bool			queue_empty;
 };
 
 static inline void *spi_controller_get_devdata(struct spi_controller *ctlr)
@@ -974,6 +979,7 @@  struct spi_transfer {
  * @state: for use by whichever driver currently owns the message
  * @resources: for resource management when the spi message is processed
  * @prepared: spi_prepare_message was called for the this message
+ * @sync: this message took the direct sync path skipping the async queue
  *
  * A @spi_message is used to execute an atomic sequence of data transfers,
  * each represented by a struct spi_transfer.  The sequence is "atomic"
@@ -1025,7 +1031,10 @@  struct spi_message {
 	struct list_head        resources;
 
 	/* spi_prepare_message was called for this message */
-	bool                    prepared;
+	bool			prepared;
+
+	/* this message is skipping the async queue */
+	bool			sync;
 };
 
 static inline void spi_message_init_no_memset(struct spi_message *m)