diff mbox series

[v3,3/4] media: pisp-be: Split jobs creation and scheduling

Message ID 20240827074018.534354-4-jacopo.mondi@ideasonboard.com (mailing list archive)
State New
Headers show
Series media: pisp-be: Split jobs creation and scheduling | expand

Commit Message

Jacopo Mondi Aug. 27, 2024, 7:40 a.m. UTC
Currently the 'pispbe_schedule()' function does two things:

1) Tries to assemble a job by inspecting all the video node queues
   to make sure all the required buffers are available
2) Submit the job to the hardware

The pispbe_schedule() function is called at:

- video device start_streaming() time
- video device qbuf() time
- irq handler

As assembling a job requires inspecting all queues, it is a rather
time consuming operation which is better not run in IRQ context.

To avoid the executing the time consuming job creation in interrupt
context split the job creation and job scheduling in two distinct
operations. When a well-formed job is created, append it to the
newly introduced 'pispbe->job_queue' where it will be dequeued from
by the scheduling routine.

At start_streaming() and qbuf() time immediately try to schedule a job
if one has been created as the irq handler routing is only called when
a job has completed, and we can't solely rely on it for scheduling new
jobs.

Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
---
 .../platform/raspberrypi/pisp_be/pisp_be.c    | 137 +++++++++++-------
 1 file changed, 83 insertions(+), 54 deletions(-)

Comments

Laurent Pinchart Aug. 31, 2024, 2:39 p.m. UTC | #1
Hi Jacopo,

Thank you for the patch.

On Tue, Aug 27, 2024 at 09:40:17AM +0200, Jacopo Mondi wrote:
> Currently the 'pispbe_schedule()' function does two things:
> 
> 1) Tries to assemble a job by inspecting all the video node queues
>    to make sure all the required buffers are available
> 2) Submit the job to the hardware
> 
> The pispbe_schedule() function is called at:
> 
> - video device start_streaming() time
> - video device qbuf() time
> - irq handler
> 
> As assembling a job requires inspecting all queues, it is a rather
> time consuming operation which is better not run in IRQ context.
> 
> To avoid the executing the time consuming job creation in interrupt
> context split the job creation and job scheduling in two distinct
> operations. When a well-formed job is created, append it to the
> newly introduced 'pispbe->job_queue' where it will be dequeued from
> by the scheduling routine.
> 
> At start_streaming() and qbuf() time immediately try to schedule a job
> if one has been created as the irq handler routing is only called when
> a job has completed, and we can't solely rely on it for scheduling new
> jobs.
> 
> Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
> ---
>  .../platform/raspberrypi/pisp_be/pisp_be.c    | 137 +++++++++++-------
>  1 file changed, 83 insertions(+), 54 deletions(-)
> 
> diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> index 73a5c88e25d0..f42541bb4827 100644
> --- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> +++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> @@ -190,6 +190,8 @@ struct pispbe_hw_enables {
>  
>  /* Records a job configuration and memory addresses. */
>  struct pispbe_job_descriptor {
> +	struct list_head queue;
> +	struct pispbe_buffer *buffers[PISPBE_NUM_NODES];
>  	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
>  	struct pisp_be_tiles_config *config;

A candidate for a separate patch, I think storage for
pisp_be_tiles_config should be moved from pisbbe_dev.config to
pispbe_buffer, like we do for the rkisp1 driver. The memory would be
allocated at buffer allocation time (for config node buffers only, of
course).

>  	struct pispbe_hw_enables hw_enables;
> @@ -215,8 +217,10 @@ struct pispbe_dev {
>  	unsigned int sequence;
>  	u32 streaming_map;
>  	struct pispbe_job queued_job, running_job;
> -	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
> +	/* protects "hw_busy" flag, streaming_map and job_queue*/

s/job_queue/job_queue /

I don't think streaming_map needs to be protected by the spinlock
anymore. It is now accessed only from the qbuf, start_streaming and
stop_streaming operations, which are all protected by the vb2 queue
mutex... scratch that, we use a different mutex for different video
devices.

> +	spinlock_t hw_lock;
>  	bool hw_busy; /* non-zero if a job is queued or is being started */
> +	struct list_head job_queue;
>  	int irq;
>  	u32 hw_version;
>  	u8 done, started;
> @@ -440,41 +444,50 @@ static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,
>   * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
>   * available if the blocks are enabled in the config.
>   *
> - * Needs to be called with hw_lock held.
> + * If all the buffers required to form a job are available, append the
> + * job descriptor to the job queue to be later queued to the HW.
>   *
>   * Returns 0 if a job has been successfully prepared, < 0 otherwise.
>   */
> -static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> -			      struct pispbe_job_descriptor *job)
> +static int pispbe_prepare_job(struct pispbe_dev *pispbe)
>  {
>  	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
> +	struct pispbe_job_descriptor *job;
> +	unsigned int streaming_map;
>  	unsigned int config_index;
>  	struct pispbe_node *node;
> -	unsigned long flags;
>  
> -	lockdep_assert_held(&pispbe->hw_lock);
> +	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {

Do you need the irqsave version ?

> +		static const u32 mask = BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE);
>  
> -	memset(job, 0, sizeof(struct pispbe_job_descriptor));
> +		if ((pispbe->streaming_map & mask) != mask)
> +			return -ENODEV;
>  
> -	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
> -		pispbe->streaming_map) !=
> -			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> -		return -ENODEV;
> +		/*
> +		 * Take a copy of streaming_map: nodes activated after this
> +		 * point are ignored when preparing this job.
> +		 */
> +		streaming_map = pispbe->streaming_map;
> +	}
> +
> +	job = kzalloc(sizeof(*job), GFP_KERNEL);
> +	if (!job)
> +		return -ENOMEM;
>  
>  	node = &pispbe->node[CONFIG_NODE];
> -	spin_lock_irqsave(&node->ready_lock, flags);
> -	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> -						    struct pispbe_buffer,
> -						    ready_list);
> -	if (buf[CONFIG_NODE]) {
> +
> +	scoped_guard(spinlock_irqsave, &node->ready_lock) {

Same here.

> +		buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> +							    struct pispbe_buffer,
> +							    ready_list);
> +		if (!buf[CONFIG_NODE]) {
> +			kfree(job);
> +			return -ENODEV;
> +		}
> +
>  		list_del(&buf[CONFIG_NODE]->ready_list);
> -		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
> +		job->buffers[CONFIG_NODE] = buf[CONFIG_NODE];
>  	}
> -	spin_unlock_irqrestore(&node->ready_lock, flags);
> -
> -	/* Exit early if no config buffer has been queued. */
> -	if (!buf[CONFIG_NODE])
> -		return -ENODEV;
>  
>  	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
>  	job->config = &pispbe->config[config_index];
> @@ -495,7 +508,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  			continue;
>  
>  		buf[i] = NULL;
> -		if (!(pispbe->streaming_map & BIT(i)))
> +		if (!(streaming_map & BIT(i)))
>  			continue;
>  
>  		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
> @@ -522,25 +535,27 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  		node = &pispbe->node[i];
>  
>  		/* Pull a buffer from each V4L2 queue to form the queued job */
> -		spin_lock_irqsave(&node->ready_lock, flags);
> +		spin_lock(&node->ready_lock);
>  		buf[i] = list_first_entry_or_null(&node->ready_queue,
>  						  struct pispbe_buffer,
>  						  ready_list);
>  		if (buf[i]) {
>  			list_del(&buf[i]->ready_list);
> -			pispbe->queued_job.buf[i] = buf[i];
> +			job->buffers[i] = buf[i];
>  		}
> -		spin_unlock_irqrestore(&node->ready_lock, flags);
> +		spin_unlock(&node->ready_lock);
>  
>  		if (!buf[i] && !ignore_buffers)

