diff mbox series

[RFC,v4,41/49] multi-process/mig: Enable VMSD save in the Proxy object

Message ID 8b15ab3d4fe51b792897ffc87e221bfb9317a836.1571905346.git.jag.raman@oracle.com (mailing list archive)
State New, archived
Headers show
Series Initial support of multi-process qemu | expand

Commit Message

Jag Raman Oct. 24, 2019, 9:09 a.m. UTC
Collect the VMSD from remote process on the source and save
it to the channel leading to the destination

Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
---
 New patch in v4

 hw/proxy/qemu-proxy.c         | 132 ++++++++++++++++++++++++++++++++++++++++++
 include/hw/proxy/qemu-proxy.h |   2 +
 include/io/mpqemu-link.h      |   1 +
 3 files changed, 135 insertions(+)

Comments

Daniel P. Berrangé Nov. 13, 2019, 3:50 p.m. UTC | #1
On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote:
> Collect the VMSD from remote process on the source and save
> it to the channel leading to the destination
> 
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> ---
>  New patch in v4
> 
>  hw/proxy/qemu-proxy.c         | 132 ++++++++++++++++++++++++++++++++++++++++++
>  include/hw/proxy/qemu-proxy.h |   2 +
>  include/io/mpqemu-link.h      |   1 +
>  3 files changed, 135 insertions(+)
> 
> diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
> index 623a6c5..ce72e6a 100644
> --- a/hw/proxy/qemu-proxy.c
> +++ b/hw/proxy/qemu-proxy.c
> @@ -52,6 +52,14 @@
>  #include "util/event_notifier-posix.c"
>  #include "hw/boards.h"
>  #include "include/qemu/log.h"
> +#include "io/channel.h"
> +#include "migration/qemu-file-types.h"
> +#include "qapi/error.h"
> +#include "io/channel-util.h"
> +#include "migration/qemu-file-channel.h"
> +#include "migration/qemu-file.h"
> +#include "migration/migration.h"
> +#include "migration/vmstate.h"
>  
>  QEMUTimer *hb_timer;
>  static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
> @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void);
>  static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx);
>  static void broadcast_msg(MPQemuMsg *msg, bool need_reply);
>  
> +#define PAGE_SIZE getpagesize()
> +uint8_t *mig_data;
> +
>  static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
>  {
>      /* TODO: Add proper handler. */
> @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj)
>      dev->mem_init = false;
>  }
>  
> +typedef struct {
> +    QEMUFile *rem;
> +    PCIProxyDev *dev;
> +} proxy_mig_data;
> +
> +static void *proxy_mig_out(void *opaque)
> +{
> +    proxy_mig_data *data = opaque;
> +    PCIProxyDev *dev = data->dev;
> +    uint8_t byte;
> +    uint64_t data_size = PAGE_SIZE;
> +
> +    mig_data = g_malloc(data_size);
> +
> +    while (true) {
> +        byte = qemu_get_byte(data->rem);

There is a pretty large set of APIs hiding behind the qemu_get_byte
call, which does not give me confidence that...

> +        mig_data[dev->migsize++] = byte;
> +        if (dev->migsize == data_size) {
> +            data_size += PAGE_SIZE;
> +            mig_data = g_realloc(mig_data, data_size);
> +        }
> +    }
> +
> +    return NULL;
> +}
> +
> +static int proxy_pre_save(void *opaque)
> +{
> +    PCIProxyDev *pdev = opaque;
> +    proxy_mig_data *mig_data;
> +    QEMUFile *f_remote;
> +    MPQemuMsg msg = {0};
> +    QemuThread thread;
> +    Error *err = NULL;
> +    QIOChannel *ioc;
> +    uint64_t size;
> +    int fd[2];
> +
> +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
> +        return -1;
> +    }
> +
> +    ioc = qio_channel_new_fd(fd[0], &err);
> +    if (err) {
> +        error_report_err(err);
> +        return -1;
> +    }
> +
> +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
> +
> +    f_remote = qemu_fopen_channel_input(ioc);
> +
> +    pdev->migsize = 0;
> +
> +    mig_data = g_malloc0(sizeof(proxy_mig_data));
> +    mig_data->rem = f_remote;
> +    mig_data->dev = pdev;
> +
> +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
> +                       QEMU_THREAD_DETACHED);
> +
> +    msg.cmd = START_MIG_OUT;
> +    msg.bytestream = 0;
> +    msg.num_fds = 2;
> +    msg.fds[0] = fd[1];
> +    msg.fds[1] = GET_REMOTE_WAIT;
> +
> +    mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com);
> +    size = wait_for_remote(msg.fds[1]);
> +    PUT_REMOTE_WAIT(msg.fds[1]);
> +
> +    assert(size != ULLONG_MAX);
> +
> +    /*
> +     * migsize is being update by a separate thread. Using volatile to
> +     * instruct the compiler to fetch the value of this variable from
> +     * memory during every read
> +     */
> +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
> +    }
> +
> +    qemu_thread_cancel(&thread);

....this is a safe way to stop the thread executing without
resulting in memory being leaked.

In addition thread cancellation is asynchronous, so the thread
may still be using the QEMUFile object while....

> +    qemu_fclose(f_remote);

..this is closing it. This feels like it is a crash danger.


> +    close(fd[1]);
> +
> +    return 0;
> +}

