[v5,08/20] jobs: protect jobs with job_lock/unlock

Message ID: 20220208143513.1077229-9-eesposit@redhat.com
State New, archived
Series: job: replace AioContext lock with job_mutex

Commit Message

Emanuele Giuseppe Esposito Feb. 8, 2022, 2:35 p.m. UTC
Introduce the job locking mechanism through the whole job API,
following the comments in job.h and the requirements of job-monitor
(e.g. the functions in job-qmp.c, which assume the lock is held) and
job-driver (e.g. mirror.c and the other JobDriver implementations,
where the lock is not held).

Use the _locked helpers introduced earlier to differentiate
between functions called with and without job_mutex held.
This only applies to functions that are called in both
contexts; all the others will be renamed later.

job_{lock/unlock} is independent of real_job_{lock/unlock}.

Note: at this stage, job_{lock/unlock} and the job lock guard macros
are *nops*.
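
A minimal sketch of what that means in practice (illustrative only,
not part of this patch; the real definitions live in the job code):

    void job_lock(void)
    {
        /* nop: jobs are still protected by the AioContext lock */
    }

    void job_unlock(void)
    {
        /* nop */
    }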

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 block.c             |  18 ++++---
 block/replication.c |   8 ++-
 blockdev.c          |  17 ++++--
 blockjob.c          |  62 +++++++++++++++-------
 job-qmp.c           |   2 +
 job.c               | 123 +++++++++++++++++++++++++++++++-------------
 monitor/qmp-cmds.c  |   6 ++-
 qemu-img.c          |  41 +++++++++------
 8 files changed, 191 insertions(+), 86 deletions(-)

Comments

Stefan Hajnoczi Feb. 17, 2022, 2:48 p.m. UTC | #1
On Tue, Feb 08, 2022 at 09:35:01AM -0500, Emanuele Giuseppe Esposito wrote:
> diff --git a/block/replication.c b/block/replication.c
> index 55c8f894aa..a03b28726e 100644
> --- a/block/replication.c
> +++ b/block/replication.c
> @@ -149,7 +149,9 @@ static void replication_close(BlockDriverState *bs)
>      if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>          commit_job = &s->commit_job->job;
>          assert(commit_job->aio_context == qemu_get_current_aio_context());

Is it safe to access commit_job->aio_context outside job_mutex?

> @@ -1838,7 +1840,9 @@ static void drive_backup_abort(BlkActionState *common)
>          aio_context = bdrv_get_aio_context(state->bs);
>          aio_context_acquire(aio_context);
>  
> -        job_cancel_sync(&state->job->job, true);
> +        WITH_JOB_LOCK_GUARD() {
> +            job_cancel_sync(&state->job->job, true);
> +        }

Maybe job_cancel_sync() should take the lock internally since all
callers in this patch seem to need the lock?

I noticed this patch does not add WITH_JOB_LOCK_GUARD() to
tests/unit/test-blockjob.c:cancel_common(). Was that an oversight or is
there a reason why job_mutex is not needed around the job_cancel_sync()
call there?

> @@ -252,7 +258,13 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
>  
>  static void block_job_on_idle(Notifier *n, void *opaque)
>  {
> +    /*
> +     * we can't kick with job_mutex held, but we also want
> +     * to protect the notifier list.
> +     */
> +    job_unlock();
>      aio_wait_kick();
> +    job_lock();

I don't understand this. aio_wait_kick() looks safe to call with a mutex
held?

> @@ -292,7 +304,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
>      job->speed = speed;
>  
>      if (drv->set_speed) {
> +        job_unlock();
>          drv->set_speed(job, speed);
> +        job_lock();

What guarantees that job stays alive during drv->set_speed(job)? We
don't hold a ref here. Maybe the assumption is that
block_job_set_speed() only gets called from the main loop thread and
nothing else will modify the jobs list while we're in drv->set_speed()?

> @@ -545,10 +566,15 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
>                                          action);
>      }
>      if (action == BLOCK_ERROR_ACTION_STOP) {
> -        if (!job->job.user_paused) {
> -            job_pause(&job->job);
> -            /* make the pause user visible, which will be resumed from QMP. */
> -            job->job.user_paused = true;
> +        WITH_JOB_LOCK_GUARD() {
> +            if (!job->job.user_paused) {
> +                job_pause(&job->job);
> +                /*
> +                 * make the pause user visible, which will be
> +                 * resumed from QMP.
> +                 */
> +                job->job.user_paused = true;
> +            }
>          }
>          block_job_iostatus_set_err(job, error);

Does this need the lock? If not, why is block_job_iostatus_reset()
called with the lock held?
Emanuele Giuseppe Esposito Feb. 24, 2022, 12:45 p.m. UTC | #2
On 17/02/2022 15:48, Stefan Hajnoczi wrote:
> On Tue, Feb 08, 2022 at 09:35:01AM -0500, Emanuele Giuseppe Esposito wrote:
>> diff --git a/block/replication.c b/block/replication.c
>> index 55c8f894aa..a03b28726e 100644
>> --- a/block/replication.c
>> +++ b/block/replication.c
>> @@ -149,7 +149,9 @@ static void replication_close(BlockDriverState *bs)
>>      if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>>          commit_job = &s->commit_job->job;
>>          assert(commit_job->aio_context == qemu_get_current_aio_context());
> 
> Is it safe to access commit_job->aio_context outside job_mutex?

No, but that protection is currently not done. Patch 18 takes care of
protecting aio_context. Remember again that the job lock API is still a nop.
> 
>> @@ -1838,7 +1840,9 @@ static void drive_backup_abort(BlkActionState *common)
>>          aio_context = bdrv_get_aio_context(state->bs);
>>          aio_context_acquire(aio_context);
>>  
>> -        job_cancel_sync(&state->job->job, true);
>> +        WITH_JOB_LOCK_GUARD() {
>> +            job_cancel_sync(&state->job->job, true);
>> +        }
> 
> Maybe job_cancel_sync() should take the lock internally since all
> callers in this patch seem to need the lock?

The _locked version is useful where lock guards are already
present and cover multiple operations. There are only 3 places
where a lock guard is added to cover job_cancel_sync_locked. Is it
worth defining an additional function?
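
For concreteness, the wrapper being discussed would be something along
these lines (a sketch of the suggested alternative, not code from this
series):

    int job_cancel_sync(Job *job, bool force)
    {
        JOB_LOCK_GUARD();
        return job_cancel_sync_locked(job, force);
    }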


> 
> I noticed this patch does not add WITH_JOB_LOCK_GUARD() to
> tests/unit/test-blockjob.c:cancel_common(). Was that an oversight or is
> there a reason why job_mutex is not needed around the job_cancel_sync()
> call there?

No, locks in unit tests are added in patch 10 "jobs: protect jobs with
job_lock/unlock".

> 
>> @@ -252,7 +258,13 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
>>  
>>  static void block_job_on_idle(Notifier *n, void *opaque)
>>  {
>> +    /*
>> +     * we can't kick with job_mutex held, but we also want
>> +     * to protect the notifier list.
>> +     */
>> +    job_unlock();
>>      aio_wait_kick();
>> +    job_lock();
> 
> I don't understand this. aio_wait_kick() looks safe to call with a mutex
> held?
You are right. It should be safe.
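
In that case the hunk could presumably be dropped, leaving the function
as it was before the patch:

    static void block_job_on_idle(Notifier *n, void *opaque)
    {
        aio_wait_kick();
    }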

> 
>> @@ -292,7 +304,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
>>      job->speed = speed;
>>  
>>      if (drv->set_speed) {
>> +        job_unlock();
>>          drv->set_speed(job, speed);
>> +        job_lock();
> 
> What guarantees that job stays alive during drv->set_speed(job)? We
> don't hold a ref here. Maybe the assumption is that
> block_job_set_speed() only gets called from the main loop thread and
> nothing else will modify the jobs list while we're in drv->set_speed()?

What guaranteed this before? I am not sure.

> 
>> @@ -545,10 +566,15 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
>>                                          action);
>>      }
>>      if (action == BLOCK_ERROR_ACTION_STOP) {
>> -        if (!job->job.user_paused) {
>> -            job_pause(&job->job);
>> -            /* make the pause user visible, which will be resumed from QMP. */
>> -            job->job.user_paused = true;
>> +        WITH_JOB_LOCK_GUARD() {
>> +            if (!job->job.user_paused) {
>> +                job_pause(&job->job);
>> +                /*
>> +                 * make the pause user visible, which will be
>> +                 * resumed from QMP.
>> +                 */
>> +                job->job.user_paused = true;
>> +            }
>>          }
>>          block_job_iostatus_set_err(job, error);
> 
> Does this need the lock? If not, why is block_job_iostatus_reset()
> called with the lock held?
> 
block_job_iostatus_set_err does not touch any Job fields. On the other
hand block_job_iostatus_reset reads job.user_paused and job.pause_count.
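
For reference, a sketch of what block_job_iostatus_reset() does
(abridged from blockjob.c; treat the exact form as approximate):

    void block_job_iostatus_reset(BlockJob *job)
    {
        if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
            return;
        }
        /* reads Job fields, hence the lock in block_job_user_resume() */
        assert(job->job.user_paused && job->job.pause_count > 0);
        job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
    }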

Emanuele
Stefan Hajnoczi Feb. 24, 2022, 4:48 p.m. UTC | #3
On Thu, Feb 24, 2022 at 01:45:48PM +0100, Emanuele Giuseppe Esposito wrote:
> 
> 
> On 17/02/2022 15:48, Stefan Hajnoczi wrote:
> > On Tue, Feb 08, 2022 at 09:35:01AM -0500, Emanuele Giuseppe Esposito wrote:
> >> diff --git a/block/replication.c b/block/replication.c
> >> index 55c8f894aa..a03b28726e 100644
> >> --- a/block/replication.c
> >> +++ b/block/replication.c
> >> @@ -149,7 +149,9 @@ static void replication_close(BlockDriverState *bs)
> >>      if (s->stage == BLOCK_REPLICATION_FAILOVER) {
> >>          commit_job = &s->commit_job->job;
> >>          assert(commit_job->aio_context == qemu_get_current_aio_context());
> > 
> > Is it safe to access commit_job->aio_context outside job_mutex?
> 
> No, but that protection is currently not done. Patch 18 takes care of
> protecting aio_context. Remember again that the job lock API is still
> a nop.
> > 
> >> @@ -1838,7 +1840,9 @@ static void drive_backup_abort(BlkActionState *common)
> >>          aio_context = bdrv_get_aio_context(state->bs);
> >>          aio_context_acquire(aio_context);
> >>  
> >> -        job_cancel_sync(&state->job->job, true);
> >> +        WITH_JOB_LOCK_GUARD() {
> >> +            job_cancel_sync(&state->job->job, true);
> >> +        }
> > 
> > Maybe job_cancel_sync() should take the lock internally since all
> > callers in this patch seem to need the lock?
> 
> The _locked version is useful where lock guards are already
> present and cover multiple operations. There are only 3 places
> where a lock guard is added to cover job_cancel_sync_locked. Is it
> worth defining an additional function?
> 
> 
> > 
> > I noticed this patch does not add WITH_JOB_LOCK_GUARD() to
> > tests/unit/test-blockjob.c:cancel_common(). Was that an oversight or is
> > there a reason why job_mutex is not needed around the job_cancel_sync()
> > call there?
> 
> No, locks in unit tests are added in patch 10 "jobs: protect jobs with
> job_lock/unlock".

I see, it's a question of how to split up the patches. When patches
leave the code in a state with broken invariants it becomes difficult to
review. I can't distinguish between actual bugs and temporary violations
that will be fixed in a later patch (unless they are clearly marked).

If you can structure patches so they are self-contained and don't leave
the broken invariants then that would make review easier, but in this
case it is tricky so I'll do the best I can to review it if you cannot
restructure the sequence of commits.

> > 
> >> @@ -252,7 +258,13 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
> >>  
> >>  static void block_job_on_idle(Notifier *n, void *opaque)
> >>  {
> >> +    /*
> >> +     * we can't kick with job_mutex held, but we also want
> >> +     * to protect the notifier list.
> >> +     */
> >> +    job_unlock();
> >>      aio_wait_kick();
> >> +    job_lock();
> > 
> > I don't understand this. aio_wait_kick() looks safe to call with a mutex
> > held?
> You are right. It should be safe.
> 
> > 
> >> @@ -292,7 +304,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
> >>      job->speed = speed;
> >>  
> >>      if (drv->set_speed) {
> >> +        job_unlock();
> >>          drv->set_speed(job, speed);
> >> +        job_lock();
> > 
> > What guarantees that job stays alive during drv->set_speed(job)? We
> > don't hold a ref here. Maybe the assumption is that
> > block_job_set_speed() only gets called from the main loop thread and
> > nothing else will modify the jobs list while we're in drv->set_speed()?
> 
> What guaranteed this before? I am not sure.

I guess the reason is the one I suggested. It should be documented in
the comments.
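
Something along these lines, perhaps (a sketch of the suggested
comment, assuming the main-loop explanation is the right one):

    if (drv->set_speed) {
        /*
         * Assumed safe to drop the lock: block_job_set_speed() is only
         * called from the main loop, so the job cannot go away while
         * we call into the driver.
         */
        job_unlock();
        drv->set_speed(job, speed);
        job_lock();
    }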

> 
> > 
> >> @@ -545,10 +566,15 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
> >>                                          action);
> >>      }
> >>      if (action == BLOCK_ERROR_ACTION_STOP) {
> >> -        if (!job->job.user_paused) {
> >> -            job_pause(&job->job);
> >> -            /* make the pause user visible, which will be resumed from QMP. */
> >> -            job->job.user_paused = true;
> >> +        WITH_JOB_LOCK_GUARD() {
> >> +            if (!job->job.user_paused) {
> >> +                job_pause(&job->job);
> >> +                /*
> >> +                 * make the pause user visible, which will be
> >> +                 * resumed from QMP.
> >> +                 */
> >> +                job->job.user_paused = true;
> >> +            }
> >>          }
> >>          block_job_iostatus_set_err(job, error);
> > 
> > Does this need the lock? If not, why is block_job_iostatus_reset()
> > called with the lock held?
> > 
> block_job_iostatus_set_err does not touch any Job fields. On the other
> hand block_job_iostatus_reset reads job.user_paused and job.pause_count.

BlockJob->iostatus requires no locking?
Emanuele Giuseppe Esposito Feb. 24, 2022, 4:55 p.m. UTC | #4
On 24/02/2022 17:48, Stefan Hajnoczi wrote:
> On Thu, Feb 24, 2022 at 01:45:48PM +0100, Emanuele Giuseppe Esposito wrote:
>>
>>
>> On 17/02/2022 15:48, Stefan Hajnoczi wrote:
>>> On Tue, Feb 08, 2022 at 09:35:01AM -0500, Emanuele Giuseppe Esposito wrote:
>>>> diff --git a/block/replication.c b/block/replication.c
>>>> index 55c8f894aa..a03b28726e 100644
>>>> --- a/block/replication.c
>>>> +++ b/block/replication.c
>>>> @@ -149,7 +149,9 @@ static void replication_close(BlockDriverState *bs)
>>>>      if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>>>>          commit_job = &s->commit_job->job;
>>>>          assert(commit_job->aio_context == qemu_get_current_aio_context());
>>>
>>> Is it safe to access commit_job->aio_context outside job_mutex?
>>
>> No, but that protection is currently not done. Patch 18 takes care of
>> protecting aio_context. Remember again that the job lock API is still
>> a nop.
>>>
>>>> @@ -1838,7 +1840,9 @@ static void drive_backup_abort(BlkActionState *common)
>>>>          aio_context = bdrv_get_aio_context(state->bs);
>>>>          aio_context_acquire(aio_context);
>>>>  
>>>> -        job_cancel_sync(&state->job->job, true);
>>>> +        WITH_JOB_LOCK_GUARD() {
>>>> +            job_cancel_sync(&state->job->job, true);
>>>> +        }
>>>
>>> Maybe job_cancel_sync() should take the lock internally since all
>>> callers in this patch seem to need the lock?
>>
>> The _locked version is useful where lock guards are already
>> present and cover multiple operations. There are only 3 places
>> where a lock guard is added to cover job_cancel_sync_locked. Is it
>> worth defining an additional function?
>>
>>
>>>
>>> I noticed this patch does not add WITH_JOB_LOCK_GUARD() to
>>> tests/unit/test-blockjob.c:cancel_common(). Was that an oversight or is
>>> there a reason why job_mutex is not needed around the job_cancel_sync()
>>> call there?
>>
>> No, locks in unit tests are added in patch 10 "jobs: protect jobs with
>> job_lock/unlock".
> 
> I see, it's a question of how to split up the patches. When patches
> leave the code in a state with broken invariants it becomes difficult to
> review. I can't distinguish between actual bugs and temporary violations
> that will be fixed in a later patch (unless they are clearly marked).
> 
> If you can structure patches so they are self-contained and don't leave
> the broken invariants then that would make review easier, but in this
> case it is tricky so I'll do the best I can to review it if you cannot
> restructure the sequence of commits.

Yes, the main problem is that ideally we want to add the job lock and
remove the AioContext lock, but this can't happen at once: just adding
proper locks would create a ton of deadlocks, because in order to
maintain invariants the job lock is sometimes taken inside the
AioContext lock, and other times the opposite happens.

The way it is done in this series is:
1) create job_lock/unlock as nops
2) make sure the nop job_lock is protecting the Job fields
3) do all the API renaming, according to the locking used above
4) enable job_lock (not a nop anymore) and at the same time remove the
AioContext lock (see the sketch below)
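
Step 4 essentially turns the nop bodies into a real mutex. A sketch,
assuming a global job_mutex as in the series title:

    QemuMutex job_mutex;

    void job_lock(void)
    {
        qemu_mutex_lock(&job_mutex);
    }

    void job_unlock(void)
    {
        qemu_mutex_unlock(&job_mutex);
    }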

If you want, a more up-to-date branch with your feedback applied so far
is here:
https://gitlab.com/eesposit/qemu/-/commits/dp_jobs_reviewed

> 
>>>
>>>> @@ -252,7 +258,13 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
>>>>  
>>>>  static void block_job_on_idle(Notifier *n, void *opaque)
>>>>  {
>>>> +    /*
>>>> +     * we can't kick with job_mutex held, but we also want
>>>> +     * to protect the notifier list.
>>>> +     */
>>>> +    job_unlock();
>>>>      aio_wait_kick();
>>>> +    job_lock();
>>>
>>> I don't understand this. aio_wait_kick() looks safe to call with a mutex
>>> held?
>> You are right. It should be safe.
>>
>>>
>>>> @@ -292,7 +304,9 @@ bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
>>>>      job->speed = speed;
>>>>  
>>>>      if (drv->set_speed) {
>>>> +        job_unlock();
>>>>          drv->set_speed(job, speed);
>>>> +        job_lock();
>>>
>>> What guarantees that job stays alive during drv->set_speed(job)? We
>>> don't hold a ref here. Maybe the assumption is that
>>> block_job_set_speed() only gets called from the main loop thread and
>>> nothing else will modify the jobs list while we're in drv->set_speed()?
>>
>> What guaranteed this before? I am not sure.
> 
> I guess the reason is the one I suggested. It should be documented in
> the comments.
> 
>>
>>>
>>>> @@ -545,10 +566,15 @@ BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
>>>>                                          action);
>>>>      }
>>>>      if (action == BLOCK_ERROR_ACTION_STOP) {
>>>> -        if (!job->job.user_paused) {
>>>> -            job_pause(&job->job);
>>>> -            /* make the pause user visible, which will be resumed from QMP. */
>>>> -            job->job.user_paused = true;
>>>> +        WITH_JOB_LOCK_GUARD() {
>>>> +            if (!job->job.user_paused) {
>>>> +                job_pause(&job->job);
>>>> +                /*
>>>> +                 * make the pause user visible, which will be
>>>> +                 * resumed from QMP.
>>>> +                 */
>>>> +                job->job.user_paused = true;
>>>> +            }
>>>>          }
>>>>          block_job_iostatus_set_err(job, error);
>>>
>>> Does this need the lock? If not, why is block_job_iostatus_reset()
>>> called with the lock held?
>>>
>> block_job_iostatus_set_err does not touch any Job fields. On the other
>> hand block_job_iostatus_reset reads job.user_paused and job.pause_count.
> 
> BlockJob->iostatus requires no locking?
>

Patch

diff --git a/block.c b/block.c
index 506c2f778d..33fe969fd9 100644
--- a/block.c
+++ b/block.c
@@ -4968,7 +4968,9 @@  static void bdrv_close(BlockDriverState *bs)
 
 void bdrv_close_all(void)
 {
-    assert(job_next(NULL) == NULL);
+    WITH_JOB_LOCK_GUARD() {
+        assert(job_next(NULL) == NULL);
+    }
     assert(qemu_in_main_thread());
 
     /* Drop references from requests still in flight, such as canceled block
@@ -6145,13 +6147,15 @@  XDbgBlockGraph *bdrv_get_xdbg_block_graph(Error **errp)
         }
     }
 
-    for (job = block_job_next(NULL); job; job = block_job_next(job)) {
-        GSList *el;
+    WITH_JOB_LOCK_GUARD() {
+        for (job = block_job_next(NULL); job; job = block_job_next(job)) {
+            GSList *el;
 
-        xdbg_graph_add_node(gr, job, X_DBG_BLOCK_GRAPH_NODE_TYPE_BLOCK_JOB,
-                           job->job.id);
-        for (el = job->nodes; el; el = el->next) {
-            xdbg_graph_add_edge(gr, job, (BdrvChild *)el->data);
+            xdbg_graph_add_node(gr, job, X_DBG_BLOCK_GRAPH_NODE_TYPE_BLOCK_JOB,
+                                job->job.id);
+            for (el = job->nodes; el; el = el->next) {
+                xdbg_graph_add_edge(gr, job, (BdrvChild *)el->data);
+            }
         }
     }
 
diff --git a/block/replication.c b/block/replication.c
index 55c8f894aa..a03b28726e 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -149,7 +149,9 @@  static void replication_close(BlockDriverState *bs)
     if (s->stage == BLOCK_REPLICATION_FAILOVER) {
         commit_job = &s->commit_job->job;
         assert(commit_job->aio_context == qemu_get_current_aio_context());
-        job_cancel_sync(commit_job, false);
+        WITH_JOB_LOCK_GUARD() {
+            job_cancel_sync(commit_job, false);
+        }
     }
 
     if (s->mode == REPLICATION_MODE_SECONDARY) {
@@ -726,7 +728,9 @@  static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
          * disk, secondary disk in backup_job_completed().
          */
         if (s->backup_job) {
-            job_cancel_sync(&s->backup_job->job, true);
+            WITH_JOB_LOCK_GUARD() {
+                job_cancel_sync(&s->backup_job->job, true);
+            }
         }
 
         if (!failover) {
diff --git a/blockdev.c b/blockdev.c
index e315466914..c5fba4d157 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -150,6 +150,8 @@  void blockdev_mark_auto_del(BlockBackend *blk)
         return;
     }
 
+    JOB_LOCK_GUARD();
+
     for (job = block_job_next(NULL); job; job = block_job_next(job)) {
         if (block_job_has_bdrv(job, blk_bs(blk))) {
             AioContext *aio_context = job->job.aio_context;
@@ -1838,7 +1840,9 @@  static void drive_backup_abort(BlkActionState *common)
         aio_context = bdrv_get_aio_context(state->bs);
         aio_context_acquire(aio_context);
 
-        job_cancel_sync(&state->job->job, true);
+        WITH_JOB_LOCK_GUARD() {
+            job_cancel_sync(&state->job->job, true);
+        }
 
         aio_context_release(aio_context);
     }
@@ -1939,7 +1943,9 @@  static void blockdev_backup_abort(BlkActionState *common)
         aio_context = bdrv_get_aio_context(state->bs);
         aio_context_acquire(aio_context);
 
-        job_cancel_sync(&state->job->job, true);
+        WITH_JOB_LOCK_GUARD() {
+            job_cancel_sync(&state->job->job, true);
+        }
 
         aio_context_release(aio_context);
     }
@@ -2388,7 +2394,10 @@  exit:
     if (!has_props) {
         qapi_free_TransactionProperties(props);
     }
-    job_txn_unref(block_job_txn);
+
+    WITH_JOB_LOCK_GUARD() {
+        job_txn_unref(block_job_txn);
+    }
 }
 
 BlockDirtyBitmapSha256 *qmp_x_debug_block_dirty_bitmap_sha256(const char *node,
@@ -3711,6 +3720,8 @@  BlockJobInfoList *qmp_query_block_jobs(Error **errp)
     BlockJobInfoList *head = NULL, **tail = &head;
     BlockJob *job;
 
+    JOB_LOCK_GUARD();
+
     for (job = block_job_next(NULL); job; job = block_job_next(job)) {
         BlockJobInfo *value;
 
diff --git a/blockjob.c b/blockjob.c
index 98ce5cb1ed..5fb477fe26 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -102,7 +102,9 @@  static char *child_job_get_parent_desc(BdrvChild *c)
 static void child_job_drained_begin(BdrvChild *c)
 {
     BlockJob *job = c->opaque;
-    job_pause(&job->job);
+    WITH_JOB_LOCK_GUARD() {
+        job_pause(&job->job);
+    }
 }
 
 static bool child_job_drained_poll(BdrvChild *c)
@@ -114,8 +116,10 @@  static bool child_job_drained_poll(BdrvChild *c)
     /* An inactive or completed job doesn't have any pending requests. Jobs
      * with !job->busy are either already paused or have a pause point after
      * being reentered, so no job driver code will run before they pause. */
-    if (!job->busy || job_is_completed(job)) {
-        return false;
+    WITH_JOB_LOCK_GUARD() {
+        if (!job->busy || job_is_completed_locked(job)) {
+            return false;
+        }
     }
 
     /* Otherwise, assume that it isn't fully stopped yet, but allow the job to
@@ -130,7 +134,9 @@  static bool child_job_drained_poll(BdrvChild *c)
 static void child_job_drained_end(BdrvChild *c, int *drained_end_counter)
 {
     BlockJob *job = c->opaque;
-    job_resume(&job->job);
+    WITH_JOB_LOCK_GUARD() {
+        job_resume(&job->job);
+    }
 }
 
 static bool child_job_can_set_aio_ctx(BdrvChild *c, AioContext *ctx,
@@ -252,7 +258,13 @@  int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
 
 static void block_job_on_idle(Notifier *n, void *opaque)
 {
+    /*
+     * we can't kick with job_mutex held, but we also want
+     * to protect the notifier list.
+     */
+    job_unlock();
     aio_wait_kick();
+    job_lock();
 }
 
 bool block_job_is_internal(BlockJob *job)
@@ -292,7 +304,9 @@  bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
     job->speed = speed;
 
     if (drv->set_speed) {
+        job_unlock();
         drv->set_speed(job, speed);
+        job_lock();
     }
 
     if (speed && speed <= old_speed) {
@@ -334,7 +348,7 @@  BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
     info->len       = progress_total;
     info->speed     = job->speed;
     info->io_status = job->iostatus;
-    info->ready     = job_is_ready(&job->job),
+    info->ready     = job_is_ready_locked(&job->job),
     info->status    = job->job.status;
     info->auto_finalize = job->job.auto_finalize;
     info->auto_dismiss  = job->job.auto_dismiss;
@@ -469,13 +483,15 @@  void *block_job_create(const char *job_id, const BlockJobDriver *driver,
     job->ready_notifier.notify = block_job_event_ready;
     job->idle_notifier.notify = block_job_on_idle;
 
-    notifier_list_add(&job->job.on_finalize_cancelled,
-                      &job->finalize_cancelled_notifier);
-    notifier_list_add(&job->job.on_finalize_completed,
-                      &job->finalize_completed_notifier);
-    notifier_list_add(&job->job.on_pending, &job->pending_notifier);
-    notifier_list_add(&job->job.on_ready, &job->ready_notifier);
-    notifier_list_add(&job->job.on_idle, &job->idle_notifier);
+    WITH_JOB_LOCK_GUARD() {
+        notifier_list_add(&job->job.on_finalize_cancelled,
+                          &job->finalize_cancelled_notifier);
+        notifier_list_add(&job->job.on_finalize_completed,
+                          &job->finalize_completed_notifier);
+        notifier_list_add(&job->job.on_pending, &job->pending_notifier);
+        notifier_list_add(&job->job.on_ready, &job->ready_notifier);
+        notifier_list_add(&job->job.on_idle, &job->idle_notifier);
+    }
 
     error_setg(&job->blocker, "block device is in use by block job: %s",
                job_type_str(&job->job));
@@ -487,7 +503,10 @@  void *block_job_create(const char *job_id, const BlockJobDriver *driver,
 
     bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);
 
-    if (!block_job_set_speed(job, speed, errp)) {
+    WITH_JOB_LOCK_GUARD() {
+        ret = block_job_set_speed(job, speed, errp);
+    }
+    if (!ret) {
         goto fail;
     }
 
@@ -512,7 +531,9 @@  void block_job_user_resume(Job *job)
 {
     BlockJob *bjob = container_of(job, BlockJob, job);
     assert(qemu_in_main_thread());
-    block_job_iostatus_reset(bjob);
+    WITH_JOB_LOCK_GUARD() {
+        block_job_iostatus_reset(bjob);
+    }
 }
 
 BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
@@ -545,10 +566,15 @@  BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                         action);
     }
     if (action == BLOCK_ERROR_ACTION_STOP) {
-        if (!job->job.user_paused) {
-            job_pause(&job->job);
-            /* make the pause user visible, which will be resumed from QMP. */
-            job->job.user_paused = true;
+        WITH_JOB_LOCK_GUARD() {
+            if (!job->job.user_paused) {
+                job_pause(&job->job);
+                /*
+                 * make the pause user visible, which will be
+                 * resumed from QMP.
+                 */
+                job->job.user_paused = true;
+            }
         }
         block_job_iostatus_set_err(job, error);
     }
diff --git a/job-qmp.c b/job-qmp.c
index a6774aaaa5..06f970f6cf 100644
--- a/job-qmp.c
+++ b/job-qmp.c
@@ -171,6 +171,8 @@  JobInfoList *qmp_query_jobs(Error **errp)
     JobInfoList *head = NULL, **tail = &head;
     Job *job;
 
+    JOB_LOCK_GUARD();
+
     for (job = job_next(NULL); job; job = job_next(job)) {
         JobInfo *value;
 
diff --git a/job.c b/job.c
index 6f1f529dc1..75f7c28147 100644
--- a/job.c
+++ b/job.c
@@ -347,6 +347,8 @@  void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
 {
     Job *job;
 
+    JOB_LOCK_GUARD();
+
     if (job_id) {
         if (flags & JOB_INTERNAL) {
             error_setg(errp, "Cannot specify job ID for internal job");
@@ -421,7 +423,9 @@  void job_unref(Job *job)
         assert(!job->txn);
 
         if (job->driver->free) {
+            job_unlock();
             job->driver->free(job);
+            job_lock();
         }
 
         QLIST_REMOVE(job, job_list);
@@ -508,6 +512,7 @@  void job_enter_cond(Job *job, bool(*fn)(Job *job))
 
 void job_enter(Job *job)
 {
+    JOB_LOCK_GUARD();
     job_enter_cond(job, NULL);
 }
 
@@ -526,7 +531,9 @@  static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
     job->busy = false;
     job_event_idle(job);
     real_job_unlock();
+    job_unlock();
     qemu_coroutine_yield();
+    job_lock();
 
     /* Set by job_enter_cond() before re-entering the coroutine.  */
     assert(job->busy);
@@ -540,15 +547,17 @@  static void coroutine_fn job_pause_point_locked(Job *job)
     if (!job_should_pause(job)) {
         return;
     }
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
         return;
     }
 
     if (job->driver->pause) {
+        job_unlock();
         job->driver->pause(job);
+        job_lock();
     }
 
-    if (job_should_pause(job) && !job_is_cancelled(job)) {
+    if (job_should_pause(job) && !job_is_cancelled_locked(job)) {
         JobStatus status = job->status;
         job_state_transition(job, status == JOB_STATUS_READY
                                   ? JOB_STATUS_STANDBY
@@ -560,7 +569,9 @@  static void coroutine_fn job_pause_point_locked(Job *job)
     }
 
     if (job->driver->resume) {
+        job_unlock();
         job->driver->resume(job);
+        job_lock();
     }
 }
 
@@ -572,10 +583,11 @@  void coroutine_fn job_pause_point(Job *job)
 
 void job_yield(Job *job)
 {
+    JOB_LOCK_GUARD();
     assert(job->busy);
 
     /* Check cancellation *before* setting busy = false, too!  */
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
         return;
     }
 
@@ -583,15 +595,16 @@  void job_yield(Job *job)
         job_do_yield(job, -1);
     }
 
-    job_pause_point(job);
+    job_pause_point_locked(job);
 }
 
 void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
 {
+    JOB_LOCK_GUARD();
     assert(job->busy);
 
     /* Check cancellation *before* setting busy = false, too!  */
-    if (job_is_cancelled(job)) {
+    if (job_is_cancelled_locked(job)) {
         return;
     }
 
@@ -599,10 +612,10 @@  void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
         job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
     }
 
-    job_pause_point(job);
+    job_pause_point_locked(job);
 }
 
-/* Assumes the block_job_mutex is held */
+/* Assumes the job_mutex is held */
 static bool job_timer_not_pending(Job *job)
 {
     return !timer_pending(&job->sleep_timer);
@@ -612,7 +625,7 @@  void job_pause(Job *job)
 {
     job->pause_count++;
     if (!job->paused) {
-        job_enter(job);
+        job_enter_cond(job, NULL);
     }
 }
 
@@ -658,7 +671,9 @@  void job_user_resume(Job *job, Error **errp)
         return;
     }
     if (job->driver->user_resume) {
+        job_unlock();
         job->driver->user_resume(job);
+        job_lock();
     }
     job->user_paused = false;
     job_resume(job);
@@ -692,6 +707,7 @@  void job_dismiss(Job **jobptr, Error **errp)
 
 void job_early_fail(Job *job)
 {
+    JOB_LOCK_GUARD();
     assert(job->status == JOB_STATUS_CREATED);
     job_do_dismiss(job);
 }
@@ -706,7 +722,7 @@  static void job_conclude(Job *job)
 
 static void job_update_rc(Job *job)
 {
-    if (!job->ret && job_is_cancelled(job)) {
+    if (!job->ret && job_is_cancelled_locked(job)) {
         job->ret = -ECANCELED;
     }
     if (job->ret) {
@@ -722,7 +738,9 @@  static void job_commit(Job *job)
     assert(!job->ret);
     assert(qemu_in_main_thread());
     if (job->driver->commit) {
+        job_unlock();
         job->driver->commit(job);
+        job_lock();
     }
 }
 
@@ -731,7 +749,9 @@  static void job_abort(Job *job)
     assert(job->ret);
     assert(qemu_in_main_thread());
     if (job->driver->abort) {
+        job_unlock();
         job->driver->abort(job);
+        job_lock();
     }
 }
 
@@ -739,15 +759,18 @@  static void job_clean(Job *job)
 {
     assert(qemu_in_main_thread());
     if (job->driver->clean) {
+        job_unlock();
         job->driver->clean(job);
+        job_lock();
     }
 }
 
 static int job_finalize_single(Job *job)
 {
+    int job_ret;
     AioContext *ctx = job->aio_context;
 
-    assert(job_is_completed(job));
+    assert(job_is_completed_locked(job));
 
     /* Ensure abort is called for late-transactional failures */
     job_update_rc(job);
@@ -764,12 +787,15 @@  static int job_finalize_single(Job *job)
     aio_context_release(ctx);
 
     if (job->cb) {
-        job->cb(job->opaque, job->ret);
+        job_ret = job->ret;
+        job_unlock();
+        job->cb(job->opaque, job_ret);
+        job_lock();
     }
 
     /* Emit events only if we actually started */
     if (job_started(job)) {
-        if (job_is_cancelled(job)) {
+        if (job_is_cancelled_locked(job)) {
             job_event_cancelled(job);
         } else {
             job_event_completed(job);
@@ -785,7 +811,9 @@  static void job_cancel_async(Job *job, bool force)
 {
     assert(qemu_in_main_thread());
     if (job->driver->cancel) {
+        job_unlock();
         force = job->driver->cancel(job, force);
+        job_lock();
     } else {
         /* No .cancel() means the job will behave as if force-cancelled */
         force = true;
@@ -794,7 +822,9 @@  static void job_cancel_async(Job *job, bool force)
     if (job->user_paused) {
         /* Do not call job_enter here, the caller will handle it.  */
         if (job->driver->user_resume) {
+            job_unlock();
             job->driver->user_resume(job);
+            job_lock();
         }
         job->user_paused = false;
         assert(job->pause_count > 0);
@@ -863,8 +893,8 @@  static void job_completed_txn_abort(Job *job)
          */
         ctx = other_job->aio_context;
         aio_context_acquire(ctx);
-        if (!job_is_completed(other_job)) {
-            assert(job_cancel_requested(other_job));
+        if (!job_is_completed_locked(other_job)) {
+            assert(job_cancel_requested_locked(other_job));
             job_finish_sync(other_job, NULL, NULL);
         }
         aio_context_release(ctx);
@@ -883,13 +913,17 @@  static void job_completed_txn_abort(Job *job)
 
 static int job_prepare(Job *job)
 {
+    int ret;
     AioContext *ctx = job->aio_context;
     assert(qemu_in_main_thread());
 
     if (job->ret == 0 && job->driver->prepare) {
+        job_unlock();
         aio_context_acquire(ctx);
-        job->ret = job->driver->prepare(job);
+        ret = job->driver->prepare(job);
         aio_context_release(ctx);
+        job_lock();
+        job->ret = ret;
         job_update_rc(job);
     }
 
@@ -935,6 +969,7 @@  static int job_transition_to_pending(Job *job)
 
 void job_transition_to_ready(Job *job)
 {
+    JOB_LOCK_GUARD();
     job_state_transition(job, JOB_STATUS_READY);
     job_event_ready(job);
 }
@@ -951,7 +986,7 @@  static void job_completed_txn_success(Job *job)
      * txn.
      */
     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        if (!job_is_completed(other_job)) {
+        if (!job_is_completed_locked(other_job)) {
             return;
         }
         assert(other_job->ret == 0);
@@ -967,7 +1002,7 @@  static void job_completed_txn_success(Job *job)
 
 static void job_completed(Job *job)
 {
-    assert(job && job->txn && !job_is_completed(job));
+    assert(job && job->txn && !job_is_completed_locked(job));
 
     job_update_rc(job);
     trace_job_completed(job, job->ret);
@@ -1018,25 +1053,33 @@  static void job_exit(void *opaque)
 static void coroutine_fn job_co_entry(void *opaque)
 {
     Job *job = opaque;
+    int ret;
 
     assert(job && job->driver && job->driver->run);
-    assert(job->aio_context == qemu_get_current_aio_context());
-    job_pause_point(job);
-    job->ret = job->driver->run(job, &job->err);
-    job->deferred_to_main_loop = true;
-    job->busy = true;
+    WITH_JOB_LOCK_GUARD() {
+        assert(job->aio_context == qemu_get_current_aio_context());
+        job_pause_point_locked(job);
+    }
+    ret = job->driver->run(job, &job->err);
+    WITH_JOB_LOCK_GUARD() {
+        job->ret = ret;
+        job->deferred_to_main_loop = true;
+        job->busy = true;
+    }
     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
 }
 
 void job_start(Job *job)
 {
-    assert(job && !job_started(job) && job->paused &&
-           job->driver && job->driver->run);
-    job->co = qemu_coroutine_create(job_co_entry, job);
-    job->pause_count--;
-    job->busy = true;
-    job->paused = false;
-    job_state_transition(job, JOB_STATUS_RUNNING);
+    WITH_JOB_LOCK_GUARD() {
+        assert(job && !job_started(job) && job->paused &&
+            job->driver && job->driver->run);
+        job->co = qemu_coroutine_create(job_co_entry, job);
+        job->pause_count--;
+        job->busy = true;
+        job->paused = false;
+        job_state_transition(job, JOB_STATUS_RUNNING);
+    }
     aio_co_enter(job->aio_context, job->co);
 }
 
@@ -1054,17 +1097,17 @@  void job_cancel(Job *job, bool force)
          * job_cancel_async() ignores soft-cancel requests for jobs
          * that are already done (i.e. deferred to the main loop).  We
          * have to check again whether the job is really cancelled.
-         * (job_cancel_requested() and job_is_cancelled() are equivalent
-         * here, because job_cancel_async() will make soft-cancel
-         * requests no-ops when deferred_to_main_loop is true.  We
-         * choose to call job_is_cancelled() to show that we invoke
+         * (job_cancel_requested_locked() and job_is_cancelled_locked()
+         * are equivalent here, because job_cancel_async() will
+         * make soft-cancel requests no-ops when deferred_to_main_loop is true.
+         * We choose to call job_is_cancelled_locked() to show that we invoke
          * job_completed_txn_abort() only for force-cancelled jobs.)
          */
-        if (job_is_cancelled(job)) {
+        if (job_is_cancelled_locked(job)) {
             job_completed_txn_abort(job);
         }
     } else {
-        job_enter(job);
+        job_enter_cond(job, NULL);
     }
 }
 
@@ -1106,6 +1149,7 @@  void job_cancel_sync_all(void)
     Job *job;
     AioContext *aio_context;
 
+    JOB_LOCK_GUARD();
     while ((job = job_next(NULL))) {
         aio_context = job->aio_context;
         aio_context_acquire(aio_context);
@@ -1127,13 +1171,15 @@  void job_complete(Job *job, Error **errp)
     if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
         return;
     }
-    if (job_cancel_requested(job) || !job->driver->complete) {
+    if (job_cancel_requested_locked(job) || !job->driver->complete) {
         error_setg(errp, "The active block job '%s' cannot be completed",
                    job->id);
         return;
     }
 
+    job_unlock();
     job->driver->complete(job, errp);
+    job_lock();
 }
 
 int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
@@ -1152,10 +1198,13 @@  int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
         return -EBUSY;
     }
 
+    job_unlock();
     AIO_WAIT_WHILE(job->aio_context,
                    (job_enter(job), !job_is_completed(job)));
+    job_lock();
 
-    ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
+    ret = (job_is_cancelled_locked(job) && job->ret == 0)
+          ? -ECANCELED : job->ret;
     job_unref(job);
     return ret;
 }
diff --git a/monitor/qmp-cmds.c b/monitor/qmp-cmds.c
index 206d9a8c7b..97a16efcdc 100644
--- a/monitor/qmp-cmds.c
+++ b/monitor/qmp-cmds.c
@@ -134,8 +134,10 @@  void qmp_cont(Error **errp)
         blk_iostatus_reset(blk);
     }
 
-    for (job = block_job_next(NULL); job; job = block_job_next(job)) {
-        block_job_iostatus_reset(job);
+    WITH_JOB_LOCK_GUARD() {
+        for (job = block_job_next(NULL); job; job = block_job_next(job)) {
+            block_job_iostatus_reset(job);
+        }
     }
 
     /* Continuing after completed migration. Images have been inactivated to
diff --git a/qemu-img.c b/qemu-img.c
index 6fe2466032..320d82b42a 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -906,25 +906,30 @@  static void run_block_job(BlockJob *job, Error **errp)
     int ret = 0;
 
     aio_context_acquire(aio_context);
-    job_ref(&job->job);
-    do {
-        float progress = 0.0f;
-        aio_poll(aio_context, true);
+    WITH_JOB_LOCK_GUARD() {
+        job_ref(&job->job);
+        do {
+            float progress = 0.0f;
+            job_unlock();
+            aio_poll(aio_context, true);
+
+            progress_get_snapshot(&job->job.progress, &progress_current,
+                                &progress_total);
+            if (progress_total) {
+                progress = (float)progress_current / progress_total * 100.f;
+            }
+            qemu_progress_print(progress, 0);
+            job_lock();
+        } while (!job_is_ready_locked(&job->job) &&
+                 !job_is_completed_locked(&job->job));
 
-        progress_get_snapshot(&job->job.progress, &progress_current,
-                              &progress_total);
-        if (progress_total) {
-            progress = (float)progress_current / progress_total * 100.f;
+        if (!job_is_completed_locked(&job->job)) {
+            ret = job_complete_sync(&job->job, errp);
+        } else {
+            ret = job->job.ret;
         }
-        qemu_progress_print(progress, 0);
-    } while (!job_is_ready(&job->job) && !job_is_completed(&job->job));
-
-    if (!job_is_completed(&job->job)) {
-        ret = job_complete_sync(&job->job, errp);
-    } else {
-        ret = job->job.ret;
+        job_unref(&job->job);
     }
-    job_unref(&job->job);
     aio_context_release(aio_context);
 
     /* publish completion progress only when success */
@@ -1077,7 +1082,9 @@  static int img_commit(int argc, char **argv)
         bdrv_ref(bs);
     }
 
-    job = block_job_get("commit");
+    WITH_JOB_LOCK_GUARD() {
+        job = block_job_get("commit");
+    }
     assert(job);
     run_block_job(job, &local_err);
     if (local_err) {