The ignore_buffers logic seems weird to me. I don't understand why we
take buffers out of the ready_queue for the nodes that are not used by
this job. The problem isn't related to this patch, so it can be fixed
separately.

>  			goto err_return_buffers;
>  	}
>  
> -	pispbe->queued_job.valid = true;
> -
>  	/* Convert buffers to DMA addresses for the hardware */
>  	pispbe_xlate_addrs(pispbe, job, buf);
>  
> +	spin_lock(&pispbe->hw_lock);
> +	list_add_tail(&job->queue, &pispbe->job_queue);
> +	spin_unlock(&pispbe->hw_lock);
> +
>  	return 0;
>  
>  err_return_buffers:
> @@ -551,33 +566,41 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  			continue;
>  
>  		/* Return the buffer to the ready_list queue */
> -		spin_lock_irqsave(&n->ready_lock, flags);
> +		spin_lock(&n->ready_lock);
>  		list_add(&buf[i]->ready_list, &n->ready_queue);
> -		spin_unlock_irqrestore(&n->ready_lock, flags);
> +		spin_unlock(&n->ready_lock);

Should pispbe_node_buffer_queue() and pispbe_node_stop_streaming()
switch away from spin_lock_irqsave() too then ? Actually, can't we drop
the lock completely, now that ready_queue is only accessed from vb2 ops
all protected by the vb2 queue lock ?