Regards,
Daniel
Jag Raman Nov. 13, 2019, 4:32 p.m. UTC | #2
On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote:
> On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote:
>> Collect the VMSD from remote process on the source and save
>> it to the channel leading to the destination
>>
>> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
>> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
>> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
>> ---
>>   New patch in v4
>>
>>   hw/proxy/qemu-proxy.c         | 132 ++++++++++++++++++++++++++++++++++++++++++
>>   include/hw/proxy/qemu-proxy.h |   2 +
>>   include/io/mpqemu-link.h      |   1 +
>>   3 files changed, 135 insertions(+)
>>
>> diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
>> index 623a6c5..ce72e6a 100644
>> --- a/hw/proxy/qemu-proxy.c
>> +++ b/hw/proxy/qemu-proxy.c
>> @@ -52,6 +52,14 @@
>>   #include "util/event_notifier-posix.c"
>>   #include "hw/boards.h"
>>   #include "include/qemu/log.h"
>> +#include "io/channel.h"
>> +#include "migration/qemu-file-types.h"
>> +#include "qapi/error.h"
>> +#include "io/channel-util.h"
>> +#include "migration/qemu-file-channel.h"
>> +#include "migration/qemu-file.h"
>> +#include "migration/migration.h"
>> +#include "migration/vmstate.h"
>>   
>>   QEMUTimer *hb_timer;
>>   static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
>> @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void);
>>   static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx);
>>   static void broadcast_msg(MPQemuMsg *msg, bool need_reply);
>>   
>> +#define PAGE_SIZE getpagesize()
>> +uint8_t *mig_data;
>> +
>>   static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
>>   {
>>       /* TODO: Add proper handler. */
>> @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj)
>>       dev->mem_init = false;
>>   }
>>   
>> +typedef struct {
>> +    QEMUFile *rem;
>> +    PCIProxyDev *dev;
>> +} proxy_mig_data;
>> +
>> +static void *proxy_mig_out(void *opaque)
>> +{
>> +    proxy_mig_data *data = opaque;
>> +    PCIProxyDev *dev = data->dev;
>> +    uint8_t byte;
>> +    uint64_t data_size = PAGE_SIZE;
>> +
>> +    mig_data = g_malloc(data_size);
>> +
>> +    while (true) {
>> +        byte = qemu_get_byte(data->rem);
> 
> There is a pretty large set of APIs hiding behind the qemu_get_byte
> call, which does not give me confidence that...
> 
>> +        mig_data[dev->migsize++] = byte;
>> +        if (dev->migsize == data_size) {
>> +            data_size += PAGE_SIZE;
>> +            mig_data = g_realloc(mig_data, data_size);
>> +        }
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>> +static int proxy_pre_save(void *opaque)
>> +{
>> +    PCIProxyDev *pdev = opaque;
>> +    proxy_mig_data *mig_data;
>> +    QEMUFile *f_remote;
>> +    MPQemuMsg msg = {0};
>> +    QemuThread thread;
>> +    Error *err = NULL;
>> +    QIOChannel *ioc;
>> +    uint64_t size;
>> +    int fd[2];
>> +
>> +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
>> +        return -1;
>> +    }
>> +
>> +    ioc = qio_channel_new_fd(fd[0], &err);
>> +    if (err) {
>> +        error_report_err(err);
>> +        return -1;
>> +    }
>> +
>> +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
>> +
>> +    f_remote = qemu_fopen_channel_input(ioc);
>> +
>> +    pdev->migsize = 0;
>> +
>> +    mig_data = g_malloc0(sizeof(proxy_mig_data));
>> +    mig_data->rem = f_remote;
>> +    mig_data->dev = pdev;
>> +
>> +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
>> +                       QEMU_THREAD_DETACHED);
>> +
>> +    msg.cmd = START_MIG_OUT;
>> +    msg.bytestream = 0;
>> +    msg.num_fds = 2;
>> +    msg.fds[0] = fd[1];
>> +    msg.fds[1] = GET_REMOTE_WAIT;
>> +
>> +    mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com);
>> +    size = wait_for_remote(msg.fds[1]);
>> +    PUT_REMOTE_WAIT(msg.fds[1]);
>> +
>> +    assert(size != ULLONG_MAX);
>> +
>> +    /*
>> +     * migsize is being update by a separate thread. Using volatile to
>> +     * instruct the compiler to fetch the value of this variable from
>> +     * memory during every read
>> +     */
>> +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
>> +    }
>> +
>> +    qemu_thread_cancel(&thread);
> 
> ....this is a safe way to stop the thread executing without
> resulting in memory being leaked.
> 
> In addition thread cancellation is asynchronous, so the thread
> may still be using the QEMUFile object while....
> 
>> +    qemu_fclose(f_remote);

The above "wait_for_remote()" call waits for the remote process to
finish the migration and return the size of the VMSD.

It should be safe to cancel the thread and close the file, once the
remote process is done sending the VMSD and we have read "size" bytes
from it, is it not?

Thank you very much!
--
Jag

