Message ID | 8b15ab3d4fe51b792897ffc87e221bfb9317a836.1571905346.git.jag.raman@oracle.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | Initial support of multi-process qemu | expand |
On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote: > Collect the VMSD from remote process on the source and save > it to the channel leading to the destination > > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com> > Signed-off-by: John G Johnson <john.g.johnson@oracle.com> > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com> > --- > New patch in v4 > > hw/proxy/qemu-proxy.c | 132 ++++++++++++++++++++++++++++++++++++++++++ > include/hw/proxy/qemu-proxy.h | 2 + > include/io/mpqemu-link.h | 1 + > 3 files changed, 135 insertions(+) > > diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c > index 623a6c5..ce72e6a 100644 > --- a/hw/proxy/qemu-proxy.c > +++ b/hw/proxy/qemu-proxy.c > @@ -52,6 +52,14 @@ > #include "util/event_notifier-posix.c" > #include "hw/boards.h" > #include "include/qemu/log.h" > +#include "io/channel.h" > +#include "migration/qemu-file-types.h" > +#include "qapi/error.h" > +#include "io/channel-util.h" > +#include "migration/qemu-file-channel.h" > +#include "migration/qemu-file.h" > +#include "migration/migration.h" > +#include "migration/vmstate.h" > > QEMUTimer *hb_timer; > static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp); > @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void); > static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx); > static void broadcast_msg(MPQemuMsg *msg, bool need_reply); > > +#define PAGE_SIZE getpagesize() > +uint8_t *mig_data; > + > static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx) > { > /* TODO: Add proper handler. */ > @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj) > dev->mem_init = false; > } > > +typedef struct { > + QEMUFile *rem; > + PCIProxyDev *dev; > +} proxy_mig_data; > + > +static void *proxy_mig_out(void *opaque) > +{ > + proxy_mig_data *data = opaque; > + PCIProxyDev *dev = data->dev; > + uint8_t byte; > + uint64_t data_size = PAGE_SIZE; > + > + mig_data = g_malloc(data_size); > + > + while (true) { > + byte = qemu_get_byte(data->rem); There is a pretty large set of APIs hiding behind the qemu_get_byte call, which does not give me confidence that... > + mig_data[dev->migsize++] = byte; > + if (dev->migsize == data_size) { > + data_size += PAGE_SIZE; > + mig_data = g_realloc(mig_data, data_size); > + } > + } > + > + return NULL; > +} > + > +static int proxy_pre_save(void *opaque) > +{ > + PCIProxyDev *pdev = opaque; > + proxy_mig_data *mig_data; > + QEMUFile *f_remote; > + MPQemuMsg msg = {0}; > + QemuThread thread; > + Error *err = NULL; > + QIOChannel *ioc; > + uint64_t size; > + int fd[2]; > + > + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) { > + return -1; > + } > + > + ioc = qio_channel_new_fd(fd[0], &err); > + if (err) { > + error_report_err(err); > + return -1; > + } > + > + qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig"); > + > + f_remote = qemu_fopen_channel_input(ioc); > + > + pdev->migsize = 0; > + > + mig_data = g_malloc0(sizeof(proxy_mig_data)); > + mig_data->rem = f_remote; > + mig_data->dev = pdev; > + > + qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data, > + QEMU_THREAD_DETACHED); > + > + msg.cmd = START_MIG_OUT; > + msg.bytestream = 0; > + msg.num_fds = 2; > + msg.fds[0] = fd[1]; > + msg.fds[1] = GET_REMOTE_WAIT; > + > + mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com); > + size = wait_for_remote(msg.fds[1]); > + PUT_REMOTE_WAIT(msg.fds[1]); > + > + assert(size != ULLONG_MAX); > + > + /* > + * migsize is being update by a separate thread. Using volatile to > + * instruct the compiler to fetch the value of this variable from > + * memory during every read > + */ > + while (*((volatile uint64_t *)&pdev->migsize) < size) { > + } > + > + qemu_thread_cancel(&thread); ....this is a safe way to stop the thread executing without resulting in memory being leaked. In addition thread cancellation is asynchronous, so the thread may still be using the QEMUFile object while.... > + qemu_fclose(f_remote); ..this is closing it. This feels like it is a crash danger. > + close(fd[1]); > + > + return 0; > +} Regards, Daniel
On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote: > On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote: >> Collect the VMSD from remote process on the source and save >> it to the channel leading to the destination >> >> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com> >> Signed-off-by: John G Johnson <john.g.johnson@oracle.com> >> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com> >> --- >> New patch in v4 >> >> hw/proxy/qemu-proxy.c | 132 ++++++++++++++++++++++++++++++++++++++++++ >> include/hw/proxy/qemu-proxy.h | 2 + >> include/io/mpqemu-link.h | 1 + >> 3 files changed, 135 insertions(+) >> >> diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c >> index 623a6c5..ce72e6a 100644 >> --- a/hw/proxy/qemu-proxy.c >> +++ b/hw/proxy/qemu-proxy.c >> @@ -52,6 +52,14 @@ >> #include "util/event_notifier-posix.c" >> #include "hw/boards.h" >> #include "include/qemu/log.h" >> +#include "io/channel.h" >> +#include "migration/qemu-file-types.h" >> +#include "qapi/error.h" >> +#include "io/channel-util.h" >> +#include "migration/qemu-file-channel.h" >> +#include "migration/qemu-file.h" >> +#include "migration/migration.h" >> +#include "migration/vmstate.h" >> >> QEMUTimer *hb_timer; >> static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp); >> @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void); >> static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx); >> static void broadcast_msg(MPQemuMsg *msg, bool need_reply); >> >> +#define PAGE_SIZE getpagesize() >> +uint8_t *mig_data; >> + >> static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx) >> { >> /* TODO: Add proper handler. */ >> @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj) >> dev->mem_init = false; >> } >> >> +typedef struct { >> + QEMUFile *rem; >> + PCIProxyDev *dev; >> +} proxy_mig_data; >> + >> +static void *proxy_mig_out(void *opaque) >> +{ >> + proxy_mig_data *data = opaque; >> + PCIProxyDev *dev = data->dev; >> + uint8_t byte; >> + uint64_t data_size = PAGE_SIZE; >> + >> + mig_data = g_malloc(data_size); >> + >> + while (true) { >> + byte = qemu_get_byte(data->rem); > > There is a pretty large set of APIs hiding behind the qemu_get_byte > call, which does not give me confidence that... > >> + mig_data[dev->migsize++] = byte; >> + if (dev->migsize == data_size) { >> + data_size += PAGE_SIZE; >> + mig_data = g_realloc(mig_data, data_size); >> + } >> + } >> + >> + return NULL; >> +} >> + >> +static int proxy_pre_save(void *opaque) >> +{ >> + PCIProxyDev *pdev = opaque; >> + proxy_mig_data *mig_data; >> + QEMUFile *f_remote; >> + MPQemuMsg msg = {0}; >> + QemuThread thread; >> + Error *err = NULL; >> + QIOChannel *ioc; >> + uint64_t size; >> + int fd[2]; >> + >> + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) { >> + return -1; >> + } >> + >> + ioc = qio_channel_new_fd(fd[0], &err); >> + if (err) { >> + error_report_err(err); >> + return -1; >> + } >> + >> + qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig"); >> + >> + f_remote = qemu_fopen_channel_input(ioc); >> + >> + pdev->migsize = 0; >> + >> + mig_data = g_malloc0(sizeof(proxy_mig_data)); >> + mig_data->rem = f_remote; >> + mig_data->dev = pdev; >> + >> + qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data, >> + QEMU_THREAD_DETACHED); >> + >> + msg.cmd = START_MIG_OUT; >> + msg.bytestream = 0; >> + msg.num_fds = 2; >> + msg.fds[0] = fd[1]; >> + msg.fds[1] = GET_REMOTE_WAIT; >> + >> + mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com); >> + size = wait_for_remote(msg.fds[1]); >> + PUT_REMOTE_WAIT(msg.fds[1]); >> + >> + assert(size != ULLONG_MAX); >> + >> + /* >> + * migsize is being update by a separate thread. Using volatile to >> + * instruct the compiler to fetch the value of this variable from >> + * memory during every read >> + */ >> + while (*((volatile uint64_t *)&pdev->migsize) < size) { >> + } >> + >> + qemu_thread_cancel(&thread); > > ....this is a safe way to stop the thread executing without > resulting in memory being leaked. > > In addition thread cancellation is asynchronous, so the thread > may still be using the QEMUFile object while.... > >> + qemu_fclose(f_remote); The above "wait_for_remote()" call waits for the remote process to finish with Migration, and return the size of the VMSD. It should be safe to cancel the thread and close the file, once the remote process is done sending the VMSD and we have read "size" bytes from it, is it not? Thank you very much! -- Jag > > ..this is closing it. This feels like it is a crash danger. > > >> + close(fd[1]); >> + >> + return 0; >> +} > > Regards, > Daniel >
On Wed, Nov 13, 2019 at 11:32:09AM -0500, Jag Raman wrote: > > > On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote: > > On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote: > > > Collect the VMSD from remote process on the source and save > > > it to the channel leading to the destination > > > > > > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com> > > > Signed-off-by: John G Johnson <john.g.johnson@oracle.com> > > > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com> > > > --- > > > New patch in v4 > > > > > > hw/proxy/qemu-proxy.c | 132 ++++++++++++++++++++++++++++++++++++++++++ > > > include/hw/proxy/qemu-proxy.h | 2 + > > > include/io/mpqemu-link.h | 1 + > > > 3 files changed, 135 insertions(+) > > > > > > diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c > > > index 623a6c5..ce72e6a 100644 > > > --- a/hw/proxy/qemu-proxy.c > > > +++ b/hw/proxy/qemu-proxy.c > > > @@ -52,6 +52,14 @@ > > > #include "util/event_notifier-posix.c" > > > #include "hw/boards.h" > > > #include "include/qemu/log.h" > > > +#include "io/channel.h" > > > +#include "migration/qemu-file-types.h" > > > +#include "qapi/error.h" > > > +#include "io/channel-util.h" > > > +#include "migration/qemu-file-channel.h" > > > +#include "migration/qemu-file.h" > > > +#include "migration/migration.h" > > > +#include "migration/vmstate.h" > > > QEMUTimer *hb_timer; > > > static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp); > > > @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void); > > > static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx); > > > static void broadcast_msg(MPQemuMsg *msg, bool need_reply); > > > +#define PAGE_SIZE getpagesize() > > > +uint8_t *mig_data; > > > + > > > static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx) > > > { > > > /* TODO: Add proper handler. */ > > > @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj) > > > dev->mem_init = false; > > > } > > > +typedef struct { > > > + QEMUFile *rem; > > > + PCIProxyDev *dev; > > > +} proxy_mig_data; > > > + > > > +static void *proxy_mig_out(void *opaque) > > > +{ > > > + proxy_mig_data *data = opaque; > > > + PCIProxyDev *dev = data->dev; > > > + uint8_t byte; > > > + uint64_t data_size = PAGE_SIZE; > > > + > > > + mig_data = g_malloc(data_size); > > > + > > > + while (true) { > > > + byte = qemu_get_byte(data->rem); > > > > There is a pretty large set of APIs hiding behind the qemu_get_byte > > call, which does not give me confidence that... > > > > > + mig_data[dev->migsize++] = byte; > > > + if (dev->migsize == data_size) { > > > + data_size += PAGE_SIZE; > > > + mig_data = g_realloc(mig_data, data_size); > > > + } > > > + } > > > + > > > + return NULL; > > > +} > > > + > > > +static int proxy_pre_save(void *opaque) > > > +{ > > > + PCIProxyDev *pdev = opaque; > > > + proxy_mig_data *mig_data; > > > + QEMUFile *f_remote; > > > + MPQemuMsg msg = {0}; > > > + QemuThread thread; > > > + Error *err = NULL; > > > + QIOChannel *ioc; > > > + uint64_t size; > > > + int fd[2]; > > > + > > > + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) { > > > + return -1; > > > + } > > > + > > > + ioc = qio_channel_new_fd(fd[0], &err); > > > + if (err) { > > > + error_report_err(err); > > > + return -1; > > > + } > > > + > > > + qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig"); > > > + > > > + f_remote = qemu_fopen_channel_input(ioc); > > > + > > > + pdev->migsize = 0; > > > + > > > + mig_data = g_malloc0(sizeof(proxy_mig_data)); > > > + mig_data->rem = f_remote; > > > + mig_data->dev = pdev; > > > + > > > + qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data, > > > + QEMU_THREAD_DETACHED); > > > + > > > + msg.cmd = START_MIG_OUT; > > > + msg.bytestream = 0; > > > + msg.num_fds = 2; > > > + msg.fds[0] = fd[1]; > > > + msg.fds[1] = GET_REMOTE_WAIT; > > > + > > > + mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com); > > > + size = wait_for_remote(msg.fds[1]); > > > + PUT_REMOTE_WAIT(msg.fds[1]); > > > + > > > + assert(size != ULLONG_MAX); > > > + > > > + /* > > > + * migsize is being update by a separate thread. Using volatile to > > > + * instruct the compiler to fetch the value of this variable from > > > + * memory during every read > > > + */ > > > + while (*((volatile uint64_t *)&pdev->migsize) < size) { > > > + } > > > + > > > + qemu_thread_cancel(&thread); > > > > ....this is a safe way to stop the thread executing without > > resulting in memory being leaked. > > > > In addition thread cancellation is asynchronous, so the thread > > may still be using the QEMUFile object while.... > > > > > + qemu_fclose(f_remote); > > The above "wait_for_remote()" call waits for the remote process to > finish with Migration, and return the size of the VMSD. > > It should be safe to cancel the thread and close the file, once the > remote process is done sending the VMSD and we have read "size" bytes > from it, is it not? Ok, so the thread is doing while (true) { byte = qemu_get_byte(data->rem); ...do something with byte... } so when the thread is cancelled it is almost certainly in the qemu_get_byte() call. Since you say wait_for_remote() syncs with the end of migration, I'll presume there's no more data to be read but the file is still open. If we're using a blocking FD here we'll probably be stuck in read() when we're cancelled, and cancellation would probably be ok from looking at the current impl of QEMUFile / QIOChannel. If we're handling any error scenario though there could be a "Error *local_err" that needs freeing before cancellation. If the fclose is processed before cancellation takes affect on the target thread though we could have a race. 1. proxy_mig_out blocked in read from qemu_fill_buffer 2. main thread request async cancel 3. main thread calls qemu_fclose which closes the FD and free's the QEMUFile object 4. proxy_mig_out thread returns from read() with ret == 0 (EOF) 5. proxy_mig_out thread calls qemu_file_set_error_obj on a QEMUFole object free'd in (3). use after free. opps 6. ..async cancel request gets delivered.... admittedly it is fairly unlikely for the async cancel to be delayed for so long that this sequence happens, but unexpected things can happen when we really don't want them. IMHO the safe way to deal with this would be a lock-step sequence between the threads 1. proxy_mig_out blocked in read from qemu_fill_buffer 2. main thread closes the FD with qemu_file_shutdown() closing both directions 3. proxy_mig_out returns from read with ret == 0 (EOF) 4. proxy_mig_out thread breaks out of its inifinite loop due to EOF and exits 5. main thread calls pthread_join on proxy_mig_out 6. main thread calls qemu_fclose() this is easier to reason about the safety of than the cancel based approach IMHO. Regards, Daniel
On 11/13/2019 12:11 PM, Daniel P. Berrangé wrote: > On Wed, Nov 13, 2019 at 11:32:09AM -0500, Jag Raman wrote: >> >> >> On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote: >>> On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote: >>>> Collect the VMSD from remote process on the source and save >>>> it to the channel leading to the destination >>>> >>>> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com> >>>> Signed-off-by: John G Johnson <john.g.johnson@oracle.com> >>>> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com> >>>> --- >>>> New patch in v4 >>>> >>>> hw/proxy/qemu-proxy.c | 132 ++++++++++++++++++++++++++++++++++++++++++ >>>> include/hw/proxy/qemu-proxy.h | 2 + >>>> include/io/mpqemu-link.h | 1 + >>>> 3 files changed, 135 insertions(+) >>>> >>>> diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c >>>> index 623a6c5..ce72e6a 100644 >>>> --- a/hw/proxy/qemu-proxy.c >>>> +++ b/hw/proxy/qemu-proxy.c >>>> @@ -52,6 +52,14 @@ >>>> #include "util/event_notifier-posix.c" >>>> #include "hw/boards.h" >>>> #include "include/qemu/log.h" >>>> +#include "io/channel.h" >>>> +#include "migration/qemu-file-types.h" >>>> +#include "qapi/error.h" >>>> +#include "io/channel-util.h" >>>> +#include "migration/qemu-file-channel.h" >>>> +#include "migration/qemu-file.h" >>>> +#include "migration/migration.h" >>>> +#include "migration/vmstate.h" >>>> QEMUTimer *hb_timer; >>>> static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp); >>>> @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void); >>>> static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx); >>>> static void broadcast_msg(MPQemuMsg *msg, bool need_reply); >>>> +#define PAGE_SIZE getpagesize() >>>> +uint8_t *mig_data; >>>> + >>>> static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx) >>>> { >>>> /* TODO: Add proper handler. */ >>>> @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj) >>>> dev->mem_init = false; >>>> } >>>> +typedef struct { >>>> + QEMUFile *rem; >>>> + PCIProxyDev *dev; >>>> +} proxy_mig_data; >>>> + >>>> +static void *proxy_mig_out(void *opaque) >>>> +{ >>>> + proxy_mig_data *data = opaque; >>>> + PCIProxyDev *dev = data->dev; >>>> + uint8_t byte; >>>> + uint64_t data_size = PAGE_SIZE; >>>> + >>>> + mig_data = g_malloc(data_size); >>>> + >>>> + while (true) { >>>> + byte = qemu_get_byte(data->rem); >>> >>> There is a pretty large set of APIs hiding behind the qemu_get_byte >>> call, which does not give me confidence that... >>> >>>> + mig_data[dev->migsize++] = byte; >>>> + if (dev->migsize == data_size) { >>>> + data_size += PAGE_SIZE; >>>> + mig_data = g_realloc(mig_data, data_size); >>>> + } >>>> + } >>>> + >>>> + return NULL; >>>> +} >>>> + >>>> +static int proxy_pre_save(void *opaque) >>>> +{ >>>> + PCIProxyDev *pdev = opaque; >>>> + proxy_mig_data *mig_data; >>>> + QEMUFile *f_remote; >>>> + MPQemuMsg msg = {0}; >>>> + QemuThread thread; >>>> + Error *err = NULL; >>>> + QIOChannel *ioc; >>>> + uint64_t size; >>>> + int fd[2]; >>>> + >>>> + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) { >>>> + return -1; >>>> + } >>>> + >>>> + ioc = qio_channel_new_fd(fd[0], &err); >>>> + if (err) { >>>> + error_report_err(err); >>>> + return -1; >>>> + } >>>> + >>>> + qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig"); >>>> + >>>> + f_remote = qemu_fopen_channel_input(ioc); >>>> + >>>> + pdev->migsize = 0; >>>> + >>>> + mig_data = g_malloc0(sizeof(proxy_mig_data)); >>>> + mig_data->rem = f_remote; >>>> + mig_data->dev = pdev; >>>> + >>>> + qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data, >>>> + QEMU_THREAD_DETACHED); >>>> + >>>> + msg.cmd = START_MIG_OUT; >>>> + msg.bytestream = 0; >>>> + msg.num_fds = 2; >>>> + msg.fds[0] = fd[1]; >>>> + msg.fds[1] = GET_REMOTE_WAIT; >>>> + >>>> + mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com); >>>> + size = wait_for_remote(msg.fds[1]); >>>> + PUT_REMOTE_WAIT(msg.fds[1]); >>>> + >>>> + assert(size != ULLONG_MAX); >>>> + >>>> + /* >>>> + * migsize is being update by a separate thread. Using volatile to >>>> + * instruct the compiler to fetch the value of this variable from >>>> + * memory during every read >>>> + */ >>>> + while (*((volatile uint64_t *)&pdev->migsize) < size) { >>>> + } >>>> + >>>> + qemu_thread_cancel(&thread); >>> >>> ....this is a safe way to stop the thread executing without >>> resulting in memory being leaked. >>> >>> In addition thread cancellation is asynchronous, so the thread >>> may still be using the QEMUFile object while.... >>> >>>> + qemu_fclose(f_remote); >> >> The above "wait_for_remote()" call waits for the remote process to >> finish with Migration, and return the size of the VMSD. >> >> It should be safe to cancel the thread and close the file, once the >> remote process is done sending the VMSD and we have read "size" bytes >> from it, is it not? > > Ok, so the thread is doing > > while (true) { > byte = qemu_get_byte(data->rem); > ...do something with byte... > } > > so when the thread is cancelled it is almost certainly in the > qemu_get_byte() call. Since you say wait_for_remote() syncs > with the end of migration, I'll presume there's no more data > to be read but the file is still open. > > If we're using a blocking FD here we'll probably be stuck in > read() when we're cancelled, and cancellation would probably > be ok from looking at the current impl of QEMUFile / QIOChannel. > If we're handling any error scenario though there could be a > "Error *local_err" that needs freeing before cancellation. > > If the fclose is processed before cancellation takes affect > on the target thread though we could have a race. > > 1. proxy_mig_out blocked in read from qemu_fill_buffer > > 2. main thread request async cancel > > 3. main thread calls qemu_fclose which closes the FD > and free's the QEMUFile object > > 4. proxy_mig_out thread returns from read() with > ret == 0 (EOF) This wasn't happening. It would be convenient if it did. When the file was closed by the main thread, the async thread was still hung at qemu_fill_buffer(), instead of returning 0 (EOF). That's reason why we took the thread-cancellation route. We'd be glad to remove qemu_thread_cancel(). > > 5. proxy_mig_out thread calls qemu_file_set_error_obj > on a QEMUFole object free'd in (3). use after free. opps > > 6. ..async cancel request gets delivered.... > > admittedly it is fairly unlikely for the async cancel > to be delayed for so long that this sequence happens, but > unexpected things can happen when we really don't want them. Absolutely, we don't want to leave anything to chance. > > IMHO the safe way to deal with this would be a lock-step > sequence between the threads > > 1. proxy_mig_out blocked in read from qemu_fill_buffer > > 2. main thread closes the FD with qemu_file_shutdown() > closing both directions Will give qemu_file_shutdown() a try. Thank you! -- Jag > > 3. proxy_mig_out returns from read with ret == 0 (EOF) > > 4. proxy_mig_out thread breaks out of its inifinite loop > due to EOF and exits > > 5. main thread calls pthread_join on proxy_mig_out > > 6. main thread calls qemu_fclose() > > this is easier to reason about the safety of than the cancel based > approach IMHO. > > Regards, > Daniel >
* Jag Raman (jag.raman@oracle.com) wrote: > > > On 11/13/2019 12:11 PM, Daniel P. Berrangé wrote: > > On Wed, Nov 13, 2019 at 11:32:09AM -0500, Jag Raman wrote: > > > > > > > > > On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote: > > > > On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote: > > > > > Collect the VMSD from remote process on the source and save > > > > > it to the channel leading to the destination > > > > > > > > > > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com> > > > > > Signed-off-by: John G Johnson <john.g.johnson@oracle.com> > > > > > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com> > > > > > --- > > > > > New patch in v4 > > > > > > > > > > hw/proxy/qemu-proxy.c | 132 ++++++++++++++++++++++++++++++++++++++++++ > > > > > include/hw/proxy/qemu-proxy.h | 2 + > > > > > include/io/mpqemu-link.h | 1 + > > > > > 3 files changed, 135 insertions(+) > > > > > > > > > > diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c > > > > > index 623a6c5..ce72e6a 100644 > > > > > --- a/hw/proxy/qemu-proxy.c > > > > > +++ b/hw/proxy/qemu-proxy.c > > > > > @@ -52,6 +52,14 @@ > > > > > #include "util/event_notifier-posix.c" > > > > > #include "hw/boards.h" > > > > > #include "include/qemu/log.h" > > > > > +#include "io/channel.h" > > > > > +#include "migration/qemu-file-types.h" > > > > > +#include "qapi/error.h" > > > > > +#include "io/channel-util.h" > > > > > +#include "migration/qemu-file-channel.h" > > > > > +#include "migration/qemu-file.h" > > > > > +#include "migration/migration.h" > > > > > +#include "migration/vmstate.h" > > > > > QEMUTimer *hb_timer; > > > > > static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp); > > > > > @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void); > > > > > static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx); > > > > > static void broadcast_msg(MPQemuMsg *msg, bool need_reply); > > > > > +#define PAGE_SIZE getpagesize() > > > > > +uint8_t *mig_data; > > > > > + > > > > > static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx) > > > > > { > > > > > /* TODO: Add proper handler. */ > > > > > @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj) > > > > > dev->mem_init = false; > > > > > } > > > > > +typedef struct { > > > > > + QEMUFile *rem; > > > > > + PCIProxyDev *dev; > > > > > +} proxy_mig_data; > > > > > + > > > > > +static void *proxy_mig_out(void *opaque) > > > > > +{ > > > > > + proxy_mig_data *data = opaque; > > > > > + PCIProxyDev *dev = data->dev; > > > > > + uint8_t byte; > > > > > + uint64_t data_size = PAGE_SIZE; > > > > > + > > > > > + mig_data = g_malloc(data_size); > > > > > + > > > > > + while (true) { > > > > > + byte = qemu_get_byte(data->rem); > > > > > > > > There is a pretty large set of APIs hiding behind the qemu_get_byte > > > > call, which does not give me confidence that... > > > > > > > > > + mig_data[dev->migsize++] = byte; > > > > > + if (dev->migsize == data_size) { > > > > > + data_size += PAGE_SIZE; > > > > > + mig_data = g_realloc(mig_data, data_size); > > > > > + } > > > > > + } > > > > > + > > > > > + return NULL; > > > > > +} > > > > > + > > > > > +static int proxy_pre_save(void *opaque) > > > > > +{ > > > > > + PCIProxyDev *pdev = opaque; > > > > > + proxy_mig_data *mig_data; > > > > > + QEMUFile *f_remote; > > > > > + MPQemuMsg msg = {0}; > > > > > + QemuThread thread; > > > > > + Error *err = NULL; > > > > > + QIOChannel *ioc; > > > > > + uint64_t size; > > > > > + int fd[2]; > > > > > + > > > > > + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) { > > > > > + return -1; > > > > > + } > > > > > + > > > > > + ioc = qio_channel_new_fd(fd[0], &err); > > > > > + if (err) { > > > > > + error_report_err(err); > > > > > + return -1; > > > > > + } > > > > > + > > > > > + qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig"); > > > > > + > > > > > + f_remote = qemu_fopen_channel_input(ioc); > > > > > + > > > > > + pdev->migsize = 0; > > > > > + > > > > > + mig_data = g_malloc0(sizeof(proxy_mig_data)); > > > > > + mig_data->rem = f_remote; > > > > > + mig_data->dev = pdev; > > > > > + > > > > > + qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data, > > > > > + QEMU_THREAD_DETACHED); > > > > > + > > > > > + msg.cmd = START_MIG_OUT; > > > > > + msg.bytestream = 0; > > > > > + msg.num_fds = 2; > > > > > + msg.fds[0] = fd[1]; > > > > > + msg.fds[1] = GET_REMOTE_WAIT; > > > > > + > > > > > + mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com); > > > > > + size = wait_for_remote(msg.fds[1]); > > > > > + PUT_REMOTE_WAIT(msg.fds[1]); > > > > > + > > > > > + assert(size != ULLONG_MAX); > > > > > + > > > > > + /* > > > > > + * migsize is being update by a separate thread. Using volatile to > > > > > + * instruct the compiler to fetch the value of this variable from > > > > > + * memory during every read > > > > > + */ > > > > > + while (*((volatile uint64_t *)&pdev->migsize) < size) { > > > > > + } > > > > > + > > > > > + qemu_thread_cancel(&thread); > > > > > > > > ....this is a safe way to stop the thread executing without > > > > resulting in memory being leaked. > > > > > > > > In addition thread cancellation is asynchronous, so the thread > > > > may still be using the QEMUFile object while.... > > > > > > > > > + qemu_fclose(f_remote); > > > > > > The above "wait_for_remote()" call waits for the remote process to > > > finish with Migration, and return the size of the VMSD. > > > > > > It should be safe to cancel the thread and close the file, once the > > > remote process is done sending the VMSD and we have read "size" bytes > > > from it, is it not? > > > > Ok, so the thread is doing > > > > while (true) { > > byte = qemu_get_byte(data->rem); > > ...do something with byte... > > } > > > > so when the thread is cancelled it is almost certainly in the > > qemu_get_byte() call. Since you say wait_for_remote() syncs > > with the end of migration, I'll presume there's no more data > > to be read but the file is still open. > > > > If we're using a blocking FD here we'll probably be stuck in > > read() when we're cancelled, and cancellation would probably > > be ok from looking at the current impl of QEMUFile / QIOChannel. > > If we're handling any error scenario though there could be a > > "Error *local_err" that needs freeing before cancellation. > > > > If the fclose is processed before cancellation takes affect > > on the target thread though we could have a race. > > > > 1. proxy_mig_out blocked in read from qemu_fill_buffer > > > > 2. main thread request async cancel > > > > 3. main thread calls qemu_fclose which closes the FD > > and free's the QEMUFile object > > > > 4. proxy_mig_out thread returns from read() with > > ret == 0 (EOF) > > This wasn't happening. It would be convenient if it did. > > When the file was closed by the main thread, the async thread was still > hung at qemu_fill_buffer(), instead of returning 0 (EOF). That's reason > why we took the thread-cancellation route. We'd be glad to remove > qemu_thread_cancel(). > > > > > 5. proxy_mig_out thread calls qemu_file_set_error_obj > > on a QEMUFole object free'd in (3). use after free. opps > > > > 6. ..async cancel request gets delivered.... > > > > admittedly it is fairly unlikely for the async cancel > > to be delayed for so long that this sequence happens, but > > unexpected things can happen when we really don't want them. > > Absolutely, we don't want to leave anything to chance. > > > > > IMHO the safe way to deal with this would be a lock-step > > sequence between the threads > > > > 1. proxy_mig_out blocked in read from qemu_fill_buffer > > 2. main thread closes the FD with qemu_file_shutdown() > > closing both directions > > Will give qemu_file_shutdown() a try. Yes, shutdown() is quite nice - but note it does need to be a socket. Dave > Thank you! > -- > Jag > > > > > 3. proxy_mig_out returns from read with ret == 0 (EOF) > > > > 4. proxy_mig_out thread breaks out of its inifinite loop > > due to EOF and exits > > > > 5. main thread calls pthread_join on proxy_mig_out > > > > 6. main thread calls qemu_fclose() > > > > this is easier to reason about the safety of than the cancel based > > approach IMHO. > > > > Regards, > > Daniel > > > -- Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c index 623a6c5..ce72e6a 100644 --- a/hw/proxy/qemu-proxy.c +++ b/hw/proxy/qemu-proxy.c @@ -52,6 +52,14 @@ #include "util/event_notifier-posix.c" #include "hw/boards.h" #include "include/qemu/log.h" +#include "io/channel.h" +#include "migration/qemu-file-types.h" +#include "qapi/error.h" +#include "io/channel-util.h" +#include "migration/qemu-file-channel.h" +#include "migration/qemu-file.h" +#include "migration/migration.h" +#include "migration/vmstate.h" QEMUTimer *hb_timer; static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp); @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void); static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx); static void broadcast_msg(MPQemuMsg *msg, bool need_reply); +#define PAGE_SIZE getpagesize() +uint8_t *mig_data; + static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx) { /* TODO: Add proper handler. */ @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj) dev->mem_init = false; } +typedef struct { + QEMUFile *rem; + PCIProxyDev *dev; +} proxy_mig_data; + +static void *proxy_mig_out(void *opaque) +{ + proxy_mig_data *data = opaque; + PCIProxyDev *dev = data->dev; + uint8_t byte; + uint64_t data_size = PAGE_SIZE; + + mig_data = g_malloc(data_size); + + while (true) { + byte = qemu_get_byte(data->rem); + mig_data[dev->migsize++] = byte; + if (dev->migsize == data_size) { + data_size += PAGE_SIZE; + mig_data = g_realloc(mig_data, data_size); + } + } + + return NULL; +} + +static int proxy_pre_save(void *opaque) +{ + PCIProxyDev *pdev = opaque; + proxy_mig_data *mig_data; + QEMUFile *f_remote; + MPQemuMsg msg = {0}; + QemuThread thread; + Error *err = NULL; + QIOChannel *ioc; + uint64_t size; + int fd[2]; + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) { + return -1; + } + + ioc = qio_channel_new_fd(fd[0], &err); + if (err) { + error_report_err(err); + return -1; + } + + qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig"); + + f_remote = qemu_fopen_channel_input(ioc); + + pdev->migsize = 0; + + mig_data = g_malloc0(sizeof(proxy_mig_data)); + mig_data->rem = f_remote; + mig_data->dev = pdev; + + qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data, + QEMU_THREAD_DETACHED); + + msg.cmd = START_MIG_OUT; + msg.bytestream = 0; + msg.num_fds = 2; + msg.fds[0] = fd[1]; + msg.fds[1] = GET_REMOTE_WAIT; + + mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com); + size = wait_for_remote(msg.fds[1]); + PUT_REMOTE_WAIT(msg.fds[1]); + + assert(size != ULLONG_MAX); + + /* + * migsize is being update by a separate thread. Using volatile to + * instruct the compiler to fetch the value of this variable from + * memory during every read + */ + while (*((volatile uint64_t *)&pdev->migsize) < size) { + } + + qemu_thread_cancel(&thread); + + qemu_fclose(f_remote); + close(fd[1]); + + return 0; +} + +static int proxy_post_save(void *opaque) +{ + MigrationState *ms = migrate_get_current(); + PCIProxyDev *pdev = opaque; + uint64_t pos = 0; + + while (pos < pdev->migsize) { + qemu_put_byte(ms->to_dst_file, mig_data[pos]); + pos++; + } + + qemu_fflush(ms->to_dst_file); + + return 0; +} + +const VMStateDescription vmstate_pci_proxy_device = { + .name = "PCIProxyDevice", + .version_id = 2, + .minimum_version_id = 1, + .pre_save = proxy_pre_save, + .post_save = proxy_post_save, + .fields = (VMStateField[]) { + VMSTATE_PCI_DEVICE(parent_dev, PCIProxyDev), + VMSTATE_UINT64(migsize, PCIProxyDev), + VMSTATE_END_OF_LIST() + } +}; + static void pci_proxy_dev_class_init(ObjectClass *klass, void *data) { PCIDeviceClass *k = PCI_DEVICE_CLASS(klass); + DeviceClass *dc = DEVICE_CLASS(klass); k->realize = pci_proxy_dev_realize; k->exit = pci_dev_exit; k->config_read = pci_proxy_read_config; k->config_write = pci_proxy_write_config; + + dc->vmsd = &vmstate_pci_proxy_device; } static const TypeInfo pci_proxy_dev_type_info = { diff --git a/include/hw/proxy/qemu-proxy.h b/include/hw/proxy/qemu-proxy.h index 17e07ac..b122e6d 100644 --- a/include/hw/proxy/qemu-proxy.h +++ b/include/hw/proxy/qemu-proxy.h @@ -89,6 +89,8 @@ struct PCIProxyDev { void (*init_proxy) (PCIDevice *dev, char *command, bool need_spawn, Error **errp); ProxyMemoryRegion region[PCI_NUM_REGIONS]; + + uint64_t migsize; }; typedef struct PCIProxyDevClass { diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h index 6fcc6f5..0ed7750 100644 --- a/include/io/mpqemu-link.h +++ b/include/io/mpqemu-link.h @@ -75,6 +75,7 @@ typedef enum { PROXY_PING, MMIO_RETURN, DEVICE_RESET, + START_MIG_OUT, MAX, } mpqemu_cmd_t;