>  	}
>  
> -	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
> +	kfree(job);
>  
>  	return -ENODEV;
>  }
>  
>  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
>  {
> -	struct pispbe_job_descriptor job;
> -	unsigned long flags;
> -	int ret;
> +	struct pispbe_job_descriptor *job;
>  
> -	spin_lock_irqsave(&pispbe->hw_lock, flags);
> +	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {
> +		if (clear_hw_busy)
> +			pispbe->hw_busy = false;
>  
> -	if (clear_hw_busy)
> -		pispbe->hw_busy = false;
> +		if (pispbe->hw_busy)
> +			return;
>  
> -	if (pispbe->hw_busy)
> -		goto unlock_and_return;
> +		job = list_first_entry_or_null(&pispbe->job_queue,
> +					       struct pispbe_job_descriptor,
> +					       queue);
> +		if (!job)
> +			return;
>  
> -	ret = pispbe_prepare_job(pispbe, &job);
> -	if (ret)
> -		goto unlock_and_return;
> +		list_del(&job->queue);
> +
> +		for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
> +			pispbe->queued_job.buf[i] = job->buffers[i];
> +		 pispbe->queued_job.valid = true;

Ideally this should be replaced with a pointer to the job descriptor,
but it can be done in a second step.

Ideally also, this should go to pispbe_queue_job(). I however don't
understand the logic that handles queued_job in pispbe_isr() and why the
interrupt handler can complete the queued job in some circumstances, so
I'm not sure how it would interact with moving this code to the
pispbe_queue_job() function.

> +
> +		pispbe->hw_busy = true;
> +	}
>  
>  	/*
>  	 * We can kick the job off without the hw_lock, as this can
> @@ -585,16 +608,8 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
>  	 * only when the following job has been queued and an interrupt
>  	 * is rised.
>  	 */
> -	pispbe->hw_busy = true;
> -	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> -
> -	pispbe_queue_job(pispbe, &job);
> -
> -	return;
> -
> -unlock_and_return:
> -	/* No job has been queued, just release the lock and return. */
> -	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> +	pispbe_queue_job(pispbe, job);
> +	kfree(job);
>  }
>  
>  static void pispbe_isr_jobdone(struct pispbe_dev *pispbe,
> @@ -857,7 +872,8 @@ static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
>  	 * Every time we add a buffer, check if there's now some work for the hw
>  	 * to do.
>  	 */
> -	pispbe_schedule(pispbe, false);
> +	if (!pispbe_prepare_job(pispbe))
> +		pispbe_schedule(pispbe, false);
>  }
>  
>  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> @@ -883,7 +899,8 @@ static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
>  		node->pispbe->streaming_map);
>  
>  	/* Maybe we're ready to run. */
> -	pispbe_schedule(pispbe, false);
> +	if (!pispbe_prepare_job(pispbe))
> +		pispbe_schedule(pispbe, false);
>  
>  	return 0;
>  
> @@ -935,6 +952,16 @@ static void pispbe_node_stop_streaming(struct vb2_queue *q)
>  
>  	spin_lock_irqsave(&pispbe->hw_lock, flags);
>  	pispbe->streaming_map &= ~BIT(node->id);
> +
> +	/* Release all jobs once all nodes have stopped streaming. */
> +	if (pispbe->streaming_map == 0) {
> +		struct pispbe_job_descriptor *job, *temp;
> +
> +		list_for_each_entry_safe(job, temp, &pispbe->job_queue, queue) {
> +			list_del(&job->queue);
> +			kfree(job);
> +		}
> +	}
>  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
>  
>  	pm_runtime_mark_last_busy(pispbe->dev);
> @@ -1677,6 +1704,8 @@ static int pispbe_probe(struct platform_device *pdev)
>  	if (!pispbe)
>  		return -ENOMEM;
>  
> +	INIT_LIST_HEAD(&pispbe->job_queue);
> +
>  	dev_set_drvdata(&pdev->dev, pispbe);
>  	pispbe->dev = &pdev->dev;
>  	platform_set_drvdata(pdev, pispbe);
Jacopo Mondi Aug. 31, 2024, 3:15 p.m. UTC | #2
On Sat, Aug 31, 2024 at 05:39:09PM GMT, Laurent Pinchart wrote:
> Hi Jacopo,
>
> Thank you for the patch.
>
> On Tue, Aug 27, 2024 at 09:40:17AM +0200, Jacopo Mondi wrote:
> > Currently the 'pispbe_schedule()' function does two things:
> >
> > 1) Tries to assemble a job by inspecting all the video node queues
> >    to make sure all the required buffers are available
> > 2) Submit the job to the hardware
> >
> > The pispbe_schedule() function is called at:
> >
> > - video device start_streaming() time
> > - video device qbuf() time
> > - irq handler
> >
> > As assembling a job requires inspecting all queues, it is a rather
> > time consuming operation which is better not run in IRQ context.
> >
> > To avoid the executing the time consuming job creation in interrupt
> > context split the job creation and job scheduling in two distinct
> > operations. When a well-formed job is created, append it to the
> > newly introduced 'pispbe->job_queue' where it will be dequeued from
> > by the scheduling routine.
> >
> > At start_streaming() and qbuf() time immediately try to schedule a job
> > if one has been created as the irq handler routing is only called when
> > a job has completed, and we can't solely rely on it for scheduling new
> > jobs.
> >
> > Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
> > ---
> >  .../platform/raspberrypi/pisp_be/pisp_be.c    | 137 +++++++++++-------
> >  1 file changed, 83 insertions(+), 54 deletions(-)
> >
> > diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > index 73a5c88e25d0..f42541bb4827 100644
> > --- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > +++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > @@ -190,6 +190,8 @@ struct pispbe_hw_enables {
> >
> >  /* Records a job configuration and memory addresses. */
> >  struct pispbe_job_descriptor {
> > +	struct list_head queue;
> > +	struct pispbe_buffer *buffers[PISPBE_NUM_NODES];
> >  	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
> >  	struct pisp_be_tiles_config *config;
>
> A candidate for a separate patch, I think storage for
> pisp_be_tiles_config should be moved from pisbbe_dev.config to
> pispbe_buffer, like we do for the rkisp1 driver. The memory would be
> allocated at buffer allocation time (for config node buffers only, of
> course).
>
> >  	struct pispbe_hw_enables hw_enables;
> > @@ -215,8 +217,10 @@ struct pispbe_dev {
> >  	unsigned int sequence;
> >  	u32 streaming_map;
> >  	struct pispbe_job queued_job, running_job;
> > -	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
> > +	/* protects "hw_busy" flag, streaming_map and job_queue*/
>
> s/job_queue/job_queue /
>
> I don't think streaming_map needs to be protected by the spinlock
> anymore. It is now accessed only from the qbuf, start_streaming and
> stop_streaming operations, which are all protected by the vb2 queue
> mutex... scratch that, we use a different mutex for different video
> devices.
>
> > +	spinlock_t hw_lock;
> >  	bool hw_busy; /* non-zero if a job is queued or is being started */
> > +	struct list_head job_queue;
> >  	int irq;
> >  	u32 hw_version;
> >  	u8 done, started;
> > @@ -440,41 +444,50 @@ static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,
> >   * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
> >   * available if the blocks are enabled in the config.
> >   *
> > - * Needs to be called with hw_lock held.
> > + * If all the buffers required to form a job are available, append the
> > + * job descriptor to the job queue to be later queued to the HW.
> >   *
> >   * Returns 0 if a job has been successfully prepared, < 0 otherwise.
> >   */
> > -static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > -			      struct pispbe_job_descriptor *job)
> > +static int pispbe_prepare_job(struct pispbe_dev *pispbe)
> >  {
> >  	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
> > +	struct pispbe_job_descriptor *job;
> > +	unsigned int streaming_map;
> >  	unsigned int config_index;
> >  	struct pispbe_node *node;
> > -	unsigned long flags;
> >
> > -	lockdep_assert_held(&pispbe->hw_lock);
> > +	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {
>
> Do you need the irqsave version ?
>

Ah no but I did
        scoped_guard(spin_lock
and it didn't compile so I though only spinlock_irqsave was valid :/

> > +		static const u32 mask = BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE);
> >
> > -	memset(job, 0, sizeof(struct pispbe_job_descriptor));
> > +		if ((pispbe->streaming_map & mask) != mask)
> > +			return -ENODEV;
> >
> > -	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
> > -		pispbe->streaming_map) !=
> > -			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> > -		return -ENODEV;
> > +		/*
> > +		 * Take a copy of streaming_map: nodes activated after this
> > +		 * point are ignored when preparing this job.
> > +		 */
> > +		streaming_map = pispbe->streaming_map;
> > +	}
> > +
> > +	job = kzalloc(sizeof(*job), GFP_KERNEL);
> > +	if (!job)
> > +		return -ENOMEM;
> >
> >  	node = &pispbe->node[CONFIG_NODE];
> > -	spin_lock_irqsave(&node->ready_lock, flags);
> > -	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> > -						    struct pispbe_buffer,
> > -						    ready_list);
> > -	if (buf[CONFIG_NODE]) {
> > +
> > +	scoped_guard(spinlock_irqsave, &node->ready_lock) {
>
> Same here.
>
> > +		buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> > +							    struct pispbe_buffer,
> > +							    ready_list);
> > +		if (!buf[CONFIG_NODE]) {
> > +			kfree(job);
> > +			return -ENODEV;
> > +		}
> > +
> >  		list_del(&buf[CONFIG_NODE]->ready_list);
> > -		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
> > +		job->buffers[CONFIG_NODE] = buf[CONFIG_NODE];
> >  	}
> > -	spin_unlock_irqrestore(&node->ready_lock, flags);
> > -
> > -	/* Exit early if no config buffer has been queued. */
> > -	if (!buf[CONFIG_NODE])
> > -		return -ENODEV;
> >
> >  	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
> >  	job->config = &pispbe->config[config_index];
> > @@ -495,7 +508,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  			continue;
> >
> >  		buf[i] = NULL;
> > -		if (!(pispbe->streaming_map & BIT(i)))
> > +		if (!(streaming_map & BIT(i)))
> >  			continue;
> >
> >  		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
> > @@ -522,25 +535,27 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  		node = &pispbe->node[i];
> >
> >  		/* Pull a buffer from each V4L2 queue to form the queued job */
> > -		spin_lock_irqsave(&node->ready_lock, flags);
> > +		spin_lock(&node->ready_lock);
> >  		buf[i] = list_first_entry_or_null(&node->ready_queue,
> >  						  struct pispbe_buffer,
> >  						  ready_list);
> >  		if (buf[i]) {
> >  			list_del(&buf[i]->ready_list);
> > -			pispbe->queued_job.buf[i] = buf[i];
> > +			job->buffers[i] = buf[i];
> >  		}
> > -		spin_unlock_irqrestore(&node->ready_lock, flags);
> > +		spin_unlock(&node->ready_lock);
> >
> >  		if (!buf[i] && !ignore_buffers)
>
> The ignore_buffers logic seems weird to me. I don't understand why we
> take buffers out of the ready_queue for the nodes that are not used by
> this job. The problem isn't related to this patch, so it can be fixed
> separately.
>
> >  			goto err_return_buffers;
> >  	}
> >
> > -	pispbe->queued_job.valid = true;
> > -
> >  	/* Convert buffers to DMA addresses for the hardware */
> >  	pispbe_xlate_addrs(pispbe, job, buf);
> >
> > +	spin_lock(&pispbe->hw_lock);
> > +	list_add_tail(&job->queue, &pispbe->job_queue);
> > +	spin_unlock(&pispbe->hw_lock);
> > +
> >  	return 0;
> >
> >  err_return_buffers:
> > @@ -551,33 +566,41 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  			continue;
> >
> >  		/* Return the buffer to the ready_list queue */
> > -		spin_lock_irqsave(&n->ready_lock, flags);
> > +		spin_lock(&n->ready_lock);
> >  		list_add(&buf[i]->ready_list, &n->ready_queue);
> > -		spin_unlock_irqrestore(&n->ready_lock, flags);
> > +		spin_unlock(&n->ready_lock);
>
> Should pispbe_node_buffer_queue() and pispbe_node_stop_streaming()
> switch away from spin_lock_irqsave() too then ? Actually, can't we drop