> 
> ..this is closing it. This feels like it is a crash danger.
> 
> 
>> +    close(fd[1]);
>> +
>> +    return 0;
>> +}
> 
> Regards,
> Daniel
>
Daniel P. Berrangé Nov. 13, 2019, 5:11 p.m. UTC | #3
On Wed, Nov 13, 2019 at 11:32:09AM -0500, Jag Raman wrote:
> 
> 
> On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote:
> > On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote:
> > > Collect the VMSD from remote process on the source and save
> > > it to the channel leading to the destination
> > > 
> > > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> > > Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> > > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> > > ---
> > >   New patch in v4
> > > 
> > >   hw/proxy/qemu-proxy.c         | 132 ++++++++++++++++++++++++++++++++++++++++++
> > >   include/hw/proxy/qemu-proxy.h |   2 +
> > >   include/io/mpqemu-link.h      |   1 +
> > >   3 files changed, 135 insertions(+)
> > > 
> > > diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
> > > index 623a6c5..ce72e6a 100644
> > > --- a/hw/proxy/qemu-proxy.c
> > > +++ b/hw/proxy/qemu-proxy.c
> > > @@ -52,6 +52,14 @@
> > >   #include "util/event_notifier-posix.c"
> > >   #include "hw/boards.h"
> > >   #include "include/qemu/log.h"
> > > +#include "io/channel.h"
> > > +#include "migration/qemu-file-types.h"
> > > +#include "qapi/error.h"
> > > +#include "io/channel-util.h"
> > > +#include "migration/qemu-file-channel.h"
> > > +#include "migration/qemu-file.h"
> > > +#include "migration/migration.h"
> > > +#include "migration/vmstate.h"
> > >   QEMUTimer *hb_timer;
> > >   static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
> > > @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void);
> > >   static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx);
> > >   static void broadcast_msg(MPQemuMsg *msg, bool need_reply);
> > > +#define PAGE_SIZE getpagesize()
> > > +uint8_t *mig_data;
> > > +
> > >   static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
> > >   {
> > >       /* TODO: Add proper handler. */
> > > @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj)
> > >       dev->mem_init = false;
> > >   }
> > > +typedef struct {
> > > +    QEMUFile *rem;
> > > +    PCIProxyDev *dev;
> > > +} proxy_mig_data;
> > > +
> > > +static void *proxy_mig_out(void *opaque)
> > > +{
> > > +    proxy_mig_data *data = opaque;
> > > +    PCIProxyDev *dev = data->dev;
> > > +    uint8_t byte;
> > > +    uint64_t data_size = PAGE_SIZE;
> > > +
> > > +    mig_data = g_malloc(data_size);
> > > +
> > > +    while (true) {
> > > +        byte = qemu_get_byte(data->rem);
> > 
> > There is a pretty large set of APIs hiding behind the qemu_get_byte
> > call, which does not give me confidence that...
> > 
> > > +        mig_data[dev->migsize++] = byte;
> > > +        if (dev->migsize == data_size) {
> > > +            data_size += PAGE_SIZE;
> > > +            mig_data = g_realloc(mig_data, data_size);
> > > +        }
> > > +    }
> > > +
> > > +    return NULL;
> > > +}
> > > +
> > > +static int proxy_pre_save(void *opaque)
> > > +{
> > > +    PCIProxyDev *pdev = opaque;
> > > +    proxy_mig_data *mig_data;
> > > +    QEMUFile *f_remote;
> > > +    MPQemuMsg msg = {0};
> > > +    QemuThread thread;
> > > +    Error *err = NULL;
> > > +    QIOChannel *ioc;
> > > +    uint64_t size;
> > > +    int fd[2];
> > > +
> > > +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
> > > +        return -1;
> > > +    }
> > > +
> > > +    ioc = qio_channel_new_fd(fd[0], &err);
> > > +    if (err) {
> > > +        error_report_err(err);
> > > +        return -1;
> > > +    }
> > > +
> > > +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
> > > +
> > > +    f_remote = qemu_fopen_channel_input(ioc);
> > > +
> > > +    pdev->migsize = 0;
> > > +
> > > +    mig_data = g_malloc0(sizeof(proxy_mig_data));
> > > +    mig_data->rem = f_remote;
> > > +    mig_data->dev = pdev;
> > > +
> > > +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
> > > +                       QEMU_THREAD_DETACHED);
> > > +
> > > +    msg.cmd = START_MIG_OUT;
> > > +    msg.bytestream = 0;
> > > +    msg.num_fds = 2;
> > > +    msg.fds[0] = fd[1];
> > > +    msg.fds[1] = GET_REMOTE_WAIT;
> > > +
> > > +    mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com);
> > > +    size = wait_for_remote(msg.fds[1]);
> > > +    PUT_REMOTE_WAIT(msg.fds[1]);
> > > +
> > > +    assert(size != ULLONG_MAX);
> > > +
> > > +    /*
> > > +     * migsize is being update by a separate thread. Using volatile to
> > > +     * instruct the compiler to fetch the value of this variable from
> > > +     * memory during every read
> > > +     */
> > > +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
> > > +    }
> > > +
> > > +    qemu_thread_cancel(&thread);
> > 
> > ....this is a safe way to stop the thread executing without
> > resulting in memory being leaked.
> > 
> > In addition thread cancellation is asynchronous, so the thread
> > may still be using the QEMUFile object while....
> > 
> > > +    qemu_fclose(f_remote);
> 
> The above "wait_for_remote()" call waits for the remote process to
> finish with Migration, and return the size of the VMSD.
> 
> It should be safe to cancel the thread and close the file, once the
> remote process is done sending the VMSD and we have read "size" bytes
> from it, is it not?

Ok, so the thread is doing 

    while (true) {
        byte = qemu_get_byte(data->rem);
        ...do something with byte...
    }

so when the thread is cancelled it is almost certainly in the
qemu_get_byte() call. Since you say wait_for_remote() syncs
with the end of migration, I'll presume there's no more data
to be read but the file is still open.

If we're using a blocking FD here we'll probably be stuck in
read() when we're cancelled, and cancellation would probably
be ok from looking at the current impl of QEMUFile / QIOChannel.
If we're handling any error scenario, though, there could be an
"Error *local_err" that needs freeing before cancellation.

If the fclose is processed before cancellation takes effect
on the target thread, though, we could have a race.

  1. proxy_mig_out blocked in read from qemu_fill_buffer

  2. main thread requests async cancel

  3. main thread calls qemu_fclose which closes the FD
     and frees the QEMUFile object

  4. proxy_mig_out thread returns from read() with
     ret == 0 (EOF)

  5. proxy_mig_out thread calls qemu_file_set_error_obj
     on a QEMUFile object freed in (3). Use after free. Oops.

  6. ..async cancel request gets delivered....

