Message ID | 20240202102857.110210-20-peterx@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | migration/multifd: Refactor ->send_prepare() and cleanups | expand |
peterx@redhat.com writes: > From: Peter Xu <peterx@redhat.com> > > Shrink the function by moving relevant works into helpers: move the thread > join()s into multifd_send_terminate_threads(), then create two more helpers > to cover channel/state cleanups. > > Add a TODO entry for the thread terminate process because p->running is > still buggy. We need to fix it at some point but not yet covered. > > Suggested-by: Fabiano Rosas <farosas@suse.de> > Signed-off-by: Peter Xu <peterx@redhat.com> Reviewed-by: Fabiano Rosas <farosas@suse.de> minor comment below > --- > migration/multifd.c | 91 +++++++++++++++++++++++++++++---------------- > 1 file changed, 59 insertions(+), 32 deletions(-) > > diff --git a/migration/multifd.c b/migration/multifd.c > index 4ab8e6eff2..4cb0d2cc17 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void) > * always set it. > */ > qatomic_set(&multifd_send_state->exiting, 1); > + > + /* > + * Firstly, kick all threads out; no matter whether they are just idle, > + * or blocked in an IO system call. > + */ > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > @@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void) > qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); > } > } > + > + /* > + * Finally recycle all the threads. > + * > + * TODO: p->running is still buggy, e.g. we can reach here without the > + * corresponding multifd_new_send_channel_async() get invoked yet, > + * then a new thread can even be created after this function returns. > + */ Series on the list: https://lore.kernel.org/r/20240202191128.1901-1-farosas@suse.de > + for (i = 0; i < migrate_multifd_channels(); i++) { > + MultiFDSendParams *p = &multifd_send_state->params[i]; > + > + if (p->running) { > + qemu_thread_join(&p->thread); > + } > + } > } > > static int multifd_send_channel_destroy(QIOChannel *send) > @@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send) > return socket_send_channel_destroy(send); > } > > +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) > +{ > + if (p->registered_yank) { > + migration_ioc_unregister_yank(p->c); > + } > + multifd_send_channel_destroy(p->c); > + p->c = NULL; > + qemu_mutex_destroy(&p->mutex); > + qemu_sem_destroy(&p->sem); > + qemu_sem_destroy(&p->sem_sync); > + g_free(p->name); > + p->name = NULL; > + multifd_pages_clear(p->pages); > + p->pages = NULL; > + p->packet_len = 0; > + g_free(p->packet); > + p->packet = NULL; > + g_free(p->iov); > + p->iov = NULL; > + multifd_send_state->ops->send_cleanup(p, errp); > + > + return *errp == NULL; I think technically this would require the ERRP_GUARD() macro? > +} > + > +static void multifd_send_cleanup_state(void) > +{ > + qemu_sem_destroy(&multifd_send_state->channels_ready); > + g_free(multifd_send_state->params); > + multifd_send_state->params = NULL; > + multifd_pages_clear(multifd_send_state->pages); > + multifd_send_state->pages = NULL; > + g_free(multifd_send_state); > + multifd_send_state = NULL; > +} > + > void multifd_save_cleanup(void) > { > int i; > @@ -615,48 +670,20 @@ void multifd_save_cleanup(void) > if (!migrate_multifd()) { > return; > } > + > multifd_send_terminate_threads(); > - for (i = 0; i < migrate_multifd_channels(); i++) { > - MultiFDSendParams *p = &multifd_send_state->params[i]; > > - if (p->running) { > - qemu_thread_join(&p->thread); > - } > - } > for (i = 0; i < migrate_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > Error *local_err = NULL; > > - if (p->registered_yank) { > - migration_ioc_unregister_yank(p->c); > - } > - multifd_send_channel_destroy(p->c); > - p->c = NULL; > - qemu_mutex_destroy(&p->mutex); > - qemu_sem_destroy(&p->sem); > - qemu_sem_destroy(&p->sem_sync); > - g_free(p->name); > - p->name = NULL; > - multifd_pages_clear(p->pages); > - p->pages = NULL; > - p->packet_len = 0; > - g_free(p->packet); > - p->packet = NULL; > - g_free(p->iov); > - p->iov = NULL; > - multifd_send_state->ops->send_cleanup(p, &local_err); > - if (local_err) { > + if (!multifd_send_cleanup_channel(p, &local_err)) { > migrate_set_error(migrate_get_current(), local_err); > error_free(local_err); > } > } > - qemu_sem_destroy(&multifd_send_state->channels_ready); > - g_free(multifd_send_state->params); > - multifd_send_state->params = NULL; > - multifd_pages_clear(multifd_send_state->pages); > - multifd_send_state->pages = NULL; > - g_free(multifd_send_state); > - multifd_send_state = NULL; > + > + multifd_send_cleanup_state(); > } > > static int multifd_zero_copy_flush(QIOChannel *c)
On Fri, Feb 02, 2024 at 05:54:23PM -0300, Fabiano Rosas wrote: > peterx@redhat.com writes: > > > From: Peter Xu <peterx@redhat.com> > > > > Shrink the function by moving relevant works into helpers: move the thread > > join()s into multifd_send_terminate_threads(), then create two more helpers > > to cover channel/state cleanups. > > > > Add a TODO entry for the thread terminate process because p->running is > > still buggy. We need to fix it at some point but not yet covered. > > > > Suggested-by: Fabiano Rosas <farosas@suse.de> > > Signed-off-by: Peter Xu <peterx@redhat.com> > > Reviewed-by: Fabiano Rosas <farosas@suse.de> > > minor comment below > > > --- > > migration/multifd.c | 91 +++++++++++++++++++++++++++++---------------- > > 1 file changed, 59 insertions(+), 32 deletions(-) > > > > diff --git a/migration/multifd.c b/migration/multifd.c > > index 4ab8e6eff2..4cb0d2cc17 100644 > > --- a/migration/multifd.c > > +++ b/migration/multifd.c > > @@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void) > > * always set it. > > */ > > qatomic_set(&multifd_send_state->exiting, 1); > > + > > + /* > > + * Firstly, kick all threads out; no matter whether they are just idle, > > + * or blocked in an IO system call. > > + */ > > for (i = 0; i < migrate_multifd_channels(); i++) { > > MultiFDSendParams *p = &multifd_send_state->params[i]; > > > > @@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void) > > qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); > > } > > } > > + > > + /* > > + * Finally recycle all the threads. > > + * > > + * TODO: p->running is still buggy, e.g. we can reach here without the > > + * corresponding multifd_new_send_channel_async() get invoked yet, > > + * then a new thread can even be created after this function returns. > > + */ > > Series on the list: > > https://lore.kernel.org/r/20240202191128.1901-1-farosas@suse.de Thanks a lot. I'll read it later today. > > > + for (i = 0; i < migrate_multifd_channels(); i++) { > > + MultiFDSendParams *p = &multifd_send_state->params[i]; > > + > > + if (p->running) { > > + qemu_thread_join(&p->thread); > > + } > > + } > > } > > > > static int multifd_send_channel_destroy(QIOChannel *send) > > @@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send) > > return socket_send_channel_destroy(send); > > } > > > > +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) > > +{ > > + if (p->registered_yank) { > > + migration_ioc_unregister_yank(p->c); > > + } > > + multifd_send_channel_destroy(p->c); > > + p->c = NULL; > > + qemu_mutex_destroy(&p->mutex); > > + qemu_sem_destroy(&p->sem); > > + qemu_sem_destroy(&p->sem_sync); > > + g_free(p->name); > > + p->name = NULL; > > + multifd_pages_clear(p->pages); > > + p->pages = NULL; > > + p->packet_len = 0; > > + g_free(p->packet); > > + p->packet = NULL; > > + g_free(p->iov); > > + p->iov = NULL; > > + multifd_send_state->ops->send_cleanup(p, errp); > > + > > + return *errp == NULL; > > I think technically this would require the ERRP_GUARD() macro? I normally only use ERRP_GUARD() if there can be any caller passing in NULL, or when I am not sure it's always !NULL. What I wanted to add here is actually assert(errp), but then I noticed *errp==NULL plays the same role as that, because if errp==NULL, it'll crash here when dereferencing, so it actually has an implicit assert(errp); exactly what I wanted, but even one line less (even if not obvious).
diff --git a/migration/multifd.c b/migration/multifd.c index 4ab8e6eff2..4cb0d2cc17 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -593,6 +593,11 @@ static void multifd_send_terminate_threads(void) * always set it. */ qatomic_set(&multifd_send_state->exiting, 1); + + /* + * Firstly, kick all threads out; no matter whether they are just idle, + * or blocked in an IO system call. + */ for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -601,6 +606,21 @@ static void multifd_send_terminate_threads(void) qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); } } + + /* + * Finally recycle all the threads. + * + * TODO: p->running is still buggy, e.g. we can reach here without the + * corresponding multifd_new_send_channel_async() get invoked yet, + * then a new thread can even be created after this function returns. + */ + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + if (p->running) { + qemu_thread_join(&p->thread); + } + } } static int multifd_send_channel_destroy(QIOChannel *send) @@ -608,6 +628,41 @@ static int multifd_send_channel_destroy(QIOChannel *send) return socket_send_channel_destroy(send); } +static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) +{ + if (p->registered_yank) { + migration_ioc_unregister_yank(p->c); + } + multifd_send_channel_destroy(p->c); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + g_free(p->iov); + p->iov = NULL; + multifd_send_state->ops->send_cleanup(p, errp); + + return *errp == NULL; +} + +static void multifd_send_cleanup_state(void) +{ + qemu_sem_destroy(&multifd_send_state->channels_ready); + g_free(multifd_send_state->params); + multifd_send_state->params = NULL; + multifd_pages_clear(multifd_send_state->pages); + multifd_send_state->pages = NULL; + g_free(multifd_send_state); + multifd_send_state = NULL; +} + void multifd_save_cleanup(void) { int i; @@ -615,48 +670,20 @@ void multifd_save_cleanup(void) if (!migrate_multifd()) { return; } + multifd_send_terminate_threads(); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - if (p->running) { - qemu_thread_join(&p->thread); - } - } for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; Error *local_err = NULL; - if (p->registered_yank) { - migration_ioc_unregister_yank(p->c); - } - multifd_send_channel_destroy(p->c); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - g_free(p->iov); - p->iov = NULL; - multifd_send_state->ops->send_cleanup(p, &local_err); - if (local_err) { + if (!multifd_send_cleanup_channel(p, &local_err)) { migrate_set_error(migrate_get_current(), local_err); error_free(local_err); } } - qemu_sem_destroy(&multifd_send_state->channels_ready); - g_free(multifd_send_state->params); - multifd_send_state->params = NULL; - multifd_pages_clear(multifd_send_state->pages); - multifd_send_state->pages = NULL; - g_free(multifd_send_state); - multifd_send_state = NULL; + + multifd_send_cleanup_state(); } static int multifd_zero_copy_flush(QIOChannel *c)