Isn't this done here ? :)

> the lock completely, now that ready_queue is only accessed from vb2 ops
> all protected by the vb2 queue lock ?
>

Ah probably yes. And probably in prepare_job() too as it is only
called from vb2 ops callbacks

This would really simplify the code

> >  	}
> >
> > -	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
> > +	kfree(job);
> >
> >  	return -ENODEV;
> >  }
> >
> >  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> >  {
> > -	struct pispbe_job_descriptor job;
> > -	unsigned long flags;
> > -	int ret;
> > +	struct pispbe_job_descriptor *job;
> >
> > -	spin_lock_irqsave(&pispbe->hw_lock, flags);
> > +	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {
> > +		if (clear_hw_busy)
> > +			pispbe->hw_busy = false;
> >
> > -	if (clear_hw_busy)
> > -		pispbe->hw_busy = false;
> > +		if (pispbe->hw_busy)
> > +			return;
> >
> > -	if (pispbe->hw_busy)
> > -		goto unlock_and_return;
> > +		job = list_first_entry_or_null(&pispbe->job_queue,
> > +					       struct pispbe_job_descriptor,
> > +					       queue);
> > +		if (!job)
> > +			return;
> >
> > -	ret = pispbe_prepare_job(pispbe, &job);
> > -	if (ret)
> > -		goto unlock_and_return;
> > +		list_del(&job->queue);
> > +
> > +		for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
> > +			pispbe->queued_job.buf[i] = job->buffers[i];
> > +		 pispbe->queued_job.valid = true;
>
> Ideally this should be replaced with a pointer to the job descriptor,
> but it can be done in a second step.

I know, but it makes backporting the patch to the BSP which has
multi-group support quite impossible. As this doesn't have any
performance impact I would keep it this way to ease backporting

>
> Ideally also, this should go to pispbe_queue_job(). I however don't
> understand the logic that handles queued_job in pispbe_isr() and why the
> interrupt handler can complete the queued job in some circumstances, so
> I'm not sure how it would interact with moving this code to the
> pispbe_queue_job() function.
>

I would refrain from touching that part of the code for little gain :)

> > +
> > +		pispbe->hw_busy = true;
> > +	}
> >
> >  	/*
> >  	 * We can kick the job off without the hw_lock, as this can
> > @@ -585,16 +608,8 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> >  	 * only when the following job has been queued and an interrupt
> >  	 * is rised.
> >  	 */
> > -	pispbe->hw_busy = true;
> > -	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > -
> > -	pispbe_queue_job(pispbe, &job);
> > -
> > -	return;
> > -
> > -unlock_and_return:
> > -	/* No job has been queued, just release the lock and return. */
> > -	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > +	pispbe_queue_job(pispbe, job);
> > +	kfree(job);
> >  }
> >
> >  static void pispbe_isr_jobdone(struct pispbe_dev *pispbe,
> > @@ -857,7 +872,8 @@ static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
> >  	 * Every time we add a buffer, check if there's now some work for the hw
> >  	 * to do.
> >  	 */
> > -	pispbe_schedule(pispbe, false);
> > +	if (!pispbe_prepare_job(pispbe))
> > +		pispbe_schedule(pispbe, false);
> >  }
> >
> >  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> > @@ -883,7 +899,8 @@ static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> >  		node->pispbe->streaming_map);
> >
> >  	/* Maybe we're ready to run. */
> > -	pispbe_schedule(pispbe, false);
> > +	if (!pispbe_prepare_job(pispbe))
> > +		pispbe_schedule(pispbe, false);
> >
> >  	return 0;
> >
> > @@ -935,6 +952,16 @@ static void pispbe_node_stop_streaming(struct vb2_queue *q)
> >
> >  	spin_lock_irqsave(&pispbe->hw_lock, flags);
> >  	pispbe->streaming_map &= ~BIT(node->id);
> > +
> > +	/* Release all jobs once all nodes have stopped streaming. */
> > +	if (pispbe->streaming_map == 0) {
> > +		struct pispbe_job_descriptor *job, *temp;
> > +
> > +		list_for_each_entry_safe(job, temp, &pispbe->job_queue, queue) {
> > +			list_del(&job->queue);
> > +			kfree(job);
> > +		}
> > +	}
> >  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> >
> >  	pm_runtime_mark_last_busy(pispbe->dev);
> > @@ -1677,6 +1704,8 @@ static int pispbe_probe(struct platform_device *pdev)
> >  	if (!pispbe)
> >  		return -ENOMEM;
> >
> > +	INIT_LIST_HEAD(&pispbe->job_queue);
> > +
> >  	dev_set_drvdata(&pdev->dev, pispbe);
> >  	pispbe->dev = &pdev->dev;
> >  	platform_set_drvdata(pdev, pispbe);
>
> --
> Regards,
>
> Laurent Pinchart
Laurent Pinchart Aug. 31, 2024, 3:27 p.m. UTC | #3
On Sat, Aug 31, 2024 at 05:15:06PM +0200, Jacopo Mondi wrote:
> On Sat, Aug 31, 2024 at 05:39:09PM GMT, Laurent Pinchart wrote:
> > On Tue, Aug 27, 2024 at 09:40:17AM +0200, Jacopo Mondi wrote:
> > > Currently the 'pispbe_schedule()' function does two things:
> > >
> > > 1) Tries to assemble a job by inspecting all the video node queues
> > >    to make sure all the required buffers are available
> > > 2) Submit the job to the hardware
> > >
> > > The pispbe_schedule() function is called at:
> > >
> > > - video device start_streaming() time
> > > - video device qbuf() time
> > > - irq handler
> > >
> > > As assembling a job requires inspecting all queues, it is a rather
> > > time consuming operation which is better not run in IRQ context.
> > >
> > > To avoid the executing the time consuming job creation in interrupt
> > > context split the job creation and job scheduling in two distinct
> > > operations. When a well-formed job is created, append it to the
> > > newly introduced 'pispbe->job_queue' where it will be dequeued from
> > > by the scheduling routine.
> > >
> > > At start_streaming() and qbuf() time immediately try to schedule a job
> > > if one has been created as the irq handler routing is only called when
> > > a job has completed, and we can't solely rely on it for scheduling new
> > > jobs.
> > >
> > > Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
> > > ---
> > >  .../platform/raspberrypi/pisp_be/pisp_be.c    | 137 +++++++++++-------
> > >  1 file changed, 83 insertions(+), 54 deletions(-)
> > >
> > > diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > > index 73a5c88e25d0..f42541bb4827 100644
> > > --- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > > +++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > > @@ -190,6 +190,8 @@ struct pispbe_hw_enables {
> > >
> > >  /* Records a job configuration and memory addresses. */
> > >  struct pispbe_job_descriptor {
> > > +	struct list_head queue;
> > > +	struct pispbe_buffer *buffers[PISPBE_NUM_NODES];
> > >  	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
> > >  	struct pisp_be_tiles_config *config;
> >
> > A candidate for a separate patch, I think storage for
> > pisp_be_tiles_config should be moved from pisbbe_dev.config to
> > pispbe_buffer, like we do for the rkisp1 driver. The memory would be
> > allocated at buffer allocation time (for config node buffers only, of
> > course).
> >
> > >  	struct pispbe_hw_enables hw_enables;
> > > @@ -215,8 +217,10 @@ struct pispbe_dev {
> > >  	unsigned int sequence;
> > >  	u32 streaming_map;
> > >  	struct pispbe_job queued_job, running_job;
> > > -	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
> > > +	/* protects "hw_busy" flag, streaming_map and job_queue*/
> >
> > s/job_queue/job_queue /
> >
> > I don't think streaming_map needs to be protected by the spinlock
> > anymore. It is now accessed only from the qbuf, start_streaming and
> > stop_streaming operations, which are all protected by the vb2 queue
> > mutex... scratch that, we use a different mutex for different video
> > devices.
> >
> > > +	spinlock_t hw_lock;
> > >  	bool hw_busy; /* non-zero if a job is queued or is being started */
> > > +	struct list_head job_queue;
> > >  	int irq;
> > >  	u32 hw_version;
> > >  	u8 done, started;
> > > @@ -440,41 +444,50 @@ static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,
> > >   * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
> > >   * available if the blocks are enabled in the config.
> > >   *
> > > - * Needs to be called with hw_lock held.
> > > + * If all the buffers required to form a job are available, append the
> > > + * job descriptor to the job queue to be later queued to the HW.
> > >   *
> > >   * Returns 0 if a job has been successfully prepared, < 0 otherwise.
> > >   */
> > > -static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > > -			      struct pispbe_job_descriptor *job)
> > > +static int pispbe_prepare_job(struct pispbe_dev *pispbe)
> > >  {
> > >  	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
> > > +	struct pispbe_job_descriptor *job;
> > > +	unsigned int streaming_map;
> > >  	unsigned int config_index;
> > >  	struct pispbe_node *node;
> > > -	unsigned long flags;
> > >
> > > -	lockdep_assert_held(&pispbe->hw_lock);
> > > +	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {
> >
> > Do you need the irqsave version ?
> 
> Ah no but I did
>         scoped_guard(spin_lock
> and it didn't compile so I though only spinlock_irqsave was valid :/
> 
> > > +		static const u32 mask = BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE);
> > >
> > > -	memset(job, 0, sizeof(struct pispbe_job_descriptor));
> > > +		if ((pispbe->streaming_map & mask) != mask)
> > > +			return -ENODEV;
> > >
> > > -	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
> > > -		pispbe->streaming_map) !=
> > > -			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> > > -		return -ENODEV;
> > > +		/*
> > > +		 * Take a copy of streaming_map: nodes activated after this
> > > +		 * point are ignored when preparing this job.
> > > +		 */
> > > +		streaming_map = pispbe->streaming_map;
> > > +	}
> > > +
> > > +	job = kzalloc(sizeof(*job), GFP_KERNEL);
> > > +	if (!job)
> > > +		return -ENOMEM;
> > >
> > >  	node = &pispbe->node[CONFIG_NODE];
> > > -	spin_lock_irqsave(&node->ready_lock, flags);
> > > -	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> > > -						    struct pispbe_buffer,
> > > -						    ready_list);
> > > -	if (buf[CONFIG_NODE]) {
> > > +
> > > +	scoped_guard(spinlock_irqsave, &node->ready_lock) {
> >
> > Same here.
> >
> > > +		buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> > > +							    struct pispbe_buffer,
> > > +							    ready_list);
> > > +		if (!buf[CONFIG_NODE]) {
> > > +			kfree(job);
> > > +			return -ENODEV;
> > > +		}
> > > +
> > >  		list_del(&buf[CONFIG_NODE]->ready_list);
> > > -		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
> > > +		job->buffers[CONFIG_NODE] = buf[CONFIG_NODE];
> > >  	}
> > > -	spin_unlock_irqrestore(&node->ready_lock, flags);
> > > -
> > > -	/* Exit early if no config buffer has been queued. */
> > > -	if (!buf[CONFIG_NODE])
> > > -		return -ENODEV;
> > >
> > >  	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
> > >  	job->config = &pispbe->config[config_index];
> > > @@ -495,7 +508,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  			continue;
> > >
> > >  		buf[i] = NULL;
> > > -		if (!(pispbe->streaming_map & BIT(i)))
> > > +		if (!(streaming_map & BIT(i)))
> > >  			continue;
> > >
> > >  		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
> > > @@ -522,25 +535,27 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  		node = &pispbe->node[i];
> > >
> > >  		/* Pull a buffer from each V4L2 queue to form the queued job */
> > > -		spin_lock_irqsave(&node->ready_lock, flags);
> > > +		spin_lock(&node->ready_lock);
> > >  		buf[i] = list_first_entry_or_null(&node->ready_queue,
> > >  						  struct pispbe_buffer,
> > >  						  ready_list);
> > >  		if (buf[i]) {
> > >  			list_del(&buf[i]->ready_list);
> > > -			pispbe->queued_job.buf[i] = buf[i];
> > > +			job->buffers[i] = buf[i];
> > >  		}
> > > -		spin_unlock_irqrestore(&node->ready_lock, flags);
> > > +		spin_unlock(&node->ready_lock);
> > >
> > >  		if (!buf[i] && !ignore_buffers)
> >
> > The ignore_buffers logic seems weird to me. I don't understand why we
> > take buffers out of the ready_queue for the nodes that are not used by
> > this job. The problem isn't related to this patch, so it can be fixed
> > separately.
> >
> > >  			goto err_return_buffers;
> > >  	}
> > >
> > > -	pispbe->queued_job.valid = true;
> > > -
> > >  	/* Convert buffers to DMA addresses for the hardware */
> > >  	pispbe_xlate_addrs(pispbe, job, buf);
> > >
> > > +	spin_lock(&pispbe->hw_lock);
> > > +	list_add_tail(&job->queue, &pispbe->job_queue);
> > > +	spin_unlock(&pispbe->hw_lock);
> > > +
> > >  	return 0;
> > >
> > >  err_return_buffers:
> > > @@ -551,33 +566,41 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  			continue;
> > >
> > >  		/* Return the buffer to the ready_list queue */
> > > -		spin_lock_irqsave(&n->ready_lock, flags);
> > > +		spin_lock(&n->ready_lock);
> > >  		list_add(&buf[i]->ready_list, &n->ready_queue);
> > > -		spin_unlock_irqrestore(&n->ready_lock, flags);
> > > +		spin_unlock(&n->ready_lock);
> >
> > Should pispbe_node_buffer_queue() and pispbe_node_stop_streaming()
> > switch away from spin_lock_irqsave() too then ? Actually, can't we drop
> 
> Isn't this done here ? :)