admittedly it is fairly unlikely for the async cancel
to be delayed for so long that this sequence happens, but
unexpected things can happen when we really don't want them.

IMHO the safe way to deal with this would be a lock-step
sequence between the threads

   1. proxy_mig_out blocked in read from qemu_fill_buffer
   
   2. main thread closes the FD with qemu_file_shutdown()
      closing both directions

   3. proxy_mig_out returns from read with ret == 0 (EOF)

   4. proxy_mig_out thread breaks out of its infinite loop
      due to EOF and exits

   5. main thread calls pthread_join on proxy_mig_out

   6. main thread calls qemu_fclose()

This makes the safety easier to reason about than the cancel-based
approach IMHO.

Regards,
Daniel
Jag Raman Nov. 18, 2019, 3:42 p.m. UTC | #4
On 11/13/2019 12:11 PM, Daniel P. Berrangé wrote:
> On Wed, Nov 13, 2019 at 11:32:09AM -0500, Jag Raman wrote:
>>
>>
>> On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote:
>>> On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote:
>>>> Collect the VMSD from remote process on the source and save
>>>> it to the channel leading to the destination
>>>>
>>>> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
>>>> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
>>>> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
>>>> ---
>>>>    New patch in v4
>>>>
>>>>    hw/proxy/qemu-proxy.c         | 132 ++++++++++++++++++++++++++++++++++++++++++
>>>>    include/hw/proxy/qemu-proxy.h |   2 +
>>>>    include/io/mpqemu-link.h      |   1 +
>>>>    3 files changed, 135 insertions(+)
>>>>
>>>> diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
>>>> index 623a6c5..ce72e6a 100644
>>>> --- a/hw/proxy/qemu-proxy.c
>>>> +++ b/hw/proxy/qemu-proxy.c
>>>> @@ -52,6 +52,14 @@
>>>>    #include "util/event_notifier-posix.c"
>>>>    #include "hw/boards.h"
>>>>    #include "include/qemu/log.h"
>>>> +#include "io/channel.h"
>>>> +#include "migration/qemu-file-types.h"
>>>> +#include "qapi/error.h"
>>>> +#include "io/channel-util.h"
>>>> +#include "migration/qemu-file-channel.h"
>>>> +#include "migration/qemu-file.h"
>>>> +#include "migration/migration.h"
>>>> +#include "migration/vmstate.h"
>>>>    QEMUTimer *hb_timer;
>>>>    static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
>>>> @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void);
>>>>    static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx);
>>>>    static void broadcast_msg(MPQemuMsg *msg, bool need_reply);
>>>> +#define PAGE_SIZE getpagesize()
>>>> +uint8_t *mig_data;
>>>> +
>>>>    static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
>>>>    {
>>>>        /* TODO: Add proper handler. */
>>>> @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj)
>>>>        dev->mem_init = false;
>>>>    }
>>>> +typedef struct {
>>>> +    QEMUFile *rem;
>>>> +    PCIProxyDev *dev;
>>>> +} proxy_mig_data;
>>>> +
>>>> +static void *proxy_mig_out(void *opaque)
>>>> +{
>>>> +    proxy_mig_data *data = opaque;
>>>> +    PCIProxyDev *dev = data->dev;
>>>> +    uint8_t byte;
>>>> +    uint64_t data_size = PAGE_SIZE;
>>>> +
>>>> +    mig_data = g_malloc(data_size);
>>>> +
>>>> +    while (true) {
>>>> +        byte = qemu_get_byte(data->rem);
>>>
>>> There is a pretty large set of APIs hiding behind the qemu_get_byte
>>> call, which does not give me confidence that...
>>>
>>>> +        mig_data[dev->migsize++] = byte;
>>>> +        if (dev->migsize == data_size) {
>>>> +            data_size += PAGE_SIZE;
>>>> +            mig_data = g_realloc(mig_data, data_size);
>>>> +        }
>>>> +    }
>>>> +
>>>> +    return NULL;
>>>> +}
>>>> +
>>>> +static int proxy_pre_save(void *opaque)
>>>> +{
>>>> +    PCIProxyDev *pdev = opaque;
>>>> +    proxy_mig_data *mig_data;
>>>> +    QEMUFile *f_remote;
>>>> +    MPQemuMsg msg = {0};
>>>> +    QemuThread thread;
>>>> +    Error *err = NULL;
>>>> +    QIOChannel *ioc;
>>>> +    uint64_t size;
>>>> +    int fd[2];
>>>> +
>>>> +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
>>>> +        return -1;
>>>> +    }
>>>> +
>>>> +    ioc = qio_channel_new_fd(fd[0], &err);
>>>> +    if (err) {
>>>> +        error_report_err(err);
>>>> +        return -1;
>>>> +    }
>>>> +
>>>> +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
>>>> +
>>>> +    f_remote = qemu_fopen_channel_input(ioc);
>>>> +
>>>> +    pdev->migsize = 0;
>>>> +
>>>> +    mig_data = g_malloc0(sizeof(proxy_mig_data));
>>>> +    mig_data->rem = f_remote;
>>>> +    mig_data->dev = pdev;
>>>> +
>>>> +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
>>>> +                       QEMU_THREAD_DETACHED);
>>>> +
>>>> +    msg.cmd = START_MIG_OUT;
>>>> +    msg.bytestream = 0;
>>>> +    msg.num_fds = 2;
>>>> +    msg.fds[0] = fd[1];
>>>> +    msg.fds[1] = GET_REMOTE_WAIT;
>>>> +
>>>> +    mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com);
>>>> +    size = wait_for_remote(msg.fds[1]);
>>>> +    PUT_REMOTE_WAIT(msg.fds[1]);
>>>> +
>>>> +    assert(size != ULLONG_MAX);
>>>> +
>>>> +    /*
>>>> +     * migsize is being update by a separate thread. Using volatile to
>>>> +     * instruct the compiler to fetch the value of this variable from
>>>> +     * memory during every read
>>>> +     */
>>>> +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
>>>> +    }
>>>> +
>>>> +    qemu_thread_cancel(&thread);
>>>
>>> ....this is a safe way to stop the thread executing without
>>> resulting in memory being leaked.
>>>
>>> In addition thread cancellation is asynchronous, so the thread
>>> may still be using the QEMUFile object while....
>>>
>>>> +    qemu_fclose(f_remote);
>>
>> The above "wait_for_remote()" call waits for the remote process to
>> finish with Migration, and return the size of the VMSD.
>>
>> It should be safe to cancel the thread and close the file, once the
>> remote process is done sending the VMSD and we have read "size" bytes
>> from it, is it not?
> 
> Ok, so the thread is doing
> 
>      while (true) {
>          byte = qemu_get_byte(data->rem);
>          ...do something with byte...
>      }
> 
> so when the thread is cancelled it is almost certainly in the
> qemu_get_byte() call. Since you say wait_for_remote() syncs
> with the end of migration, I'll presume there's no more data
> to be read but the file is still open.
> 
> If we're using a blocking FD here we'll probably be stuck in
> read() when we're cancelled, and cancellation would probably
> be ok from looking at the current impl of QEMUFile / QIOChannel.
> If we're handling any error scenario though there could be a
> "Error *local_err" that needs freeing before cancellation.
> 
> If the fclose is processed before cancellation takes affect
> on the target thread though we could have a race.
> 
>    1. proxy_mig_out blocked in read from qemu_fill_buffer
> 
>    2. main thread request async cancel
> 
>    3. main thread calls qemu_fclose which closes the FD
>       and free's the QEMUFile object
> 
>    4. proxy_mig_out thread returns from read() with
>       ret == 0 (EOF)

