Message ID | 1487006388-7966-8-git-send-email-quintela@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On Mon, Feb 13, 2017 at 06:19:43PM +0100, Juan Quintela wrote: > We create new channels for each new thread created. We only send through > them a character to be sure that we are creating the channels in the > right order. > > Signed-off-by: Juan Quintela <quintela@redhat.com> > --- > include/migration/migration.h | 7 +++++ > migration/ram.c | 33 ++++++++++++++++++++++ > migration/socket.c | 64 +++++++++++++++++++++++++++++++++++++++++-- > 3 files changed, 101 insertions(+), 3 deletions(-) [snip] > diff --git a/migration/socket.c b/migration/socket.c > index 13966f1..1c764f1 100644 > --- a/migration/socket.c > +++ b/migration/socket.c > @@ -24,6 +24,62 @@ > #include "io/channel-socket.h" > #include "trace.h" > > +struct SocketArgs { > + QIOChannelSocket *ioc; > + SocketAddress *saddr; > + Error **errp; > +} socket_args; > + > +QIOChannel *socket_recv_channel_create(void) > +{ > + QIOChannelSocket *sioc; > + Error *err = NULL; > + > + sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc), > + &err); > + if (!sioc) { > + error_report("could not accept migration connection (%s)", > + error_get_pretty(err)); > + return NULL; > + } > + return QIO_CHANNEL(sioc); > +} > + > +int socket_recv_channel_destroy(QIOChannel *recv) > +{ > + /* Remove channel */ > + object_unref(OBJECT(send)); > + return 0; > +} > + > +/* we have created all the recv channels, we can close the main one */ > +int socket_recv_channel_close_listening(void) > +{ > + /* Close listening socket as its no longer needed */ > + qio_channel_close(QIO_CHANNEL(socket_args.ioc), NULL); > + return 0; > +} > + > +QIOChannel *socket_send_channel_create(void) > +{ > + QIOChannelSocket *sioc = qio_channel_socket_new(); > + > + qio_channel_socket_connect_sync(sioc, socket_args.saddr, > + socket_args.errp); > + qio_channel_set_delay(QIO_CHANNEL(sioc), false); > + return QIO_CHANNEL(sioc); > +} > + > +int socket_send_channel_destroy(QIOChannel *send) > +{ > + /* Remove channel */ > + object_unref(OBJECT(send)); > + if (socket_args.saddr) { > + qapi_free_SocketAddress(socket_args.saddr); > + socket_args.saddr = NULL; > + } > + return 0; > +} > > static SocketAddress *tcp_build_address(const char *host_port, Error **errp) > { > @@ -97,6 +153,10 @@ static void socket_start_outgoing_migration(MigrationState *s, > struct SocketConnectData *data = g_new0(struct SocketConnectData, 1); > > data->s = s; > + > + socket_args.saddr = saddr; > + socket_args.errp = errp; > + > if (saddr->type == SOCKET_ADDRESS_KIND_INET) { > data->hostname = g_strdup(saddr->u.inet.data->host); > } > @@ -107,7 +167,6 @@ static void socket_start_outgoing_migration(MigrationState *s, > socket_outgoing_migration, > data, > socket_connect_data_free); > - qapi_free_SocketAddress(saddr); > } > > void tcp_start_outgoing_migration(MigrationState *s, > @@ -154,8 +213,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc, > object_unref(OBJECT(sioc)); > > out: > - /* Close listening socket as its no longer needed */ > - qio_channel_close(ioc, NULL); > return FALSE; /* unregister */ > } > > @@ -164,6 +221,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr, > Error **errp) > { > QIOChannelSocket *listen_ioc = qio_channel_socket_new(); > + socket_args.ioc = listen_ioc; > > qio_channel_set_name(QIO_CHANNEL(listen_ioc), > "migration-socket-listener"); FYI I put some comments against v3 on this patch just as you sent this v4, as I don't think the changes here are desirable in this format. Regards, Daniel
On 13/02/2017 18:19, Juan Quintela wrote: > + qemu_sem_init(&p->init, 0); > p->quit = false; > + p->c = socket_send_channel_create(); > + if (!p->c) { > + error_report("Error creating a send channel"); > + exit(0); > + } > snprintf(thread_name, 15, "multifd_send_%d", i); > qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p, > QEMU_THREAD_JOINABLE); > + qemu_sem_wait(&p->init); Why do you need p->init here? Could initialization proceed in parallel for all the threads? Paolo
Paolo Bonzini <pbonzini@redhat.com> wrote: > On 13/02/2017 18:19, Juan Quintela wrote: >> + qemu_sem_init(&p->init, 0); >> p->quit = false; >> + p->c = socket_send_channel_create(); >> + if (!p->c) { >> + error_report("Error creating a send channel"); >> + exit(0); >> + } >> snprintf(thread_name, 15, "multifd_send_%d", i); >> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p, >> QEMU_THREAD_JOINABLE); >> + qemu_sem_wait(&p->init); > > Why do you need p->init here? Could initialization proceed in parallel > for all the threads? We need to make sure that the send thread number 2 goes to thread number 2 on destination. Yes, we could do a more complicated algorithm, but we really care so much about this initialization time? Later, Juan.
On 14/02/2017 14:12, Juan Quintela wrote: >> On 13/02/2017 18:19, Juan Quintela wrote: >>> + qemu_sem_init(&p->init, 0); >>> p->quit = false; >>> + p->c = socket_send_channel_create(); >>> + if (!p->c) { >>> + error_report("Error creating a send channel"); >>> + exit(0); >>> + } >>> snprintf(thread_name, 15, "multifd_send_%d", i); >>> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p, >>> QEMU_THREAD_JOINABLE); >>> + qemu_sem_wait(&p->init); >> Why do you need p->init here? Could initialization proceed in parallel >> for all the threads? > > We need to make sure that the send thread number 2 goes to thread number > 2 on destination. Yes, we could do a more complicated algorithm, but we > really care so much about this initialization time? I was wondering if p->init was needed in general, so it would simplify. But without a design document I cannot really understand the logic---as I said, I don't really grok the need for RAM_SAVE_FLAG_MULTIFD_PAGE. Paolo
Paolo Bonzini <pbonzini@redhat.com> wrote: > On 14/02/2017 14:12, Juan Quintela wrote: >>> On 13/02/2017 18:19, Juan Quintela wrote: >>>> + qemu_sem_init(&p->init, 0); >>>> p->quit = false; >>>> + p->c = socket_send_channel_create(); >>>> + if (!p->c) { >>>> + error_report("Error creating a send channel"); >>>> + exit(0); >>>> + } >>>> snprintf(thread_name, 15, "multifd_send_%d", i); >>>> qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p, >>>> QEMU_THREAD_JOINABLE); >>>> + qemu_sem_wait(&p->init); >>> Why do you need p->init here? Could initialization proceed in parallel >>> for all the threads? >> >> We need to make sure that the send thread number 2 goes to thread number >> 2 on destination. Yes, we could do a more complicated algorithm, but we >> really care so much about this initialization time? > > I was wondering if p->init was needed in general, so it would simplify. > But without a design document I cannot really understand the logic---as > I said, I don't really grok the need for RAM_SAVE_FLAG_MULTIFD_PAGE. will get some general documentation in /doc/. Basically what we had on the only stream was: {[page header][page]}+ And we moved to: {[page header]+[where to receive]}: on the principal stream [page]+: on the rest of the multifd All nicely aligned and so. My understanding is that we could optimize the receiving with splice to not even touch userspace? (that part is not done). That was the reason why I didn't want to put header's footers there. As the headers are so small compared with the pages payload, the transmission of them should be lost on the noise, no? Later, Juan.
On 14/02/2017 14:52, Juan Quintela wrote: > will get some general documentation in /doc/. > > Basically what we had on the only stream was: > > > {[page header][page]}+ > > > And we moved to: > > {[page header]+[where to receive]}: on the principal stream > > [page]+: on the rest of the multifd > > All nicely aligned and so. > > My understanding is that we could optimize the receiving with splice to > not even touch userspace? (that part is not done). The frames are not going to be aligned (MTU is usually 1500 or 9000), and the extra synchronization cost might nullify any speedup. Even the send side can in principle be made completely independent, by scanning the bitmap in each thread. > That was the reason > why I didn't want to put header's footers there. As the headers are so > small compared with the pages payload, the transmission of them should > be lost on the noise, no? The transmission may be, but the cost of having one more active socket + possibly the cost of Nagle's algorithm on that one socket + the cost of synchronization can be nontrivial. Paolo
diff --git a/include/migration/migration.h b/include/migration/migration.h index 13fac75..ff890b5 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -22,6 +22,7 @@ #include "qapi-types.h" #include "exec/cpu-common.h" #include "qemu/coroutine_int.h" +#include "io/channel.h" #define QEMU_VM_FILE_MAGIC 0x5145564d #define QEMU_VM_FILE_VERSION_COMPAT 0x00000002 @@ -224,6 +225,12 @@ void tcp_start_incoming_migration(const char *host_port, Error **errp); void tcp_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp); +QIOChannel *socket_recv_channel_create(void); +int socket_recv_channel_destroy(QIOChannel *recv); +int socket_recv_channel_close_listening(void); +QIOChannel *socket_send_channel_create(void); +int socket_send_channel_destroy(QIOChannel *send); + void unix_start_incoming_migration(const char *path, Error **errp); void unix_start_outgoing_migration(MigrationState *s, const char *path, Error **errp); diff --git a/migration/ram.c b/migration/ram.c index 0cb19cf..b101a59 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -386,7 +386,9 @@ void migrate_compress_threads_create(void) struct MultiFDSendParams { QemuThread thread; + QIOChannel *c; QemuSemaphore sem; + QemuSemaphore init; QemuMutex mutex; bool quit; }; @@ -397,6 +399,10 @@ static MultiFDSendParams *multifd_send; static void *multifd_send_thread(void *opaque) { MultiFDSendParams *params = opaque; + char start = 's'; + + qio_channel_write(params->c, &start, 1, &error_abort); + qemu_sem_post(¶ms->init); while (true) { qemu_mutex_lock(¶ms->mutex); @@ -441,7 +447,10 @@ void migrate_multifd_send_threads_join(void) qemu_thread_join(&p->thread); qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->init); + socket_send_channel_destroy(p->c); } + g_free(multifd_send); multifd_send = NULL; } @@ -461,15 +470,24 @@ void migrate_multifd_send_threads_create(void) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->init, 0); p->quit = false; + p->c = socket_send_channel_create(); + if (!p->c) { + error_report("Error creating a send channel"); + exit(0); + } snprintf(thread_name, 15, "multifd_send_%d", i); qemu_thread_create(&p->thread, thread_name, multifd_send_thread, p, QEMU_THREAD_JOINABLE); + qemu_sem_wait(&p->init); } } struct MultiFDRecvParams { QemuThread thread; + QIOChannel *c; + QemuSemaphore init; QemuSemaphore sem; QemuMutex mutex; bool quit; @@ -481,6 +499,10 @@ static MultiFDRecvParams *multifd_recv; static void *multifd_recv_thread(void *opaque) { MultiFDRecvParams *params = opaque; + char start; + + qio_channel_read(params->c, &start, 1, &error_abort); + qemu_sem_post(¶ms->init); while (true) { qemu_mutex_lock(¶ms->mutex); @@ -525,6 +547,8 @@ void migrate_multifd_recv_threads_join(void) qemu_thread_join(&p->thread); qemu_mutex_destroy(&p->mutex); qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->init); + socket_send_channel_destroy(multifd_recv[i].c); } g_free(multifd_recv); multifd_recv = NULL; @@ -544,10 +568,19 @@ void migrate_multifd_recv_threads_create(void) qemu_mutex_init(&p->mutex); qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->init, 0); p->quit = false; + p->c = socket_recv_channel_create(); + + if (!p->c) { + error_report("Error creating a recv channel"); + exit(0); + } qemu_thread_create(&p->thread, "multifd_recv", multifd_recv_thread, p, QEMU_THREAD_JOINABLE); + qemu_sem_wait(&p->init); } + socket_recv_channel_close_listening(); } /** diff --git a/migration/socket.c b/migration/socket.c index 13966f1..1c764f1 100644 --- a/migration/socket.c +++ b/migration/socket.c @@ -24,6 +24,62 @@ #include "io/channel-socket.h" #include "trace.h" +struct SocketArgs { + QIOChannelSocket *ioc; + SocketAddress *saddr; + Error **errp; +} socket_args; + +QIOChannel *socket_recv_channel_create(void) +{ + QIOChannelSocket *sioc; + Error *err = NULL; + + sioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(socket_args.ioc), + &err); + if (!sioc) { + error_report("could not accept migration connection (%s)", + error_get_pretty(err)); + return NULL; + } + return QIO_CHANNEL(sioc); +} + +int socket_recv_channel_destroy(QIOChannel *recv) +{ + /* Remove channel */ + object_unref(OBJECT(send)); + return 0; +} + +/* we have created all the recv channels, we can close the main one */ +int socket_recv_channel_close_listening(void) +{ + /* Close listening socket as its no longer needed */ + qio_channel_close(QIO_CHANNEL(socket_args.ioc), NULL); + return 0; +} + +QIOChannel *socket_send_channel_create(void) +{ + QIOChannelSocket *sioc = qio_channel_socket_new(); + + qio_channel_socket_connect_sync(sioc, socket_args.saddr, + socket_args.errp); + qio_channel_set_delay(QIO_CHANNEL(sioc), false); + return QIO_CHANNEL(sioc); +} + +int socket_send_channel_destroy(QIOChannel *send) +{ + /* Remove channel */ + object_unref(OBJECT(send)); + if (socket_args.saddr) { + qapi_free_SocketAddress(socket_args.saddr); + socket_args.saddr = NULL; + } + return 0; +} static SocketAddress *tcp_build_address(const char *host_port, Error **errp) { @@ -97,6 +153,10 @@ static void socket_start_outgoing_migration(MigrationState *s, struct SocketConnectData *data = g_new0(struct SocketConnectData, 1); data->s = s; + + socket_args.saddr = saddr; + socket_args.errp = errp; + if (saddr->type == SOCKET_ADDRESS_KIND_INET) { data->hostname = g_strdup(saddr->u.inet.data->host); } @@ -107,7 +167,6 @@ static void socket_start_outgoing_migration(MigrationState *s, socket_outgoing_migration, data, socket_connect_data_free); - qapi_free_SocketAddress(saddr); } void tcp_start_outgoing_migration(MigrationState *s, @@ -154,8 +213,6 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc, object_unref(OBJECT(sioc)); out: - /* Close listening socket as its no longer needed */ - qio_channel_close(ioc, NULL); return FALSE; /* unregister */ } @@ -164,6 +221,7 @@ static void socket_start_incoming_migration(SocketAddress *saddr, Error **errp) { QIOChannelSocket *listen_ioc = qio_channel_socket_new(); + socket_args.ioc = listen_ioc; qio_channel_set_name(QIO_CHANNEL(listen_ioc), "migration-socket-listener");
We create new channels for each new thread created. We only send through them a character to be sure that we are creating the channels in the right order. Signed-off-by: Juan Quintela <quintela@redhat.com> --- include/migration/migration.h | 7 +++++ migration/ram.c | 33 ++++++++++++++++++++++ migration/socket.c | 64 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 101 insertions(+), 3 deletions(-)