Not that I can see. You only change the lock functions in
pispbe_prepare_job().

> > the lock completely, now that ready_queue is only accessed from vb2 ops
> > all protected by the vb2 queue lock ?
> 
> Ah probably yes. And probably in prepare_job() too as it is only
> called from vb2 ops callbacks
> 
> This would really simplify the code
> 
> > >  	}
> > >
> > > -	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
> > > +	kfree(job);
> > >
> > >  	return -ENODEV;
> > >  }
> > >
> > >  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> > >  {
> > > -	struct pispbe_job_descriptor job;
> > > -	unsigned long flags;
> > > -	int ret;
> > > +	struct pispbe_job_descriptor *job;
> > >
> > > -	spin_lock_irqsave(&pispbe->hw_lock, flags);
> > > +	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {
> > > +		if (clear_hw_busy)
> > > +			pispbe->hw_busy = false;
> > >
> > > -	if (clear_hw_busy)
> > > -		pispbe->hw_busy = false;
> > > +		if (pispbe->hw_busy)
> > > +			return;
> > >
> > > -	if (pispbe->hw_busy)
> > > -		goto unlock_and_return;
> > > +		job = list_first_entry_or_null(&pispbe->job_queue,
> > > +					       struct pispbe_job_descriptor,
> > > +					       queue);
> > > +		if (!job)
> > > +			return;
> > >
> > > -	ret = pispbe_prepare_job(pispbe, &job);
> > > -	if (ret)
> > > -		goto unlock_and_return;
> > > +		list_del(&job->queue);
> > > +
> > > +		for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
> > > +			pispbe->queued_job.buf[i] = job->buffers[i];
> > > +		 pispbe->queued_job.valid = true;
> >
> > Ideally this should be replaced with a pointer to the job descriptor,
> > but it can be done in a second step.
> 
> I know, but it makes backporting the patch to the BSP which has
> multi-group support quite impossible. As this doesn't have any
> performance impact I would keep it this way to ease backporting

You know my opinion about BSPs :-) I'm OK keeping this for a bit longer
until we implement proper multi-context support in mainline, assuming it
can be done in a reasonable time frame.

> > Ideally also, this should go to pispbe_queue_job(). I however don't
> > understand the logic that handles queued_job in pispbe_isr() and why the
> > interrupt handler can complete the queued job in some circumstances, so
> > I'm not sure how it would interact with moving this code to the
> > pispbe_queue_job() function.
> 
> I would refrain from touching that part of the code for little gain :)