This wasn't happening. It would be convenient if it did.

When the file was closed by the main thread, the async thread was still
hung at qemu_fill_buffer(), instead of returning 0 (EOF). That's the reason
why we took the thread-cancellation route. We'd be glad to remove
qemu_thread_cancel().

> 
>    5. proxy_mig_out thread calls qemu_file_set_error_obj
>       on a QEMUFole object free'd in (3). use after free. opps
> 
>    6. ..async cancel request gets delivered....
> 
> admittedly it is fairly unlikely for the async cancel
> to be delayed for so long that this sequence happens, but
> unexpected things can happen when we really don't want them.

Absolutely, we don't want to leave anything to chance.

> 
> IMHO the safe way to deal with this would be a lock-step
> sequence between the threads
> 
>     1. proxy_mig_out blocked in read from qemu_fill_buffer
>     
>     2. main thread closes the FD with qemu_file_shutdown()
>        closing both directions

Will give qemu_file_shutdown() a try.

Thank you!
--
Jag

> 
>     3. proxy_mig_out returns from read with ret == 0 (EOF)
> 
>     4. proxy_mig_out thread breaks out of its inifinite loop
>        due to EOF and exits
> 
>     5. main thread calls pthread_join on proxy_mig_out
> 
>     6. main thread calls qemu_fclose()
> 
> this is easier to reason about the safety of than the cancel based
> approach IMHO.
> 
> Regards,
> Daniel
>
Dr. David Alan Gilbert Nov. 22, 2019, 10:34 a.m. UTC | #5
* Jag Raman (jag.raman@oracle.com) wrote:
> 
> 
> On 11/13/2019 12:11 PM, Daniel P. Berrangé wrote:
> > On Wed, Nov 13, 2019 at 11:32:09AM -0500, Jag Raman wrote:
> > > 
> > > 
> > > On 11/13/2019 10:50 AM, Daniel P. Berrangé wrote:
> > > > On Thu, Oct 24, 2019 at 05:09:22AM -0400, Jagannathan Raman wrote:
> > > > > Collect the VMSD from remote process on the source and save
> > > > > it to the channel leading to the destination
> > > > > 
> > > > > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> > > > > Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> > > > > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> > > > > ---
> > > > >    New patch in v4
> > > > > 
> > > > >    hw/proxy/qemu-proxy.c         | 132 ++++++++++++++++++++++++++++++++++++++++++
> > > > >    include/hw/proxy/qemu-proxy.h |   2 +
> > > > >    include/io/mpqemu-link.h      |   1 +
> > > > >    3 files changed, 135 insertions(+)
> > > > > 
> > > > > diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
> > > > > index 623a6c5..ce72e6a 100644
> > > > > --- a/hw/proxy/qemu-proxy.c
> > > > > +++ b/hw/proxy/qemu-proxy.c
> > > > > @@ -52,6 +52,14 @@
> > > > >    #include "util/event_notifier-posix.c"
> > > > >    #include "hw/boards.h"
> > > > >    #include "include/qemu/log.h"
> > > > > +#include "io/channel.h"
> > > > > +#include "migration/qemu-file-types.h"
> > > > > +#include "qapi/error.h"
> > > > > +#include "io/channel-util.h"
> > > > > +#include "migration/qemu-file-channel.h"
> > > > > +#include "migration/qemu-file.h"
> > > > > +#include "migration/migration.h"
> > > > > +#include "migration/vmstate.h"
> > > > >    QEMUTimer *hb_timer;
> > > > >    static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
> > > > > @@ -62,6 +70,9 @@ static void stop_heartbeat_timer(void);
> > > > >    static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx);
> > > > >    static void broadcast_msg(MPQemuMsg *msg, bool need_reply);
> > > > > +#define PAGE_SIZE getpagesize()
> > > > > +uint8_t *mig_data;
> > > > > +
> > > > >    static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
> > > > >    {
> > > > >        /* TODO: Add proper handler. */
> > > > > @@ -357,14 +368,135 @@ static void pci_proxy_dev_inst_init(Object *obj)
> > > > >        dev->mem_init = false;
> > > > >    }
> > > > > +typedef struct {
> > > > > +    QEMUFile *rem;
> > > > > +    PCIProxyDev *dev;
> > > > > +} proxy_mig_data;
> > > > > +
> > > > > +static void *proxy_mig_out(void *opaque)
> > > > > +{
> > > > > +    proxy_mig_data *data = opaque;
> > > > > +    PCIProxyDev *dev = data->dev;
> > > > > +    uint8_t byte;
> > > > > +    uint64_t data_size = PAGE_SIZE;
> > > > > +
> > > > > +    mig_data = g_malloc(data_size);
> > > > > +
> > > > > +    while (true) {
> > > > > +        byte = qemu_get_byte(data->rem);
> > > > 
> > > > There is a pretty large set of APIs hiding behind the qemu_get_byte
> > > > call, which does not give me confidence that...
> > > > 
> > > > > +        mig_data[dev->migsize++] = byte;
> > > > > +        if (dev->migsize == data_size) {
> > > > > +            data_size += PAGE_SIZE;
> > > > > +            mig_data = g_realloc(mig_data, data_size);
> > > > > +        }
> > > > > +    }
> > > > > +
> > > > > +    return NULL;
> > > > > +}
> > > > > +
> > > > > +static int proxy_pre_save(void *opaque)
> > > > > +{
> > > > > +    PCIProxyDev *pdev = opaque;
> > > > > +    proxy_mig_data *mig_data;
> > > > > +    QEMUFile *f_remote;
> > > > > +    MPQemuMsg msg = {0};
> > > > > +    QemuThread thread;
> > > > > +    Error *err = NULL;
> > > > > +    QIOChannel *ioc;
> > > > > +    uint64_t size;
> > > > > +    int fd[2];
> > > > > +
> > > > > +    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
> > > > > +        return -1;
> > > > > +    }
> > > > > +
> > > > > +    ioc = qio_channel_new_fd(fd[0], &err);
> > > > > +    if (err) {
> > > > > +        error_report_err(err);
> > > > > +        return -1;
> > > > > +    }
> > > > > +
> > > > > +    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
> > > > > +
> > > > > +    f_remote = qemu_fopen_channel_input(ioc);
> > > > > +
> > > > > +    pdev->migsize = 0;
> > > > > +
> > > > > +    mig_data = g_malloc0(sizeof(proxy_mig_data));
> > > > > +    mig_data->rem = f_remote;
> > > > > +    mig_data->dev = pdev;
> > > > > +
> > > > > +    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, mig_data,
> > > > > +                       QEMU_THREAD_DETACHED);
> > > > > +
> > > > > +    msg.cmd = START_MIG_OUT;
> > > > > +    msg.bytestream = 0;
> > > > > +    msg.num_fds = 2;
> > > > > +    msg.fds[0] = fd[1];
> > > > > +    msg.fds[1] = GET_REMOTE_WAIT;
> > > > > +
> > > > > +    mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com);
> > > > > +    size = wait_for_remote(msg.fds[1]);
> > > > > +    PUT_REMOTE_WAIT(msg.fds[1]);
> > > > > +
> > > > > +    assert(size != ULLONG_MAX);
> > > > > +
> > > > > +    /*
> > > > > +     * migsize is being update by a separate thread. Using volatile to
> > > > > +     * instruct the compiler to fetch the value of this variable from
> > > > > +     * memory during every read
> > > > > +     */
> > > > > +    while (*((volatile uint64_t *)&pdev->migsize) < size) {
> > > > > +    }
> > > > > +
> > > > > +    qemu_thread_cancel(&thread);
> > > > 
> > > > ....this is a safe way to stop the thread executing without
> > > > resulting in memory being leaked.
> > > > 
> > > > In addition thread cancellation is asynchronous, so the thread
> > > > may still be using the QEMUFile object while....
> > > > 
> > > > > +    qemu_fclose(f_remote);
> > > 
> > > The above "wait_for_remote()" call waits for the remote process to
> > > finish with Migration, and return the size of the VMSD.
> > > 
> > > It should be safe to cancel the thread and close the file, once the
> > > remote process is done sending the VMSD and we have read "size" bytes
> > > from it, is it not?
> > 
> > Ok, so the thread is doing
> > 
> >      while (true) {
> >          byte = qemu_get_byte(data->rem);
> >          ...do something with byte...
> >      }
> > 
> > so when the thread is cancelled it is almost certainly in the
> > qemu_get_byte() call. Since you say wait_for_remote() syncs
> > with the end of migration, I'll presume there's no more data
> > to be read but the file is still open.
> > 
> > If we're using a blocking FD here we'll probably be stuck in
> > read() when we're cancelled, and cancellation would probably
> > be ok from looking at the current impl of QEMUFile / QIOChannel.
> > If we're handling any error scenario though there could be a
> > "Error *local_err" that needs freeing before cancellation.
> > 
> > If the fclose is processed before cancellation takes affect
> > on the target thread though we could have a race.
> > 
> >    1. proxy_mig_out blocked in read from qemu_fill_buffer
> > 
> >    2. main thread request async cancel
> > 
> >    3. main thread calls qemu_fclose which closes the FD
> >       and free's the QEMUFile object
> > 
> >    4. proxy_mig_out thread returns from read() with
> >       ret == 0 (EOF)
> 
> This wasn't happening. It would be convenient if it did.
> 
> When the file was closed by the main thread, the async thread was still
> hung at qemu_fill_buffer(), instead of returning 0 (EOF). That's reason
> why we took the thread-cancellation route. We'd be glad to remove
> qemu_thread_cancel().
> 
> > 
> >    5. proxy_mig_out thread calls qemu_file_set_error_obj
> >       on a QEMUFole object free'd in (3). use after free. opps
> > 
> >    6. ..async cancel request gets delivered....
> > 
> > admittedly it is fairly unlikely for the async cancel
> > to be delayed for so long that this sequence happens, but
> > unexpected things can happen when we really don't want them.
> 
> Absolutely, we don't want to leave anything to chance.
> 
> > 
> > IMHO the safe way to deal with this would be a lock-step
> > sequence between the threads
> > 
> >     1. proxy_mig_out blocked in read from qemu_fill_buffer
> >     2. main thread closes the FD with qemu_file_shutdown()
> >        closing both directions
> 
> Will give qemu_file_shutdown() a try.

