diff mbox series

[v2,19/23] migration/multifd: Cleanup multifd_save_cleanup()

Message ID 20240202102857.110210-20-peterx@redhat.com (mailing list archive)
State New, archived
Headers show
Series migration/multifd: Refactor ->send_prepare() and cleanups | expand

Commit Message

Peter Xu Feb. 2, 2024, 10:28 a.m. UTC
From: Peter Xu <peterx@redhat.com>

Shrink the function by moving relevant works into helpers: move the thread
join()s into multifd_send_terminate_threads(), then create two more helpers
to cover channel/state cleanups.

Add a TODO entry for the thread terminate process because p->running is
still buggy.  We need to fix it at some point but not yet covered.

Suggested-by: Fabiano Rosas <farosas@suse.de>
Signed-off-by: Peter Xu <peterx@redhat.com>
---
 migration/multifd.c | 91 +++++++++++++++++++++++++++++----------------
 1 file changed, 59 insertions(+), 32 deletions(-)

Comments

Fabiano Rosas Feb. 2, 2024, 8:54 p.m. UTC | #1
peterx@redhat.com writes:

> From: Peter Xu <peterx@redhat.com>
>
> Shrink the function by moving relevant works into helpers: move the thread
> join()s into multifd_send_terminate_threads(), then create two more helpers
> to cover channel/state cleanups.
>
> Add a TODO entry for the thread terminate process because p->running is
> still buggy.  We need to fix it at some point but not yet covered.
>
> Suggested-by: Fabiano Rosas <farosas@suse.de>
> Signed-off-by: Peter Xu <peterx@redhat.com>

Reviewed-by: Fabiano Rosas <farosas@suse.de>

minor comment below

> ---
>  migration/multifd.c | 91 +++++++++++++++++++++++++++++----------------
>  1 file changed, 59 insertions(+), 32 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 4ab8e6eff2..4cb0d2cc17 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void)
>       * always set it.
>       */
>      qatomic_set(&multifd_send_state->exiting, 1);
> +
> +    /*
> +     * Firstly, kick all threads out; no matter whether they are just idle,
> +     * or blocked in an IO system call.
> +     */
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> @@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void)
>              qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
>          }
>      }
> +
> +    /*
> +     * Finally recycle all the threads.
> +     *
> +     * TODO: p->running is still buggy, e.g. we can reach here without the
> +     * corresponding multifd_new_send_channel_async() get invoked yet,
> +     * then a new thread can even be created after this function returns.
> +     */

Series on the list:

https://lore.kernel.org/r/20240202191128.1901-1-farosas@suse.de