Once queue_job gets replaced with a pointer that code will need to be
touched, but we can wait until then.

> > > +
> > > +		pispbe->hw_busy = true;
> > > +	}
> > >
> > >  	/*
> > >  	 * We can kick the job off without the hw_lock, as this can
> > > @@ -585,16 +608,8 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> > >  	 * only when the following job has been queued and an interrupt
> > >  	 * is rised.
> > >  	 */
> > > -	pispbe->hw_busy = true;
> > > -	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > > -
> > > -	pispbe_queue_job(pispbe, &job);
> > > -
> > > -	return;
> > > -
> > > -unlock_and_return:
> > > -	/* No job has been queued, just release the lock and return. */
> > > -	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > > +	pispbe_queue_job(pispbe, job);
> > > +	kfree(job);
> > >  }
> > >
> > >  static void pispbe_isr_jobdone(struct pispbe_dev *pispbe,
> > > @@ -857,7 +872,8 @@ static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
> > >  	 * Every time we add a buffer, check if there's now some work for the hw
> > >  	 * to do.
> > >  	 */
> > > -	pispbe_schedule(pispbe, false);
> > > +	if (!pispbe_prepare_job(pispbe))
> > > +		pispbe_schedule(pispbe, false);
> > >  }
> > >
> > >  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> > > @@ -883,7 +899,8 @@ static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> > >  		node->pispbe->streaming_map);
> > >
> > >  	/* Maybe we're ready to run. */
> > > -	pispbe_schedule(pispbe, false);
> > > +	if (!pispbe_prepare_job(pispbe))
> > > +		pispbe_schedule(pispbe, false);
> > >
> > >  	return 0;
> > >
> > > @@ -935,6 +952,16 @@ static void pispbe_node_stop_streaming(struct vb2_queue *q)
> > >
> > >  	spin_lock_irqsave(&pispbe->hw_lock, flags);
> > >  	pispbe->streaming_map &= ~BIT(node->id);
> > > +
> > > +	/* Release all jobs once all nodes have stopped streaming. */
> > > +	if (pispbe->streaming_map == 0) {
> > > +		struct pispbe_job_descriptor *job, *temp;
> > > +
> > > +		list_for_each_entry_safe(job, temp, &pispbe->job_queue, queue) {
> > > +			list_del(&job->queue);
> > > +			kfree(job);
> > > +		}
> > > +	}
> > >  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > >
> > >  	pm_runtime_mark_last_busy(pispbe->dev);
> > > @@ -1677,6 +1704,8 @@ static int pispbe_probe(struct platform_device *pdev)
> > >  	if (!pispbe)
> > >  		return -ENOMEM;
> > >
> > > +	INIT_LIST_HEAD(&pispbe->job_queue);
> > > +
> > >  	dev_set_drvdata(&pdev->dev, pispbe);
> > >  	pispbe->dev = &pdev->dev;
> > >  	platform_set_drvdata(pdev, pispbe);
diff mbox series