Yes, shutdown() is quite nice - but note that the underlying FD does need to be a socket.

Dave

> Thank you!
> --
> Jag
> 
> > 
> >     3. proxy_mig_out returns from read with ret == 0 (EOF)
> > 
> >     4. proxy_mig_out thread breaks out of its inifinite loop
> >        due to EOF and exits
> > 
> >     5. main thread calls pthread_join on proxy_mig_out
> > 
> >     6. main thread calls qemu_fclose()
> > 
> > this is easier to reason about the safety of than the cancel based
> > approach IMHO.
> > 
> > Regards,
> > Daniel
> > 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox series

Patch

diff --git a/hw/proxy/qemu-proxy.c b/hw/proxy/qemu-proxy.c
index 623a6c5..ce72e6a 100644
--- a/hw/proxy/qemu-proxy.c
+++ b/hw/proxy/qemu-proxy.c
@@ -52,6 +52,14 @@ 
 #include "util/event_notifier-posix.c"
 #include "hw/boards.h"
 #include "include/qemu/log.h"
+#include "io/channel.h"
+#include "migration/qemu-file-types.h"
+#include "qapi/error.h"
+#include "io/channel-util.h"
+#include "migration/qemu-file-channel.h"
+#include "migration/qemu-file.h"
+#include "migration/migration.h"
+#include "migration/vmstate.h"
 
 QEMUTimer *hb_timer;
 static void pci_proxy_dev_realize(PCIDevice *dev, Error **errp);