> +    for (i = 0; i < migrate_multifd_channels(); i++) {
> +        MultiFDSendParams *p = &multifd_send_state->params[i];
> +
> +        if (p->running) {
> +            qemu_thread_join(&p->thread);
> +        }
> +    }
>  }
>  
>  static int multifd_send_channel_destroy(QIOChannel *send)
> @@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send)
>      return socket_send_channel_destroy(send);
>  }
>  
> +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> +{
> +    if (p->registered_yank) {
> +        migration_ioc_unregister_yank(p->c);
> +    }
> +    multifd_send_channel_destroy(p->c);
> +    p->c = NULL;
> +    qemu_mutex_destroy(&p->mutex);
> +    qemu_sem_destroy(&p->sem);
> +    qemu_sem_destroy(&p->sem_sync);
> +    g_free(p->name);
> +    p->name = NULL;
> +    multifd_pages_clear(p->pages);
> +    p->pages = NULL;
> +    p->packet_len = 0;
> +    g_free(p->packet);
> +    p->packet = NULL;
> +    g_free(p->iov);
> +    p->iov = NULL;
> +    multifd_send_state->ops->send_cleanup(p, errp);
> +
> +    return *errp == NULL;

I think technically this would require the ERRP_GUARD() macro?

> +}
> +
> +static void multifd_send_cleanup_state(void)
> +{
> +    qemu_sem_destroy(&multifd_send_state->channels_ready);
> +    g_free(multifd_send_state->params);
> +    multifd_send_state->params = NULL;
> +    multifd_pages_clear(multifd_send_state->pages);
> +    multifd_send_state->pages = NULL;
> +    g_free(multifd_send_state);
> +    multifd_send_state = NULL;
> +}
> +
>  void multifd_save_cleanup(void)
>  {
>      int i;
> @@ -615,48 +670,20 @@ void multifd_save_cleanup(void)
>      if (!migrate_multifd()) {
>          return;
>      }
> +
>      multifd_send_terminate_threads();
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> -        MultiFDSendParams *p = &multifd_send_state->params[i];
>  
> -        if (p->running) {
> -            qemu_thread_join(&p->thread);
> -        }
> -    }
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
>  
> -        if (p->registered_yank) {
> -            migration_ioc_unregister_yank(p->c);
> -        }
> -        multifd_send_channel_destroy(p->c);
> -        p->c = NULL;
> -        qemu_mutex_destroy(&p->mutex);
> -        qemu_sem_destroy(&p->sem);
> -        qemu_sem_destroy(&p->sem_sync);
> -        g_free(p->name);
> -        p->name = NULL;
> -        multifd_pages_clear(p->pages);
> -        p->pages = NULL;
> -        p->packet_len = 0;
> -        g_free(p->packet);
> -        p->packet = NULL;
> -        g_free(p->iov);
> -        p->iov = NULL;
> -        multifd_send_state->ops->send_cleanup(p, &local_err);
> -        if (local_err) {
> +        if (!multifd_send_cleanup_channel(p, &local_err)) {
>              migrate_set_error(migrate_get_current(), local_err);
>              error_free(local_err);
>          }
>      }
> -    qemu_sem_destroy(&multifd_send_state->channels_ready);
> -    g_free(multifd_send_state->params);
> -    multifd_send_state->params = NULL;
> -    multifd_pages_clear(multifd_send_state->pages);
> -    multifd_send_state->pages = NULL;
> -    g_free(multifd_send_state);
> -    multifd_send_state = NULL;
> +
> +    multifd_send_cleanup_state();
>  }
>  
>  static int multifd_zero_copy_flush(QIOChannel *c)
Peter Xu Feb. 5, 2024, 4:25 a.m. UTC | #2
On Fri, Feb 02, 2024 at 05:54:23PM -0300, Fabiano Rosas wrote:
> peterx@redhat.com writes:
> 
> > From: Peter Xu <peterx@redhat.com>
> >
> > Shrink the function by moving relevant works into helpers: move the thread
> > join()s into multifd_send_terminate_threads(), then create two more helpers
> > to cover channel/state cleanups.
> >
> > Add a TODO entry for the thread terminate process because p->running is
> > still buggy.  We need to fix it at some point but not yet covered.
> >
> > Suggested-by: Fabiano Rosas <farosas@suse.de>
> > Signed-off-by: Peter Xu <peterx@redhat.com>
> 
> Reviewed-by: Fabiano Rosas <farosas@suse.de>
> 
> minor comment below
> 
> > ---
> >  migration/multifd.c | 91 +++++++++++++++++++++++++++++----------------
> >  1 file changed, 59 insertions(+), 32 deletions(-)
> >
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 4ab8e6eff2..4cb0d2cc17 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void)
> >       * always set it.
> >       */
> >      qatomic_set(&multifd_send_state->exiting, 1);
> > +
> > +    /*
> > +     * Firstly, kick all threads out; no matter whether they are just idle,
> > +     * or blocked in an IO system call.
> > +     */
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >  
> > @@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void)
> >              qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
> >          }
> >      }
> > +
> > +    /*
> > +     * Finally recycle all the threads.
> > +     *
> > +     * TODO: p->running is still buggy, e.g. we can reach here without the
> > +     * corresponding multifd_new_send_channel_async() get invoked yet,
> > +     * then a new thread can even be created after this function returns.
> > +     */
> 
> Series on the list:
> 
> https://lore.kernel.org/r/20240202191128.1901-1-farosas@suse.de

Thanks a lot.  I'll read it later today.

> 
> > +    for (i = 0; i < migrate_multifd_channels(); i++) {
> > +        MultiFDSendParams *p = &multifd_send_state->params[i];
> > +
> > +        if (p->running) {
> > +            qemu_thread_join(&p->thread);
> > +        }
> > +    }
> >  }
> >  
> >  static int multifd_send_channel_destroy(QIOChannel *send)
> > @@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send)
> >      return socket_send_channel_destroy(send);
> >  }
> >  
> > +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> > +{
> > +    if (p->registered_yank) {
> > +        migration_ioc_unregister_yank(p->c);
> > +    }
> > +    multifd_send_channel_destroy(p->c);
> > +    p->c = NULL;
> > +    qemu_mutex_destroy(&p->mutex);
> > +    qemu_sem_destroy(&p->sem);
> > +    qemu_sem_destroy(&p->sem_sync);
> > +    g_free(p->name);
> > +    p->name = NULL;
> > +    multifd_pages_clear(p->pages);
> > +    p->pages = NULL;
> > +    p->packet_len = 0;
> > +    g_free(p->packet);
> > +    p->packet = NULL;
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> > +    multifd_send_state->ops->send_cleanup(p, errp);
> > +
> > +    return *errp == NULL;
> 
> I think technically this would require the ERRP_GUARD() macro?

I normally only use ERRP_GUARD() if there can be any caller passing in
NULL, or when I am not sure it's always !NULL.

What I wanted to add here is actually assert(errp), but then I noticed
*errp==NULL plays the same role as that, because if errp==NULL, it'll crash
here when dereferencing, so it actually has an implicit assert(errp);
exactly what I wanted, but even one line less (even if not obvious).
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index 4ab8e6eff2..4cb0d2cc17 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -593,6 +593,11 @@  static void multifd_send_terminate_threads(void)
      * always set it.
      */
     qatomic_set(&multifd_send_state->exiting, 1);
+
+    /*
+     * Firstly, kick all threads out; no matter whether they are just idle,
+     * or blocked in an IO system call.
+     */
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -601,6 +606,21 @@  static void multifd_send_terminate_threads(void)
             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         }
     }