Patch

diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
index 73a5c88e25d0..f42541bb4827 100644
--- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
+++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
@@ -190,6 +190,8 @@  struct pispbe_hw_enables {
 
 /* Records a job configuration and memory addresses. */
 struct pispbe_job_descriptor {
+	struct list_head queue;
+	struct pispbe_buffer *buffers[PISPBE_NUM_NODES];
 	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
 	struct pisp_be_tiles_config *config;
 	struct pispbe_hw_enables hw_enables;
@@ -215,8 +217,10 @@  struct pispbe_dev {
 	unsigned int sequence;
 	u32 streaming_map;
 	struct pispbe_job queued_job, running_job;
-	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
+	/* protects "hw_busy" flag, streaming_map and job_queue*/
+	spinlock_t hw_lock;
 	bool hw_busy; /* non-zero if a job is queued or is being started */
+	struct list_head job_queue;
 	int irq;
 	u32 hw_version;
 	u8 done, started;
@@ -440,41 +444,50 @@  static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,
  * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
  * available if the blocks are enabled in the config.
  *
- * Needs to be called with hw_lock held.
+ * If all the buffers required to form a job are available, append the
+ * job descriptor to the job queue to be later queued to the HW.
  *
  * Returns 0 if a job has been successfully prepared, < 0 otherwise.
  */
-static int pispbe_prepare_job(struct pispbe_dev *pispbe,
-			      struct pispbe_job_descriptor *job)
+static int pispbe_prepare_job(struct pispbe_dev *pispbe)
 {
 	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
+	struct pispbe_job_descriptor *job;
+	unsigned int streaming_map;
 	unsigned int config_index;
 	struct pispbe_node *node;
-	unsigned long flags;
 
-	lockdep_assert_held(&pispbe->hw_lock);
+	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {
+		static const u32 mask = BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE);
 
-	memset(job, 0, sizeof(struct pispbe_job_descriptor));
+		if ((pispbe->streaming_map & mask) != mask)
+			return -ENODEV;
 
-	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
-		pispbe->streaming_map) !=
-			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
-		return -ENODEV;
+		/*
+		 * Take a copy of streaming_map: nodes activated after this
+		 * point are ignored when preparing this job.
+		 */
+		streaming_map = pispbe->streaming_map;
+	}
+
+	job = kzalloc(sizeof(*job), GFP_KERNEL);
+	if (!job)
+		return -ENOMEM;
 
 	node = &pispbe->node[CONFIG_NODE];
-	spin_lock_irqsave(&node->ready_lock, flags);
-	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
-						    struct pispbe_buffer,
-						    ready_list);
-	if (buf[CONFIG_NODE]) {
+
+	scoped_guard(spinlock_irqsave, &node->ready_lock) {
+		buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
+							    struct pispbe_buffer,
+							    ready_list);
+		if (!buf[CONFIG_NODE]) {
+			kfree(job);
+			return -ENODEV;
+		}
+
 		list_del(&buf[CONFIG_NODE]->ready_list);
-		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
+		job->buffers[CONFIG_NODE] = buf[CONFIG_NODE];
 	}
-	spin_unlock_irqrestore(&node->ready_lock, flags);
-
-	/* Exit early if no config buffer has been queued. */
-	if (!buf[CONFIG_NODE])
-		return -ENODEV;
 
 	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
 	job->config = &pispbe->config[config_index];
@@ -495,7 +508,7 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 			continue;
 
 		buf[i] = NULL;
-		if (!(pispbe->streaming_map & BIT(i)))
+		if (!(streaming_map & BIT(i)))
 			continue;
 
 		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
@@ -522,25 +535,27 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 		node = &pispbe->node[i];
 
 		/* Pull a buffer from each V4L2 queue to form the queued job */
-		spin_lock_irqsave(&node->ready_lock, flags);
+		spin_lock(&node->ready_lock);
 		buf[i] = list_first_entry_or_null(&node->ready_queue,
 						  struct pispbe_buffer,
 						  ready_list);
 		if (buf[i]) {
 			list_del(&buf[i]->ready_list);
-			pispbe->queued_job.buf[i] = buf[i];
+			job->buffers[i] = buf[i];
 		}
-		spin_unlock_irqrestore(&node->ready_lock, flags);
+		spin_unlock(&node->ready_lock);
 
 		if (!buf[i] && !ignore_buffers)
 			goto err_return_buffers;
 	}
 