@@ -62,6 +70,9 @@  static void stop_heartbeat_timer(void);
 static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx);
 static void broadcast_msg(MPQemuMsg *msg, bool need_reply);
 
+/* Page size; expands to a getpagesize() call at every use of the macro. */
+#define PAGE_SIZE getpagesize()
+/*
+ * Byte buffer that proxy_mig_out() allocates and fills, and whose contents
+ * proxy_post_save() writes to the migration stream.
+ */
+uint8_t *mig_data;
+
 static void childsig_handler(int sig, siginfo_t *siginfo, void *ctx)
 {
     /* TODO: Add proper handler. */
@@ -357,14 +368,135 @@  static void pci_proxy_dev_inst_init(Object *obj)
     dev->mem_init = false;
 }
 
+/*
+ * Arguments handed to the proxy_mig_out() reader thread.
+ *
+ * rem: QEMUFile the thread reads from.
+ * dev: proxy device whose migsize counts the bytes stored so far.
+ */
+typedef struct {
+    QEMUFile *rem;
+    PCIProxyDev *dev;
+} proxy_mig_data;
+
+/*
+ * Reader thread: copy bytes from data->rem into the global mig_data
+ * buffer, growing it by PAGE_SIZE whenever it fills up and bumping
+ * dev->migsize after every byte stored.
+ *
+ * The loop ends as soon as the QEMUFile reports an error. That is what
+ * happens when proxy_pre_save() shuts the channel down, so the caller can
+ * join this thread before it closes the QEMUFile.
+ */
+static void *proxy_mig_out(void *opaque)
+{
+    proxy_mig_data *data = opaque;
+    PCIProxyDev *dev = data->dev;
+    uint64_t data_size = PAGE_SIZE;
+    uint8_t byte;
+
+    /* Release the buffer left over from a previous save, if any. */
+    g_free(mig_data);
+    mig_data = g_malloc(data_size);
+
+    while (true) {
+        byte = qemu_get_byte(data->rem);
+        if (qemu_file_get_error(data->rem)) {
+            /* EOF or I/O error: 'byte' is not real data, do not store it. */
+            break;
+        }
+
+        mig_data[dev->migsize++] = byte;
+        if (dev->migsize == data_size) {
+            data_size += PAGE_SIZE;
+            mig_data = g_realloc(mig_data, data_size);
+        }
+    }
+
+    return NULL;
+}
+
+/*
+ * VMState pre_save hook: collect the remote process' state into mig_data.
+ *
+ * A socketpair is created; one end is wrapped in a QEMUFile and drained by
+ * the proxy_mig_out() thread, the other is sent to the remote together
+ * with a START_MIG_OUT message. The value returned by wait_for_remote() is
+ * treated as the number of bytes to expect, and this function waits until
+ * the reader thread has stored that many.
+ *
+ * Returns 0 on success, -1 if the channel could not be set up or the
+ * reader failed before all expected bytes arrived.
+ */
+static int proxy_pre_save(void *opaque)
+{
+    PCIProxyDev *pdev = opaque;
+    proxy_mig_data *thread_data;
+    QEMUFile *f_remote;
+    MPQemuMsg msg = {0};
+    QemuThread thread;
+    Error *err = NULL;
+    QIOChannel *ioc;
+    uint64_t size;
+    int fd[2];
+    int ret = 0;
+
+    if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) {
+        return -1;
+    }
+
+    ioc = qio_channel_new_fd(fd[0], &err);
+    if (err) {
+        error_report_err(err);
+        /* No channel owns fd[0] on failure; close both ends ourselves. */
+        close(fd[0]);
+        close(fd[1]);
+        return -1;
+    }
+
+    qio_channel_set_name(QIO_CHANNEL(ioc), "PCIProxyDevice-mig");
+
+    f_remote = qemu_fopen_channel_input(ioc);
+    /* The QEMUFile holds its own reference to the channel; drop ours. */
+    object_unref(OBJECT(ioc));
+
+    pdev->migsize = 0;
+
+    thread_data = g_malloc0(sizeof(proxy_mig_data));
+    thread_data->rem = f_remote;
+    thread_data->dev = pdev;
+
+    /* Joinable, so the teardown below can wait for the thread to finish. */
+    qemu_thread_create(&thread, "Proxy MIG_OUT", proxy_mig_out, thread_data,
+                       QEMU_THREAD_JOINABLE);
+
+    msg.cmd = START_MIG_OUT;
+    msg.bytestream = 0;
+    msg.num_fds = 2;
+    msg.fds[0] = fd[1];
+    msg.fds[1] = GET_REMOTE_WAIT;
+
+    mpqemu_msg_send(pdev->mpqemu_link, &msg, pdev->mpqemu_link->com);
+    size = wait_for_remote(msg.fds[1]);
+    PUT_REMOTE_WAIT(msg.fds[1]);
+
+    assert(size != ULLONG_MAX);
+
+    /*
+     * migsize is being updated by a separate thread. Using volatile to
+     * instruct the compiler to fetch the value of this variable from
+     * memory during every read. Stop waiting if the reader hit an error:
+     * it has left its loop and migsize will no longer advance.
+     */
+    while (*((volatile uint64_t *)&pdev->migsize) < size) {
+        if (qemu_file_get_error(f_remote)) {
+            ret = -1;
+            break;
+        }
+    }
+
+    /*
+     * Tear down in lock-step with the reader: shutting the socket down
+     * makes its blocked read return, it then leaves its loop, and only
+     * after joining it is it safe to free the QEMUFile and its arguments.
+     */
+    qemu_file_shutdown(f_remote);
+    qemu_thread_join(&thread);
+
+    qemu_fclose(f_remote);
+    close(fd[1]);
+    g_free(thread_data);
+
+    return ret;
+}
+
+/*
+ * VMState post_save hook: write the first pdev->migsize bytes of the
+ * global mig_data buffer to the outgoing migration stream and flush it.
+ *
+ * Always returns 0.
+ */
+static int proxy_post_save(void *opaque)
+{
+    MigrationState *ms = migrate_get_current();
+    PCIProxyDev *pdev = opaque;
+
+    /* One bulk write instead of a qemu_put_byte() call per byte. */
+    qemu_put_buffer(ms->to_dst_file, mig_data, pdev->migsize);
+    qemu_fflush(ms->to_dst_file);
+
+    return 0;
+}
+
+/*
+ * Migration description for the proxy device. It registers
+ * proxy_pre_save() and proxy_post_save() as the save hooks and lists the
+ * PCI parent state and migsize as the saved fields. No load-side hook is
+ * set here.
+ */
+const VMStateDescription vmstate_pci_proxy_device = {
+    .name = "PCIProxyDevice",
+    .version_id = 2,
+    .minimum_version_id = 1,
+    .pre_save = proxy_pre_save,
+    .post_save = proxy_post_save,
+    .fields = (VMStateField[]) {
+        VMSTATE_PCI_DEVICE(parent_dev, PCIProxyDev),
+        VMSTATE_UINT64(migsize, PCIProxyDev),
+        VMSTATE_END_OF_LIST()
+    }
+};
+
+/*
+ * Class initializer: install the proxy's PCI realize/exit and config-space
+ * accessors, and attach its migration description to the device class.
+ */
 static void pci_proxy_dev_class_init(ObjectClass *klass, void *data)
 {
     PCIDeviceClass *k = PCI_DEVICE_CLASS(klass);
+    DeviceClass *dc = DEVICE_CLASS(klass);
 
     k->realize = pci_proxy_dev_realize;
     k->exit = pci_dev_exit;
     k->config_read = pci_proxy_read_config;
     k->config_write = pci_proxy_write_config;
+
+    /* Use vmstate_pci_proxy_device as this class's VMState description. */
+    dc->vmsd = &vmstate_pci_proxy_device;
 }
 
 static const TypeInfo pci_proxy_dev_type_info = {
diff --git a/include/hw/proxy/qemu-proxy.h b/include/hw/proxy/qemu-proxy.h
index 17e07ac..b122e6d 100644
--- a/include/hw/proxy/qemu-proxy.h
+++ b/include/hw/proxy/qemu-proxy.h
@@ -89,6 +89,8 @@  struct PCIProxyDev {
     void (*init_proxy) (PCIDevice *dev, char *command, bool need_spawn, Error **errp);
 
     ProxyMemoryRegion region[PCI_NUM_REGIONS];
+
+    uint64_t migsize;
 };
 
 typedef struct PCIProxyDevClass {
diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
index 6fcc6f5..0ed7750 100644
--- a/include/io/mpqemu-link.h
+++ b/include/io/mpqemu-link.h
@@ -75,6 +75,7 @@  typedef enum {
     PROXY_PING,
     MMIO_RETURN,
     DEVICE_RESET,
+    START_MIG_OUT,
     MAX,
 } mpqemu_cmd_t;