+
+    /*
+     * Finally recycle all the threads.
+     *
+     * TODO: p->running is still buggy, e.g. we can reach here without the
+     * corresponding multifd_new_send_channel_async() get invoked yet,
+     * then a new thread can even be created after this function returns.
+     */
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        if (p->running) {
+            qemu_thread_join(&p->thread);
+        }
+    }
 }
 
 static int multifd_send_channel_destroy(QIOChannel *send)
@@ -608,6 +628,41 @@  static int multifd_send_channel_destroy(QIOChannel *send)
     return socket_send_channel_destroy(send);
 }
 
+static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
+{
+    if (p->registered_yank) {
+        migration_ioc_unregister_yank(p->c);
+    }
+    multifd_send_channel_destroy(p->c);
+    p->c = NULL;
+    qemu_mutex_destroy(&p->mutex);
+    qemu_sem_destroy(&p->sem);
+    qemu_sem_destroy(&p->sem_sync);
+    g_free(p->name);
+    p->name = NULL;
+    multifd_pages_clear(p->pages);
+    p->pages = NULL;
+    p->packet_len = 0;
+    g_free(p->packet);
+    p->packet = NULL;
+    g_free(p->iov);
+    p->iov = NULL;
+    multifd_send_state->ops->send_cleanup(p, errp);
+
+    return *errp == NULL;
+}
+
+static void multifd_send_cleanup_state(void)
+{
+    qemu_sem_destroy(&multifd_send_state->channels_ready);
+    g_free(multifd_send_state->params);
+    multifd_send_state->params = NULL;
+    multifd_pages_clear(multifd_send_state->pages);
+    multifd_send_state->pages = NULL;
+    g_free(multifd_send_state);
+    multifd_send_state = NULL;
+}
+
 void multifd_save_cleanup(void)
 {
     int i;
@@ -615,48 +670,20 @@  void multifd_save_cleanup(void)
     if (!migrate_multifd()) {
         return;
     }
+
     multifd_send_terminate_threads();
-    for (i = 0; i < migrate_multifd_channels(); i++) {
-        MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        if (p->running) {
-            qemu_thread_join(&p->thread);
-        }
-    }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
 
-        if (p->registered_yank) {
-            migration_ioc_unregister_yank(p->c);
-        }
-        multifd_send_channel_destroy(p->c);
-        p->c = NULL;
-        qemu_mutex_destroy(&p->mutex);
-        qemu_sem_destroy(&p->sem);
-        qemu_sem_destroy(&p->sem_sync);
-        g_free(p->name);
-        p->name = NULL;
-        multifd_pages_clear(p->pages);
-        p->pages = NULL;
-        p->packet_len = 0;
-        g_free(p->packet);
-        p->packet = NULL;
-        g_free(p->iov);
-        p->iov = NULL;
-        multifd_send_state->ops->send_cleanup(p, &local_err);
-        if (local_err) {
+        if (!multifd_send_cleanup_channel(p, &local_err)) {
             migrate_set_error(migrate_get_current(), local_err);
             error_free(local_err);
         }
     }
-    qemu_sem_destroy(&multifd_send_state->channels_ready);
-    g_free(multifd_send_state->params);
-    multifd_send_state->params = NULL;
-    multifd_pages_clear(multifd_send_state->pages);
-    multifd_send_state->pages = NULL;
-    g_free(multifd_send_state);
-    multifd_send_state = NULL;
+
+    multifd_send_cleanup_state();
 }
 
 static int multifd_zero_copy_flush(QIOChannel *c)