-	pispbe->queued_job.valid = true;
-
 	/* Convert buffers to DMA addresses for the hardware */
 	pispbe_xlate_addrs(pispbe, job, buf);
 
+	spin_lock(&pispbe->hw_lock);
+	list_add_tail(&job->queue, &pispbe->job_queue);
+	spin_unlock(&pispbe->hw_lock);
+
 	return 0;
 
 err_return_buffers:
@@ -551,33 +566,41 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 			continue;
 
 		/* Return the buffer to the ready_list queue */
-		spin_lock_irqsave(&n->ready_lock, flags);
+		spin_lock(&n->ready_lock);
 		list_add(&buf[i]->ready_list, &n->ready_queue);
-		spin_unlock_irqrestore(&n->ready_lock, flags);
+		spin_unlock(&n->ready_lock);
 	}
 
-	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
+	kfree(job);
 
 	return -ENODEV;
 }
 
 static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
 {
-	struct pispbe_job_descriptor job;
-	unsigned long flags;
-	int ret;
+	struct pispbe_job_descriptor *job;
 
-	spin_lock_irqsave(&pispbe->hw_lock, flags);
+	scoped_guard(spinlock_irqsave, &pispbe->hw_lock) {
+		if (clear_hw_busy)
+			pispbe->hw_busy = false;
 
-	if (clear_hw_busy)
-		pispbe->hw_busy = false;
+		if (pispbe->hw_busy)
+			return;
 
-	if (pispbe->hw_busy)
-		goto unlock_and_return;
+		job = list_first_entry_or_null(&pispbe->job_queue,
+					       struct pispbe_job_descriptor,
+					       queue);
+		if (!job)
+			return;
 
-	ret = pispbe_prepare_job(pispbe, &job);
-	if (ret)
-		goto unlock_and_return;
+		list_del(&job->queue);
+
+		for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
+			pispbe->queued_job.buf[i] = job->buffers[i];
+		 pispbe->queued_job.valid = true;
+
+		pispbe->hw_busy = true;
+	}
 
 	/*
 	 * We can kick the job off without the hw_lock, as this can
@@ -585,16 +608,8 @@  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
 	 * only when the following job has been queued and an interrupt
 	 * is rised.
 	 */
-	pispbe->hw_busy = true;
-	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
-
-	pispbe_queue_job(pispbe, &job);
-
-	return;
-
-unlock_and_return:
-	/* No job has been queued, just release the lock and return. */
-	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
+	pispbe_queue_job(pispbe, job);
+	kfree(job);
 }
 
 static void pispbe_isr_jobdone(struct pispbe_dev *pispbe,
@@ -857,7 +872,8 @@  static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
 	 * Every time we add a buffer, check if there's now some work for the hw
 	 * to do.
 	 */
-	pispbe_schedule(pispbe, false);
+	if (!pispbe_prepare_job(pispbe))
+		pispbe_schedule(pispbe, false);
 }
 
 static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
@@ -883,7 +899,8 @@  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
 		node->pispbe->streaming_map);
 
 	/* Maybe we're ready to run. */
-	pispbe_schedule(pispbe, false);
+	if (!pispbe_prepare_job(pispbe))
+		pispbe_schedule(pispbe, false);
 
 	return 0;
 
@@ -935,6 +952,16 @@  static void pispbe_node_stop_streaming(struct vb2_queue *q)
 
 	spin_lock_irqsave(&pispbe->hw_lock, flags);
 	pispbe->streaming_map &= ~BIT(node->id);
+
+	/* Release all jobs once all nodes have stopped streaming. */
+	if (pispbe->streaming_map == 0) {
+		struct pispbe_job_descriptor *job, *temp;
+
+		list_for_each_entry_safe(job, temp, &pispbe->job_queue, queue) {
+			list_del(&job->queue);
+			kfree(job);
+		}
+	}
 	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
 
 	pm_runtime_mark_last_busy(pispbe->dev);
@@ -1677,6 +1704,8 @@  static int pispbe_probe(struct platform_device *pdev)
 	if (!pispbe)
 		return -ENOMEM;
 
+	INIT_LIST_HEAD(&pispbe->job_queue);
+
 	dev_set_drvdata(&pdev->dev, pispbe);
 	pispbe->dev = &pdev->dev;
 	platform_set_drvdata(pdev, pispbe);