diff mbox series

[v2,09/17] migration/multifd: Device state transfer support - receive side

Message ID 84141182083a8417c25b4d82a9c4b6228b22ac67.1724701542.git.maciej.szmigiero@oracle.com (mailing list archive)
State New
Headers show
Series Multifd | expand

Commit Message

Maciej S. Szmigiero Aug. 27, 2024, 5:54 p.m. UTC
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

Add a basic support for receiving device state via multifd channels -
channels that are shared with RAM transfers.

To differentiate between a device state and a RAM packet the packet
header is read first.

Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not in the
packet header either device state (MultiFDPacketDeviceState_t) or RAM
data (existing MultiFDPacket_t) is then read.

The received device state data is provided to
qemu_loadvm_load_state_buffer() function for processing in the
device's load_state_buffer handler.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
 migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
 migration/multifd.h |  31 ++++++++++-
 2 files changed, 138 insertions(+), 20 deletions(-)

Comments

Fabiano Rosas Aug. 30, 2024, 8:22 p.m. UTC | #1
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Add a basic support for receiving device state via multifd channels -
> channels that are shared with RAM transfers.
>
> To differentiate between a device state and a RAM packet the packet
> header is read first.
>
> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not in the
> packet header either device state (MultiFDPacketDeviceState_t) or RAM
> data (existing MultiFDPacket_t) is then read.
>
> The received device state data is provided to
> qemu_loadvm_load_state_buffer() function for processing in the
> device's load_state_buffer handler.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>  migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
>  migration/multifd.h |  31 ++++++++++-
>  2 files changed, 138 insertions(+), 20 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b06a9fab500e..d5a8e5a9c9b5 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -21,6 +21,7 @@
>  #include "file.h"
>  #include "migration.h"
>  #include "migration-stats.h"
> +#include "savevm.h"
>  #include "socket.h"
>  #include "tls.h"
>  #include "qemu-file.h"
> @@ -209,10 +210,10 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>  
>      memset(packet, 0, p->packet_len);
>  
> -    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> -    packet->version = cpu_to_be32(MULTIFD_VERSION);
> +    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
> +    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
>  
> -    packet->flags = cpu_to_be32(p->flags);
> +    packet->hdr.flags = cpu_to_be32(p->flags);
>      packet->next_packet_size = cpu_to_be32(p->next_packet_size);
>  
>      packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
> @@ -228,31 +229,49 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>                              p->flags, p->next_packet_size);
>  }
>  
> -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
> +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
> +                                             MultiFDPacketHdr_t *hdr,
> +                                             Error **errp)
>  {
> -    MultiFDPacket_t *packet = p->packet;
> -    int ret = 0;
> -
> -    packet->magic = be32_to_cpu(packet->magic);
> -    if (packet->magic != MULTIFD_MAGIC) {
> +    hdr->magic = be32_to_cpu(hdr->magic);
> +    if (hdr->magic != MULTIFD_MAGIC) {
>          error_setg(errp, "multifd: received packet "
>                     "magic %x and expected magic %x",
> -                   packet->magic, MULTIFD_MAGIC);
> +                   hdr->magic, MULTIFD_MAGIC);
>          return -1;
>      }
>  
> -    packet->version = be32_to_cpu(packet->version);
> -    if (packet->version != MULTIFD_VERSION) {
> +    hdr->version = be32_to_cpu(hdr->version);
> +    if (hdr->version != MULTIFD_VERSION) {
>          error_setg(errp, "multifd: received packet "
>                     "version %u and expected version %u",
> -                   packet->version, MULTIFD_VERSION);
> +                   hdr->version, MULTIFD_VERSION);
>          return -1;
>      }
>  
> -    p->flags = be32_to_cpu(packet->flags);
> +    p->flags = be32_to_cpu(hdr->flags);
> +
> +    return 0;
> +}
> +
> +static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
> +                                                   Error **errp)
> +{
> +    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
> +
> +    packet->instance_id = be32_to_cpu(packet->instance_id);
> +    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
> +
> +    return 0;
> +}
> +
> +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
> +{
> +    MultiFDPacket_t *packet = p->packet;
> +    int ret = 0;
> +
>      p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>      p->packet_num = be64_to_cpu(packet->packet_num);
> -    p->packets_recved++;
>  
>      if (!(p->flags & MULTIFD_FLAG_SYNC)) {
>          ret = multifd_ram_unfill_packet(p, errp);
> @@ -264,6 +283,19 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>      return ret;
>  }
>  
> +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
> +{
> +    p->packets_recved++;
> +
> +    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
> +        return multifd_recv_unfill_packet_device_state(p, errp);
> +    } else {
> +        return multifd_recv_unfill_packet_ram(p, errp);
> +    }
> +
> +    g_assert_not_reached();
> +}
> +
>  static bool multifd_send_should_exit(void)
>  {
>      return qatomic_read(&multifd_send_state->exiting);
> @@ -1014,6 +1046,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
>      p->packet_len = 0;
>      g_free(p->packet);
>      p->packet = NULL;
> +    g_clear_pointer(&p->packet_dev_state, g_free);
>      g_free(p->normal);
>      p->normal = NULL;
>      g_free(p->zero);
> @@ -1126,8 +1159,13 @@ static void *multifd_recv_thread(void *opaque)
>      rcu_register_thread();
>  
>      while (true) {
> +        MultiFDPacketHdr_t hdr;
>          uint32_t flags = 0;
> +        bool is_device_state = false;
>          bool has_data = false;
> +        uint8_t *pkt_buf;
> +        size_t pkt_len;
> +
>          p->normal_num = 0;
>  
>          if (use_packets) {
> @@ -1135,8 +1173,28 @@ static void *multifd_recv_thread(void *opaque)
>                  break;
>              }
>  
> -            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> -                                           p->packet_len, &local_err);
> +            ret = qio_channel_read_all_eof(p->c, (void *)&hdr,
> +                                           sizeof(hdr), &local_err);
> +            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> +                break;
> +            }
> +
> +            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
> +            if (ret) {
> +                break;
> +            }
> +
> +            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
> +            if (is_device_state) {
> +                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
> +                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
> +            } else {
> +                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
> +                pkt_len = p->packet_len - sizeof(hdr);
> +            }

Should we have made the packet an union as well? Would simplify these
sorts of operations. Not sure I want to start messing with that at this
point to be honest. But OTOH, look at this...

> +
> +            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
> +                                           &local_err);
>              if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>                  break;
>              }
> @@ -1181,8 +1239,33 @@ static void *multifd_recv_thread(void *opaque)
>              has_data = !!p->data->size;
>          }
>  
> -        if (has_data) {
> -            ret = multifd_recv_state->ops->recv(p, &local_err);
> +        if (!is_device_state) {
> +            if (has_data) {
> +                ret = multifd_recv_state->ops->recv(p, &local_err);
> +                if (ret != 0) {
> +                    break;
> +                }
> +            }
> +        } else {
> +            g_autofree char *idstr = NULL;
> +            g_autofree char *dev_state_buf = NULL;
> +
> +            assert(use_packets);
> +
> +            if (p->next_packet_size > 0) {
> +                dev_state_buf = g_malloc(p->next_packet_size);
> +
> +                ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
> +                if (ret != 0) {
> +                    break;
> +                }
> +            }

What's the use case for !next_packet_size and still call
load_state_buffer below? I can't see it.

...because I would suggest to set has_data up there with
p->next_packet_size:

if (use_packets) {
   ...
   has_data = p->next_packet_size || p->zero_num;
} else {
   ...
   has_data = !!p->data_size;
}

and this whole block would be:

if (has_data) {
   if (is_device_state) {
       multifd_device_state_recv(p, &local_err);
   } else {
       ret = multifd_recv_state->ops->recv(p, &local_err);
   }
}

> +
> +            idstr = g_strndup(p->packet_dev_state->idstr, sizeof(p->packet_dev_state->idstr));
> +            ret = qemu_loadvm_load_state_buffer(idstr,
> +                                                p->packet_dev_state->instance_id,
> +                                                dev_state_buf, p->next_packet_size,
> +                                                &local_err);
>              if (ret != 0) {
>                  break;
>              }
> @@ -1190,6 +1273,11 @@ static void *multifd_recv_thread(void *opaque)
>  
>          if (use_packets) {
>              if (flags & MULTIFD_FLAG_SYNC) {
> +                if (is_device_state) {
> +                    error_setg(&local_err, "multifd: received SYNC device state packet");
> +                    break;
> +                }

assert(!is_device_state) enough?

> +
>                  qemu_sem_post(&multifd_recv_state->sem_sync);
>                  qemu_sem_wait(&p->sem_sync);
>              }
> @@ -1258,6 +1346,7 @@ int multifd_recv_setup(Error **errp)
>              p->packet_len = sizeof(MultiFDPacket_t)
>                  + sizeof(uint64_t) * page_count;
>              p->packet = g_malloc0(p->packet_len);
> +            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
>          }
>          p->name = g_strdup_printf("mig/dst/recv_%d", i);
>          p->normal = g_new0(ram_addr_t, page_count);
> diff --git a/migration/multifd.h b/migration/multifd.h
> index a3e35196d179..a8f3e4838c01 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
>  #define MULTIFD_FLAG_QPL (4 << 1)
>  #define MULTIFD_FLAG_UADK (8 << 1)
>  
> +/*
> + * If set it means that this packet contains device state
> + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
> + */
> +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)

Overlaps with UADK. I assume on purpose because device_state doesn't
support compression? Might be worth a comment.

> +
>  /* This value needs to be a multiple of qemu_target_page_size() */
>  #define MULTIFD_PACKET_SIZE (512 * 1024)
>  
> @@ -52,6 +58,11 @@ typedef struct {
>      uint32_t magic;
>      uint32_t version;
>      uint32_t flags;
> +} __attribute__((packed)) MultiFDPacketHdr_t;
> +
> +typedef struct {
> +    MultiFDPacketHdr_t hdr;
> +
>      /* maximum number of allocated pages */
>      uint32_t pages_alloc;
>      /* non zero pages */
> @@ -72,6 +83,16 @@ typedef struct {
>      uint64_t offset[];
>  } __attribute__((packed)) MultiFDPacket_t;
>  
> +typedef struct {
> +    MultiFDPacketHdr_t hdr;
> +
> +    char idstr[256] QEMU_NONSTRING;
> +    uint32_t instance_id;
> +
> +    /* size of the next packet that contains the actual data */
> +    uint32_t next_packet_size;
> +} __attribute__((packed)) MultiFDPacketDeviceState_t;
> +
>  typedef struct {
>      /* number of used pages */
>      uint32_t num;
> @@ -89,6 +110,13 @@ struct MultiFDRecvData {
>      off_t file_offset;
>  };
>  
> +typedef struct {
> +    char *idstr;
> +    uint32_t instance_id;
> +    char *buf;
> +    size_t buf_len;
> +} MultiFDDeviceState_t;
> +
>  typedef enum {
>      MULTIFD_PAYLOAD_NONE,
>      MULTIFD_PAYLOAD_RAM,
> @@ -204,8 +232,9 @@ typedef struct {
>  
>      /* thread local variables. No locking required */
>  
> -    /* pointer to the packet */
> +    /* pointers to the possible packet types */
>      MultiFDPacket_t *packet;
> +    MultiFDPacketDeviceState_t *packet_dev_state;
>      /* size of the next packet that contains pages */
>      uint32_t next_packet_size;
>      /* packets received through this channel */
Maciej S. Szmigiero Sept. 2, 2024, 8:12 p.m. UTC | #2
On 30.08.2024 22:22, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> 
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Add a basic support for receiving device state via multifd channels -
>> channels that are shared with RAM transfers.
>>
>> To differentiate between a device state and a RAM packet the packet
>> header is read first.
>>
>> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not in the
>> packet header either device state (MultiFDPacketDeviceState_t) or RAM
>> data (existing MultiFDPacket_t) is then read.
>>
>> The received device state data is provided to
>> qemu_loadvm_load_state_buffer() function for processing in the
>> device's load_state_buffer handler.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
>>   migration/multifd.h |  31 ++++++++++-
>>   2 files changed, 138 insertions(+), 20 deletions(-)
>>
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index b06a9fab500e..d5a8e5a9c9b5 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
(..)
>>       g_free(p->zero);
>> @@ -1126,8 +1159,13 @@ static void *multifd_recv_thread(void *opaque)
>>       rcu_register_thread();
>>   
>>       while (true) {
>> +        MultiFDPacketHdr_t hdr;
>>           uint32_t flags = 0;
>> +        bool is_device_state = false;
>>           bool has_data = false;
>> +        uint8_t *pkt_buf;
>> +        size_t pkt_len;
>> +
>>           p->normal_num = 0;
>>   
>>           if (use_packets) {
>> @@ -1135,8 +1173,28 @@ static void *multifd_recv_thread(void *opaque)
>>                   break;
>>               }
>>   
>> -            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
>> -                                           p->packet_len, &local_err);
>> +            ret = qio_channel_read_all_eof(p->c, (void *)&hdr,
>> +                                           sizeof(hdr), &local_err);
>> +            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>> +                break;
>> +            }
>> +
>> +            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
>> +            if (ret) {
>> +                break;
>> +            }
>> +
>> +            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
>> +            if (is_device_state) {
>> +                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
>> +                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
>> +            } else {
>> +                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
>> +                pkt_len = p->packet_len - sizeof(hdr);
>> +            }
> 
> Should we have made the packet an union as well? Would simplify these
> sorts of operations. Not sure I want to start messing with that at this
> point to be honest. But OTOH, look at this...

RAM packet length is not constant (at least from the viewpoint of the
migration code) so the union allocation would need some kind of a
"multifd_ram_packet_size()" runtime size determination.

Also, since RAM and device state packet body size is different then
for the extra complexity introduced by that union we'll just get rid of
that single pkt_buf assignment.

>> +
>> +            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
>> +                                           &local_err);
>>               if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>>                   break;
>>               }
>> @@ -1181,8 +1239,33 @@ static void *multifd_recv_thread(void *opaque)
>>               has_data = !!p->data->size;
>>           }
>>   
>> -        if (has_data) {
>> -            ret = multifd_recv_state->ops->recv(p, &local_err);
>> +        if (!is_device_state) {
>> +            if (has_data) {
>> +                ret = multifd_recv_state->ops->recv(p, &local_err);
>> +                if (ret != 0) {
>> +                    break;
>> +                }
>> +            }
>> +        } else {
>> +            g_autofree char *idstr = NULL;
>> +            g_autofree char *dev_state_buf = NULL;
>> +
>> +            assert(use_packets);
>> +
>> +            if (p->next_packet_size > 0) {
>> +                dev_state_buf = g_malloc(p->next_packet_size);
>> +
>> +                ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
>> +                if (ret != 0) {
>> +                    break;
>> +                }
>> +            }
> 
> What's the use case for !next_packet_size and still call
> load_state_buffer below? I can't see it.

Currently, next_packet_size == 0 has not usage indeed - it is
a leftover from an early version of the patch set (not public)
that had device state packet (chunk) indexing done by
the common migration code, rather than by the VFIO consumer.

And then an empty packet could be used to mark the stream
boundary - like the max chunk number to expect.

> ...because I would suggest to set has_data up there with
> p->next_packet_size:
> 
> if (use_packets) {
>     ...
>     has_data = p->next_packet_size || p->zero_num;
> } else {
>     ...
>     has_data = !!p->data_size;
> }
> 
> and this whole block would be:
> 
> if (has_data) {
>     if (is_device_state) {
>         multifd_device_state_recv(p, &local_err);
>     } else {
>         ret = multifd_recv_state->ops->recv(p, &local_err);
>     }
> }

The above block makes sense to me with two caveats:
1) If empty device state packets (next_packet_size == 0) were
to be unsupported they need to be rejected cleanly rather
than silently skipped,

2) has_data has to have its value computed depending on whether
this is a RAM or a device state packet since looking at
p->normal_num and p->zero_num makes no sense for a device state
packet while I am not sure that looking at p->next_packet_size
for a RAM packet won't introduce some subtle regression.

>> +
>> +            idstr = g_strndup(p->packet_dev_state->idstr, sizeof(p->packet_dev_state->idstr));
>> +            ret = qemu_loadvm_load_state_buffer(idstr,
>> +                                                p->packet_dev_state->instance_id,
>> +                                                dev_state_buf, p->next_packet_size,
>> +                                                &local_err);
>>               if (ret != 0) {
>>                   break;
>>               }
>> @@ -1190,6 +1273,11 @@ static void *multifd_recv_thread(void *opaque)
>>   
>>           if (use_packets) {
>>               if (flags & MULTIFD_FLAG_SYNC) {
>> +                if (is_device_state) {
>> +                    error_setg(&local_err, "multifd: received SYNC device state packet");
>> +                    break;
>> +                }
> 
> assert(!is_device_state) enough?

It's not bug in the receiver code but rather an issue with the
remote QEMU sending us wrong data if we get a SYNC device state
packet.

So I think returning an error is more appropriate than triggering
an assert() failure for that.

>> +
>>                   qemu_sem_post(&multifd_recv_state->sem_sync);
>>                   qemu_sem_wait(&p->sem_sync);
>>               }
>> @@ -1258,6 +1346,7 @@ int multifd_recv_setup(Error **errp)
>>               p->packet_len = sizeof(MultiFDPacket_t)
>>                   + sizeof(uint64_t) * page_count;
>>               p->packet = g_malloc0(p->packet_len);
>> +            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
>>           }
>>           p->name = g_strdup_printf("mig/dst/recv_%d", i);
>>           p->normal = g_new0(ram_addr_t, page_count);
>> diff --git a/migration/multifd.h b/migration/multifd.h
>> index a3e35196d179..a8f3e4838c01 100644
>> --- a/migration/multifd.h
>> +++ b/migration/multifd.h
>> @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
>>   #define MULTIFD_FLAG_QPL (4 << 1)
>>   #define MULTIFD_FLAG_UADK (8 << 1)
>>   
>> +/*
>> + * If set it means that this packet contains device state
>> + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
>> + */
>> +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
> 
> Overlaps with UADK. I assume on purpose because device_state doesn't
> support compression? Might be worth a comment.
> 

Yes, the device state transfer bit stream does not support compression
so it is not a problem since these "compression type" flags will never
be set in such bit stream anyway.

Will add a relevant comment here.

Thanks,
Maciej
Fabiano Rosas Sept. 3, 2024, 2:42 p.m. UTC | #3
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> On 30.08.2024 22:22, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>> 
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> Add a basic support for receiving device state via multifd channels -
>>> channels that are shared with RAM transfers.
>>>
>>> To differentiate between a device state and a RAM packet the packet
>>> header is read first.
>>>
>>> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not in the
>>> packet header either device state (MultiFDPacketDeviceState_t) or RAM
>>> data (existing MultiFDPacket_t) is then read.
>>>
>>> The received device state data is provided to
>>> qemu_loadvm_load_state_buffer() function for processing in the
>>> device's load_state_buffer handler.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>   migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
>>>   migration/multifd.h |  31 ++++++++++-
>>>   2 files changed, 138 insertions(+), 20 deletions(-)
>>>
>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>> index b06a9fab500e..d5a8e5a9c9b5 100644
>>> --- a/migration/multifd.c
>>> +++ b/migration/multifd.c
> (..)
>>>       g_free(p->zero);
>>> @@ -1126,8 +1159,13 @@ static void *multifd_recv_thread(void *opaque)
>>>       rcu_register_thread();
>>>   
>>>       while (true) {
>>> +        MultiFDPacketHdr_t hdr;
>>>           uint32_t flags = 0;
>>> +        bool is_device_state = false;
>>>           bool has_data = false;
>>> +        uint8_t *pkt_buf;
>>> +        size_t pkt_len;
>>> +
>>>           p->normal_num = 0;
>>>   
>>>           if (use_packets) {
>>> @@ -1135,8 +1173,28 @@ static void *multifd_recv_thread(void *opaque)
>>>                   break;
>>>               }
>>>   
>>> -            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
>>> -                                           p->packet_len, &local_err);
>>> +            ret = qio_channel_read_all_eof(p->c, (void *)&hdr,
>>> +                                           sizeof(hdr), &local_err);
>>> +            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>>> +                break;
>>> +            }
>>> +
>>> +            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
>>> +            if (ret) {
>>> +                break;
>>> +            }
>>> +
>>> +            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
>>> +            if (is_device_state) {
>>> +                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
>>> +                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
>>> +            } else {
>>> +                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
>>> +                pkt_len = p->packet_len - sizeof(hdr);
>>> +            }
>> 
>> Should we have made the packet an union as well? Would simplify these
>> sorts of operations. Not sure I want to start messing with that at this
>> point to be honest. But OTOH, look at this...
>
> RAM packet length is not constant (at least from the viewpoint of the
> migration code) so the union allocation would need some kind of a
> "multifd_ram_packet_size()" runtime size determination.
>
> Also, since RAM and device state packet body size is different then
> for the extra complexity introduced by that union we'll just get rid of
> that single pkt_buf assignment.
>
>>> +
>>> +            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
>>> +                                           &local_err);
>>>               if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>>>                   break;
>>>               }
>>> @@ -1181,8 +1239,33 @@ static void *multifd_recv_thread(void *opaque)
>>>               has_data = !!p->data->size;
>>>           }
>>>   
>>> -        if (has_data) {
>>> -            ret = multifd_recv_state->ops->recv(p, &local_err);
>>> +        if (!is_device_state) {
>>> +            if (has_data) {
>>> +                ret = multifd_recv_state->ops->recv(p, &local_err);
>>> +                if (ret != 0) {
>>> +                    break;
>>> +                }
>>> +            }
>>> +        } else {
>>> +            g_autofree char *idstr = NULL;
>>> +            g_autofree char *dev_state_buf = NULL;
>>> +
>>> +            assert(use_packets);
>>> +
>>> +            if (p->next_packet_size > 0) {
>>> +                dev_state_buf = g_malloc(p->next_packet_size);
>>> +
>>> +                ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
>>> +                if (ret != 0) {
>>> +                    break;
>>> +                }
>>> +            }
>> 
>> What's the use case for !next_packet_size and still call
>> load_state_buffer below? I can't see it.
>
> Currently, next_packet_size == 0 has not usage indeed - it is
> a leftover from an early version of the patch set (not public)
> that had device state packet (chunk) indexing done by
> the common migration code, rather than by the VFIO consumer.
>
> And then an empty packet could be used to mark the stream
> boundary - like the max chunk number to expect.
>
>> ...because I would suggest to set has_data up there with
>> p->next_packet_size:
>> 
>> if (use_packets) {
>>     ...
>>     has_data = p->next_packet_size || p->zero_num;
>> } else {
>>     ...
>>     has_data = !!p->data_size;
>> }
>> 
>> and this whole block would be:
>> 
>> if (has_data) {
>>     if (is_device_state) {
>>         multifd_device_state_recv(p, &local_err);
>>     } else {
>>         ret = multifd_recv_state->ops->recv(p, &local_err);
>>     }
>> }
>
> The above block makes sense to me with two caveats:

I have suggestions below, but this is no big deal, so feel free to go
with what you think works best.

> 1) If empty device state packets (next_packet_size == 0) were
> to be unsupported they need to be rejected cleanly rather
> than silently skipped,

Should this be rejected on the send side? That's the most likely source
of the problem if it happens. Don't need to send something we know will
cause an error when loading.

And for the case of stream corruption of some sort we could hoist the
check from load_buffer into here:

 else if (is_device_state) {
    error_setg(errp, "empty device state packet);
    break;
}

> 2) has_data has to have its value computed depending on whether
> this is a RAM or a device state packet since looking at
> p->normal_num and p->zero_num makes no sense for a device state
> packet while I am not sure that looking at p->next_packet_size
> for a RAM packet won't introduce some subtle regression.

It should be ok to use next_packet_size for RAM, it must always be in
sync with normal_num.

>
>>> +
>>> +            idstr = g_strndup(p->packet_dev_state->idstr, sizeof(p->packet_dev_state->idstr));
>>> +            ret = qemu_loadvm_load_state_buffer(idstr,
>>> +                                                p->packet_dev_state->instance_id,
>>> +                                                dev_state_buf, p->next_packet_size,
>>> +                                                &local_err);
>>>               if (ret != 0) {
>>>                   break;
>>>               }
>>> @@ -1190,6 +1273,11 @@ static void *multifd_recv_thread(void *opaque)
>>>   
>>>           if (use_packets) {
>>>               if (flags & MULTIFD_FLAG_SYNC) {
>>> +                if (is_device_state) {
>>> +                    error_setg(&local_err, "multifd: received SYNC device state packet");
>>> +                    break;
>>> +                }
>> 
>> assert(!is_device_state) enough?
>
> It's not bug in the receiver code but rather an issue with the
> remote QEMU sending us wrong data if we get a SYNC device state
> packet.
>
> So I think returning an error is more appropriate than triggering
> an assert() failure for that.

ok

>>> +
>>>                   qemu_sem_post(&multifd_recv_state->sem_sync);
>>>                   qemu_sem_wait(&p->sem_sync);
>>>               }
>>> @@ -1258,6 +1346,7 @@ int multifd_recv_setup(Error **errp)
>>>               p->packet_len = sizeof(MultiFDPacket_t)
>>>                   + sizeof(uint64_t) * page_count;
>>>               p->packet = g_malloc0(p->packet_len);
>>> +            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
>>>           }
>>>           p->name = g_strdup_printf("mig/dst/recv_%d", i);
>>>           p->normal = g_new0(ram_addr_t, page_count);
>>> diff --git a/migration/multifd.h b/migration/multifd.h
>>> index a3e35196d179..a8f3e4838c01 100644
>>> --- a/migration/multifd.h
>>> +++ b/migration/multifd.h
>>> @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
>>>   #define MULTIFD_FLAG_QPL (4 << 1)
>>>   #define MULTIFD_FLAG_UADK (8 << 1)
>>>   
>>> +/*
>>> + * If set it means that this packet contains device state
>>> + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
>>> + */
>>> +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
>> 
>> Overlaps with UADK. I assume on purpose because device_state doesn't
>> support compression? Might be worth a comment.
>> 
>
> Yes, the device state transfer bit stream does not support compression
> so it is not a problem since these "compression type" flags will never
> be set in such bit stream anyway.
>
> Will add a relevant comment here.
>
> Thanks,
> Maciej
Maciej S. Szmigiero Sept. 3, 2024, 6:41 p.m. UTC | #4
On 3.09.2024 16:42, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> 
>> On 30.08.2024 22:22, Fabiano Rosas wrote:
>>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>>
>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>
>>>> Add a basic support for receiving device state via multifd channels -
>>>> channels that are shared with RAM transfers.
>>>>
>>>> To differentiate between a device state and a RAM packet the packet
>>>> header is read first.
>>>>
>>>> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not in the
>>>> packet header either device state (MultiFDPacketDeviceState_t) or RAM
>>>> data (existing MultiFDPacket_t) is then read.
>>>>
>>>> The received device state data is provided to
>>>> qemu_loadvm_load_state_buffer() function for processing in the
>>>> device's load_state_buffer handler.
>>>>
>>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>>> ---
>>>>    migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
>>>>    migration/multifd.h |  31 ++++++++++-
>>>>    2 files changed, 138 insertions(+), 20 deletions(-)
>>>>
>>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>>> index b06a9fab500e..d5a8e5a9c9b5 100644
>>>> --- a/migration/multifd.c
>>>> +++ b/migration/multifd.c
(..)
>>>> +
>>>> +            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
>>>> +                                           &local_err);
>>>>                if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>>>>                    break;
>>>>                }
>>>> @@ -1181,8 +1239,33 @@ static void *multifd_recv_thread(void *opaque)
>>>>                has_data = !!p->data->size;
>>>>            }
>>>>    
>>>> -        if (has_data) {
>>>> -            ret = multifd_recv_state->ops->recv(p, &local_err);
>>>> +        if (!is_device_state) {
>>>> +            if (has_data) {
>>>> +                ret = multifd_recv_state->ops->recv(p, &local_err);
>>>> +                if (ret != 0) {
>>>> +                    break;
>>>> +                }
>>>> +            }
>>>> +        } else {
>>>> +            g_autofree char *idstr = NULL;
>>>> +            g_autofree char *dev_state_buf = NULL;
>>>> +
>>>> +            assert(use_packets);
>>>> +
>>>> +            if (p->next_packet_size > 0) {
>>>> +                dev_state_buf = g_malloc(p->next_packet_size);
>>>> +
>>>> +                ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
>>>> +                if (ret != 0) {
>>>> +                    break;
>>>> +                }
>>>> +            }
>>>
>>> What's the use case for !next_packet_size and still call
>>> load_state_buffer below? I can't see it.
>>
>> Currently, next_packet_size == 0 has not usage indeed - it is
>> a leftover from an early version of the patch set (not public)
>> that had device state packet (chunk) indexing done by
>> the common migration code, rather than by the VFIO consumer.
>>
>> And then an empty packet could be used to mark the stream
>> boundary - like the max chunk number to expect.
>>
>>> ...because I would suggest to set has_data up there with
>>> p->next_packet_size:
>>>
>>> if (use_packets) {
>>>      ...
>>>      has_data = p->next_packet_size || p->zero_num;
>>> } else {
>>>      ...
>>>      has_data = !!p->data_size;
>>> }
>>>
>>> and this whole block would be:
>>>
>>> if (has_data) {
>>>      if (is_device_state) {
>>>          multifd_device_state_recv(p, &local_err);
>>>      } else {
>>>          ret = multifd_recv_state->ops->recv(p, &local_err);
>>>      }
>>> }
>>
>> The above block makes sense to me with two caveats:
> 
> I have suggestions below, but this is no big deal, so feel free to go
> with what you think works best.
> 
>> 1) If empty device state packets (next_packet_size == 0) were
>> to be unsupported they need to be rejected cleanly rather
>> than silently skipped,
> 
> Should this be rejected on the send side? That's the most likely source
> of the problem if it happens. Don't need to send something we know will
> cause an error when loading.

Definitely we should send correct bit stream :), it was about the
case of bit stream corruption or simply using some future bit stream
format that the QEMU version with this patch set does not understand
yet.

> And for the case of stream corruption of some sort we could hoist the
> check from load_buffer into here:
> 
>   else if (is_device_state) {
>      error_setg(errp, "empty device state packet);
>      break;
> }

Right.

>> 2) has_data has to have its value computed depending on whether
>> this is a RAM or a device state packet since looking at
>> p->normal_num and p->zero_num makes no sense for a device state
>> packet while I am not sure that looking at p->next_packet_size
>> for a RAM packet won't introduce some subtle regression.
> 
> It should be ok to use next_packet_size for RAM, it must always be in
> sync with normal_num.

Then it should be ok, but I'll look at this deeper to be sure when
I will be preparing the next patch set version.

Thanks,
Maciej
Avihai Horon Sept. 5, 2024, 4:47 p.m. UTC | #5
On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
> External email: Use caution opening links or attachments
>
>
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Add a basic support for receiving device state via multifd channels -
> channels that are shared with RAM transfers.
>
> To differentiate between a device state and a RAM packet the packet
> header is read first.
>
> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not in the
> packet header either device state (MultiFDPacketDeviceState_t) or RAM
> data (existing MultiFDPacket_t) is then read.
>
> The received device state data is provided to
> qemu_loadvm_load_state_buffer() function for processing in the
> device's load_state_buffer handler.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>   migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
>   migration/multifd.h |  31 ++++++++++-
>   2 files changed, 138 insertions(+), 20 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b06a9fab500e..d5a8e5a9c9b5 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -21,6 +21,7 @@
>   #include "file.h"
>   #include "migration.h"
>   #include "migration-stats.h"
> +#include "savevm.h"
>   #include "socket.h"
>   #include "tls.h"
>   #include "qemu-file.h"
> @@ -209,10 +210,10 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>
>       memset(packet, 0, p->packet_len);
>
> -    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
> -    packet->version = cpu_to_be32(MULTIFD_VERSION);
> +    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
> +    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
>
> -    packet->flags = cpu_to_be32(p->flags);
> +    packet->hdr.flags = cpu_to_be32(p->flags);
>       packet->next_packet_size = cpu_to_be32(p->next_packet_size);
>
>       packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
> @@ -228,31 +229,49 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>                               p->flags, p->next_packet_size);
>   }
>
> -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
> +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
> +                                             MultiFDPacketHdr_t *hdr,
> +                                             Error **errp)
>   {
> -    MultiFDPacket_t *packet = p->packet;
> -    int ret = 0;
> -
> -    packet->magic = be32_to_cpu(packet->magic);
> -    if (packet->magic != MULTIFD_MAGIC) {
> +    hdr->magic = be32_to_cpu(hdr->magic);
> +    if (hdr->magic != MULTIFD_MAGIC) {
>           error_setg(errp, "multifd: received packet "
>                      "magic %x and expected magic %x",
> -                   packet->magic, MULTIFD_MAGIC);
> +                   hdr->magic, MULTIFD_MAGIC);
>           return -1;
>       }
>
> -    packet->version = be32_to_cpu(packet->version);
> -    if (packet->version != MULTIFD_VERSION) {
> +    hdr->version = be32_to_cpu(hdr->version);
> +    if (hdr->version != MULTIFD_VERSION) {
>           error_setg(errp, "multifd: received packet "
>                      "version %u and expected version %u",
> -                   packet->version, MULTIFD_VERSION);
> +                   hdr->version, MULTIFD_VERSION);
>           return -1;
>       }
>
> -    p->flags = be32_to_cpu(packet->flags);
> +    p->flags = be32_to_cpu(hdr->flags);
> +
> +    return 0;
> +}
> +
> +static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
> +                                                   Error **errp)
> +{
> +    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
> +
> +    packet->instance_id = be32_to_cpu(packet->instance_id);
> +    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
> +
> +    return 0;
> +}
> +
> +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
> +{
> +    MultiFDPacket_t *packet = p->packet;
> +    int ret = 0;
> +
>       p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>       p->packet_num = be64_to_cpu(packet->packet_num);
> -    p->packets_recved++;
>
>       if (!(p->flags & MULTIFD_FLAG_SYNC)) {
>           ret = multifd_ram_unfill_packet(p, errp);
> @@ -264,6 +283,19 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>       return ret;
>   }
>
> +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
> +{
> +    p->packets_recved++;
> +
> +    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
> +        return multifd_recv_unfill_packet_device_state(p, errp);
> +    } else {
> +        return multifd_recv_unfill_packet_ram(p, errp);
> +    }
> +
> +    g_assert_not_reached();

We can drop the assert and the "else":
if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
     return multifd_recv_unfill_packet_device_state(p, errp);
}

return multifd_recv_unfill_packet_ram(p, errp);

> +}
> +
>   static bool multifd_send_should_exit(void)
>   {
>       return qatomic_read(&multifd_send_state->exiting);
> @@ -1014,6 +1046,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
>       p->packet_len = 0;
>       g_free(p->packet);
>       p->packet = NULL;
> +    g_clear_pointer(&p->packet_dev_state, g_free);
>       g_free(p->normal);
>       p->normal = NULL;
>       g_free(p->zero);
> @@ -1126,8 +1159,13 @@ static void *multifd_recv_thread(void *opaque)
>       rcu_register_thread();
>
>       while (true) {
> +        MultiFDPacketHdr_t hdr;
>           uint32_t flags = 0;
> +        bool is_device_state = false;
>           bool has_data = false;
> +        uint8_t *pkt_buf;
> +        size_t pkt_len;
> +
>           p->normal_num = 0;
>
>           if (use_packets) {
> @@ -1135,8 +1173,28 @@ static void *multifd_recv_thread(void *opaque)
>                   break;
>               }
>
> -            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
> -                                           p->packet_len, &local_err);
> +            ret = qio_channel_read_all_eof(p->c, (void *)&hdr,
> +                                           sizeof(hdr), &local_err);
> +            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
> +                break;
> +            }
> +
> +            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
> +            if (ret) {
> +                break;
> +            }
> +
> +            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
> +            if (is_device_state) {
> +                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
> +                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
> +            } else {
> +                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
> +                pkt_len = p->packet_len - sizeof(hdr);
> +            }
> +
> +            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
> +                                           &local_err);
>               if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
>                   break;
>               }
> @@ -1181,8 +1239,33 @@ static void *multifd_recv_thread(void *opaque)
>               has_data = !!p->data->size;
>           }
>
> -        if (has_data) {
> -            ret = multifd_recv_state->ops->recv(p, &local_err);
> +        if (!is_device_state) {
> +            if (has_data) {
> +                ret = multifd_recv_state->ops->recv(p, &local_err);
> +                if (ret != 0) {
> +                    break;
> +                }
> +            }
> +        } else {
> +            g_autofree char *idstr = NULL;
> +            g_autofree char *dev_state_buf = NULL;
> +
> +            assert(use_packets);
> +
> +            if (p->next_packet_size > 0) {
> +                dev_state_buf = g_malloc(p->next_packet_size);
> +
> +                ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
> +                if (ret != 0) {
> +                    break;
> +                }
> +            }
> +
> +            idstr = g_strndup(p->packet_dev_state->idstr, sizeof(p->packet_dev_state->idstr));
> +            ret = qemu_loadvm_load_state_buffer(idstr,
> +                                                p->packet_dev_state->instance_id,
> +                                                dev_state_buf, p->next_packet_size,
> +                                                &local_err);
>               if (ret != 0) {
>                   break;
>               }
> @@ -1190,6 +1273,11 @@ static void *multifd_recv_thread(void *opaque)
>
>           if (use_packets) {
>               if (flags & MULTIFD_FLAG_SYNC) {
> +                if (is_device_state) {
> +                    error_setg(&local_err, "multifd: received SYNC device state packet");
> +                    break;
> +                }
> +
>                   qemu_sem_post(&multifd_recv_state->sem_sync);
>                   qemu_sem_wait(&p->sem_sync);
>               }
> @@ -1258,6 +1346,7 @@ int multifd_recv_setup(Error **errp)
>               p->packet_len = sizeof(MultiFDPacket_t)
>                   + sizeof(uint64_t) * page_count;
>               p->packet = g_malloc0(p->packet_len);
> +            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
>           }
>           p->name = g_strdup_printf("mig/dst/recv_%d", i);
>           p->normal = g_new0(ram_addr_t, page_count);
> diff --git a/migration/multifd.h b/migration/multifd.h
> index a3e35196d179..a8f3e4838c01 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
>   #define MULTIFD_FLAG_QPL (4 << 1)
>   #define MULTIFD_FLAG_UADK (8 << 1)
>
> +/*
> + * If set it means that this packet contains device state
> + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
> + */
> +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
> +
>   /* This value needs to be a multiple of qemu_target_page_size() */
>   #define MULTIFD_PACKET_SIZE (512 * 1024)
>
> @@ -52,6 +58,11 @@ typedef struct {
>       uint32_t magic;
>       uint32_t version;
>       uint32_t flags;
> +} __attribute__((packed)) MultiFDPacketHdr_t;

Maybe split this patch into two: one that adds the packet header concept 
and another that adds the new device packet?

> +
> +typedef struct {
> +    MultiFDPacketHdr_t hdr;
> +
>       /* maximum number of allocated pages */
>       uint32_t pages_alloc;
>       /* non zero pages */
> @@ -72,6 +83,16 @@ typedef struct {
>       uint64_t offset[];
>   } __attribute__((packed)) MultiFDPacket_t;
>
> +typedef struct {
> +    MultiFDPacketHdr_t hdr;
> +
> +    char idstr[256] QEMU_NONSTRING;

idstr should be null terminated, or am I missing something?

Thanks.

> +    uint32_t instance_id;
> +
> +    /* size of the next packet that contains the actual data */
> +    uint32_t next_packet_size;
> +} __attribute__((packed)) MultiFDPacketDeviceState_t;
> +
>   typedef struct {
>       /* number of used pages */
>       uint32_t num;
> @@ -89,6 +110,13 @@ struct MultiFDRecvData {
>       off_t file_offset;
>   };
>
> +typedef struct {
> +    char *idstr;
> +    uint32_t instance_id;
> +    char *buf;
> +    size_t buf_len;
> +} MultiFDDeviceState_t;
> +
>   typedef enum {
>       MULTIFD_PAYLOAD_NONE,
>       MULTIFD_PAYLOAD_RAM,
> @@ -204,8 +232,9 @@ typedef struct {
>
>       /* thread local variables. No locking required */
>
> -    /* pointer to the packet */
> +    /* pointers to the possible packet types */
>       MultiFDPacket_t *packet;
> +    MultiFDPacketDeviceState_t *packet_dev_state;
>       /* size of the next packet that contains pages */
>       uint32_t next_packet_size;
>       /* packets received through this channel */
Maciej S. Szmigiero Sept. 9, 2024, 6:05 p.m. UTC | #6
On 5.09.2024 18:47, Avihai Horon wrote:
> 
> On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
>> External email: Use caution opening links or attachments
>>
>>
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Add a basic support for receiving device state via multifd channels -
>> channels that are shared with RAM transfers.
>>
>> To differentiate between a device state and a RAM packet the packet
>> header is read first.
>>
>> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not in the
>> packet header either device state (MultiFDPacketDeviceState_t) or RAM
>> data (existing MultiFDPacket_t) is then read.
>>
>> The received device state data is provided to
>> qemu_loadvm_load_state_buffer() function for processing in the
>> device's load_state_buffer handler.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
>>   migration/multifd.h |  31 ++++++++++-
>>   2 files changed, 138 insertions(+), 20 deletions(-)
>>
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index b06a9fab500e..d5a8e5a9c9b5 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -21,6 +21,7 @@
>>   #include "file.h"
>>   #include "migration.h"
>>   #include "migration-stats.h"
>> +#include "savevm.h"
>>   #include "socket.h"
>>   #include "tls.h"
>>   #include "qemu-file.h"
>> @@ -209,10 +210,10 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>>
>>       memset(packet, 0, p->packet_len);
>>
>> -    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>> -    packet->version = cpu_to_be32(MULTIFD_VERSION);
>> +    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
>> +    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
>>
>> -    packet->flags = cpu_to_be32(p->flags);
>> +    packet->hdr.flags = cpu_to_be32(p->flags);
>>       packet->next_packet_size = cpu_to_be32(p->next_packet_size);
>>
>>       packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
>> @@ -228,31 +229,49 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
>>                               p->flags, p->next_packet_size);
>>   }
>>
>> -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>> +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
>> +                                             MultiFDPacketHdr_t *hdr,
>> +                                             Error **errp)
>>   {
>> -    MultiFDPacket_t *packet = p->packet;
>> -    int ret = 0;
>> -
>> -    packet->magic = be32_to_cpu(packet->magic);
>> -    if (packet->magic != MULTIFD_MAGIC) {
>> +    hdr->magic = be32_to_cpu(hdr->magic);
>> +    if (hdr->magic != MULTIFD_MAGIC) {
>>           error_setg(errp, "multifd: received packet "
>>                      "magic %x and expected magic %x",
>> -                   packet->magic, MULTIFD_MAGIC);
>> +                   hdr->magic, MULTIFD_MAGIC);
>>           return -1;
>>       }
>>
>> -    packet->version = be32_to_cpu(packet->version);
>> -    if (packet->version != MULTIFD_VERSION) {
>> +    hdr->version = be32_to_cpu(hdr->version);
>> +    if (hdr->version != MULTIFD_VERSION) {
>>           error_setg(errp, "multifd: received packet "
>>                      "version %u and expected version %u",
>> -                   packet->version, MULTIFD_VERSION);
>> +                   hdr->version, MULTIFD_VERSION);
>>           return -1;
>>       }
>>
>> -    p->flags = be32_to_cpu(packet->flags);
>> +    p->flags = be32_to_cpu(hdr->flags);
>> +
>> +    return 0;
>> +}
>> +
>> +static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
>> +                                                   Error **errp)
>> +{
>> +    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
>> +
>> +    packet->instance_id = be32_to_cpu(packet->instance_id);
>> +    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>> +
>> +    return 0;
>> +}
>> +
>> +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
>> +{
>> +    MultiFDPacket_t *packet = p->packet;
>> +    int ret = 0;
>> +
>>       p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>>       p->packet_num = be64_to_cpu(packet->packet_num);
>> -    p->packets_recved++;
>>
>>       if (!(p->flags & MULTIFD_FLAG_SYNC)) {
>>           ret = multifd_ram_unfill_packet(p, errp);
>> @@ -264,6 +283,19 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>>       return ret;
>>   }
>>
>> +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>> +{
>> +    p->packets_recved++;
>> +
>> +    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
>> +        return multifd_recv_unfill_packet_device_state(p, errp);
>> +    } else {
>> +        return multifd_recv_unfill_packet_ram(p, errp);
>> +    }
>> +
>> +    g_assert_not_reached();
> 
> We can drop the assert and the "else":
> if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
>      return multifd_recv_unfill_packet_device_state(p, errp);
> }
> 
> return multifd_recv_unfill_packet_ram(p, errp);

Ack.

>> +}
>> +
>>   static bool multifd_send_should_exit(void)
>>   {
>>       return qatomic_read(&multifd_send_state->exiting);
>> diff --git a/migration/multifd.h b/migration/multifd.h
>> index a3e35196d179..a8f3e4838c01 100644
>> --- a/migration/multifd.h
>> +++ b/migration/multifd.h
>> @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
>>   #define MULTIFD_FLAG_QPL (4 << 1)
>>   #define MULTIFD_FLAG_UADK (8 << 1)
>>
>> +/*
>> + * If set it means that this packet contains device state
>> + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
>> + */
>> +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
>> +
>>   /* This value needs to be a multiple of qemu_target_page_size() */
>>   #define MULTIFD_PACKET_SIZE (512 * 1024)
>>
>> @@ -52,6 +58,11 @@ typedef struct {
>>       uint32_t magic;
>>       uint32_t version;
>>       uint32_t flags;
>> +} __attribute__((packed)) MultiFDPacketHdr_t;
> 
> Maybe split this patch into two: one that adds the packet header concept and another that adds the new device packet?

Can do.

>> +
>> +typedef struct {
>> +    MultiFDPacketHdr_t hdr;
>> +
>>       /* maximum number of allocated pages */
>>       uint32_t pages_alloc;
>>       /* non zero pages */
>> @@ -72,6 +83,16 @@ typedef struct {
>>       uint64_t offset[];
>>   } __attribute__((packed)) MultiFDPacket_t;
>>
>> +typedef struct {
>> +    MultiFDPacketHdr_t hdr;
>> +
>> +    char idstr[256] QEMU_NONSTRING;
> 
> idstr should be null terminated, or am I missing something?

There's no need to always NULL-terminate a constant-size field,
since the strncpy() already stops at the field size, so we can
gain another byte for actual string use this way.

RAM block idstr also uses the same "trick":
> void multifd_ram_fill_packet(MultiFDSendParams *p):
> strncpy(packet->ramblock, pages->block->idstr, 256);

> Thanks.

Thanks,
Maciej
Peter Xu Sept. 9, 2024, 7:52 p.m. UTC | #7
On Mon, Sep 02, 2024 at 10:12:01PM +0200, Maciej S. Szmigiero wrote:
> > > diff --git a/migration/multifd.h b/migration/multifd.h
> > > index a3e35196d179..a8f3e4838c01 100644
> > > --- a/migration/multifd.h
> > > +++ b/migration/multifd.h
> > > @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
> > >   #define MULTIFD_FLAG_QPL (4 << 1)
> > >   #define MULTIFD_FLAG_UADK (8 << 1)
> > > +/*
> > > + * If set it means that this packet contains device state
> > > + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
> > > + */
> > > +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
> > 
> > Overlaps with UADK. I assume on purpose because device_state doesn't
> > support compression? Might be worth a comment.
> > 
> 
> Yes, the device state transfer bit stream does not support compression
> so it is not a problem since these "compression type" flags will never
> be set in such bit stream anyway.
> 
> Will add a relevant comment here.

Why reuse?  Would using a new bit easier if we still have plenty of bits
(just to tell what is what directly from a stream dump)?
Avihai Horon Sept. 12, 2024, 8:13 a.m. UTC | #8
On 09/09/2024 21:05, Maciej S. Szmigiero wrote:
> External email: Use caution opening links or attachments
>
>
> On 5.09.2024 18:47, Avihai Horon wrote:
>>
>> On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
>>> External email: Use caution opening links or attachments
>>>
>>>
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> Add a basic support for receiving device state via multifd channels -
>>> channels that are shared with RAM transfers.
>>>
>>> To differentiate between a device state and a RAM packet the packet
>>> header is read first.
>>>
>>> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not 
>>> in the
>>> packet header either device state (MultiFDPacketDeviceState_t) or RAM
>>> data (existing MultiFDPacket_t) is then read.
>>>
>>> The received device state data is provided to
>>> qemu_loadvm_load_state_buffer() function for processing in the
>>> device's load_state_buffer handler.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>   migration/multifd.c | 127 
>>> +++++++++++++++++++++++++++++++++++++-------
>>>   migration/multifd.h |  31 ++++++++++-
>>>   2 files changed, 138 insertions(+), 20 deletions(-)
>>>
>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>> index b06a9fab500e..d5a8e5a9c9b5 100644
>>> --- a/migration/multifd.c
>>> +++ b/migration/multifd.c
>>> @@ -21,6 +21,7 @@
>>>   #include "file.h"
>>>   #include "migration.h"
>>>   #include "migration-stats.h"
>>> +#include "savevm.h"
>>>   #include "socket.h"
>>>   #include "tls.h"
>>>   #include "qemu-file.h"
>>> @@ -209,10 +210,10 @@ void 
>>> multifd_send_fill_packet(MultiFDSendParams *p)
>>>
>>>       memset(packet, 0, p->packet_len);
>>>
>>> -    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>>> -    packet->version = cpu_to_be32(MULTIFD_VERSION);
>>> +    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
>>> +    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
>>>
>>> -    packet->flags = cpu_to_be32(p->flags);
>>> +    packet->hdr.flags = cpu_to_be32(p->flags);
>>>       packet->next_packet_size = cpu_to_be32(p->next_packet_size);
>>>
>>>       packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
>>> @@ -228,31 +229,49 @@ void 
>>> multifd_send_fill_packet(MultiFDSendParams *p)
>>>                               p->flags, p->next_packet_size);
>>>   }
>>>
>>> -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error 
>>> **errp)
>>> +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
>>> + MultiFDPacketHdr_t *hdr,
>>> +                                             Error **errp)
>>>   {
>>> -    MultiFDPacket_t *packet = p->packet;
>>> -    int ret = 0;
>>> -
>>> -    packet->magic = be32_to_cpu(packet->magic);
>>> -    if (packet->magic != MULTIFD_MAGIC) {
>>> +    hdr->magic = be32_to_cpu(hdr->magic);
>>> +    if (hdr->magic != MULTIFD_MAGIC) {
>>>           error_setg(errp, "multifd: received packet "
>>>                      "magic %x and expected magic %x",
>>> -                   packet->magic, MULTIFD_MAGIC);
>>> +                   hdr->magic, MULTIFD_MAGIC);
>>>           return -1;
>>>       }
>>>
>>> -    packet->version = be32_to_cpu(packet->version);
>>> -    if (packet->version != MULTIFD_VERSION) {
>>> +    hdr->version = be32_to_cpu(hdr->version);
>>> +    if (hdr->version != MULTIFD_VERSION) {
>>>           error_setg(errp, "multifd: received packet "
>>>                      "version %u and expected version %u",
>>> -                   packet->version, MULTIFD_VERSION);
>>> +                   hdr->version, MULTIFD_VERSION);
>>>           return -1;
>>>       }
>>>
>>> -    p->flags = be32_to_cpu(packet->flags);
>>> +    p->flags = be32_to_cpu(hdr->flags);
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static int 
>>> multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
>>> +                                                   Error **errp)
>>> +{
>>> +    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
>>> +
>>> +    packet->instance_id = be32_to_cpu(packet->instance_id);
>>> +    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, 
>>> Error **errp)
>>> +{
>>> +    MultiFDPacket_t *packet = p->packet;
>>> +    int ret = 0;
>>> +
>>>       p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>>>       p->packet_num = be64_to_cpu(packet->packet_num);
>>> -    p->packets_recved++;
>>>
>>>       if (!(p->flags & MULTIFD_FLAG_SYNC)) {
>>>           ret = multifd_ram_unfill_packet(p, errp);
>>> @@ -264,6 +283,19 @@ static int 
>>> multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>>>       return ret;
>>>   }
>>>
>>> +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error 
>>> **errp)
>>> +{
>>> +    p->packets_recved++;
>>> +
>>> +    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
>>> +        return multifd_recv_unfill_packet_device_state(p, errp);
>>> +    } else {
>>> +        return multifd_recv_unfill_packet_ram(p, errp);
>>> +    }
>>> +
>>> +    g_assert_not_reached();
>>
>> We can drop the assert and the "else":
>> if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
>>      return multifd_recv_unfill_packet_device_state(p, errp);
>> }
>>
>> return multifd_recv_unfill_packet_ram(p, errp);
>
> Ack.
>
>>> +}
>>> +
>>>   static bool multifd_send_should_exit(void)
>>>   {
>>>       return qatomic_read(&multifd_send_state->exiting);
>>> diff --git a/migration/multifd.h b/migration/multifd.h
>>> index a3e35196d179..a8f3e4838c01 100644
>>> --- a/migration/multifd.h
>>> +++ b/migration/multifd.h
>>> @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
>>>   #define MULTIFD_FLAG_QPL (4 << 1)
>>>   #define MULTIFD_FLAG_UADK (8 << 1)
>>>
>>> +/*
>>> + * If set it means that this packet contains device state
>>> + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
>>> + */
>>> +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
>>> +
>>>   /* This value needs to be a multiple of qemu_target_page_size() */
>>>   #define MULTIFD_PACKET_SIZE (512 * 1024)
>>>
>>> @@ -52,6 +58,11 @@ typedef struct {
>>>       uint32_t magic;
>>>       uint32_t version;
>>>       uint32_t flags;
>>> +} __attribute__((packed)) MultiFDPacketHdr_t;
>>
>> Maybe split this patch into two: one that adds the packet header 
>> concept and another that adds the new device packet?
>
> Can do.
>
>>> +
>>> +typedef struct {
>>> +    MultiFDPacketHdr_t hdr;
>>> +
>>>       /* maximum number of allocated pages */
>>>       uint32_t pages_alloc;
>>>       /* non zero pages */
>>> @@ -72,6 +83,16 @@ typedef struct {
>>>       uint64_t offset[];
>>>   } __attribute__((packed)) MultiFDPacket_t;
>>>
>>> +typedef struct {
>>> +    MultiFDPacketHdr_t hdr;
>>> +
>>> +    char idstr[256] QEMU_NONSTRING;
>>
>> idstr should be null terminated, or am I missing something?
>
> There's no need to always NULL-terminate a constant-size field,
> since the strncpy() already stops at the field size, so we can
> gain another byte for actual string use this way.
>
> RAM block idstr also uses the same "trick":
>> void multifd_ram_fill_packet(MultiFDSendParams *p):
>> strncpy(packet->ramblock, pages->block->idstr, 256);
>
But can idstr actually be 256 bytes long without null byte?
There are a lot of places where idstr is a parameter for functions that 
expect null terminated string and it is also printed as such.

Thanks.
Fabiano Rosas Sept. 12, 2024, 1:52 p.m. UTC | #9
Avihai Horon <avihaih@nvidia.com> writes:

> On 09/09/2024 21:05, Maciej S. Szmigiero wrote:
>> External email: Use caution opening links or attachments
>>
>>
>> On 5.09.2024 18:47, Avihai Horon wrote:
>>>
>>> On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
>>>> External email: Use caution opening links or attachments
>>>>
>>>>
>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>
>>>> Add a basic support for receiving device state via multifd channels -
>>>> channels that are shared with RAM transfers.
>>>>
>>>> To differentiate between a device state and a RAM packet the packet
>>>> header is read first.
>>>>
>>>> Depending whether MULTIFD_FLAG_DEVICE_STATE flag is present or not 
>>>> in the
>>>> packet header either device state (MultiFDPacketDeviceState_t) or RAM
>>>> data (existing MultiFDPacket_t) is then read.
>>>>
>>>> The received device state data is provided to
>>>> qemu_loadvm_load_state_buffer() function for processing in the
>>>> device's load_state_buffer handler.
>>>>
>>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>>> ---
>>>>   migration/multifd.c | 127 
>>>> +++++++++++++++++++++++++++++++++++++-------
>>>>   migration/multifd.h |  31 ++++++++++-
>>>>   2 files changed, 138 insertions(+), 20 deletions(-)
>>>>
>>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>>> index b06a9fab500e..d5a8e5a9c9b5 100644
>>>> --- a/migration/multifd.c
>>>> +++ b/migration/multifd.c
>>>> @@ -21,6 +21,7 @@
>>>>   #include "file.h"
>>>>   #include "migration.h"
>>>>   #include "migration-stats.h"
>>>> +#include "savevm.h"
>>>>   #include "socket.h"
>>>>   #include "tls.h"
>>>>   #include "qemu-file.h"
>>>> @@ -209,10 +210,10 @@ void 
>>>> multifd_send_fill_packet(MultiFDSendParams *p)
>>>>
>>>>       memset(packet, 0, p->packet_len);
>>>>
>>>> -    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>>>> -    packet->version = cpu_to_be32(MULTIFD_VERSION);
>>>> +    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
>>>> +    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
>>>>
>>>> -    packet->flags = cpu_to_be32(p->flags);
>>>> +    packet->hdr.flags = cpu_to_be32(p->flags);
>>>>       packet->next_packet_size = cpu_to_be32(p->next_packet_size);
>>>>
>>>>       packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
>>>> @@ -228,31 +229,49 @@ void 
>>>> multifd_send_fill_packet(MultiFDSendParams *p)
>>>>                               p->flags, p->next_packet_size);
>>>>   }
>>>>
>>>> -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error 
>>>> **errp)
>>>> +static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
>>>> + MultiFDPacketHdr_t *hdr,
>>>> +                                             Error **errp)
>>>>   {
>>>> -    MultiFDPacket_t *packet = p->packet;
>>>> -    int ret = 0;
>>>> -
>>>> -    packet->magic = be32_to_cpu(packet->magic);
>>>> -    if (packet->magic != MULTIFD_MAGIC) {
>>>> +    hdr->magic = be32_to_cpu(hdr->magic);
>>>> +    if (hdr->magic != MULTIFD_MAGIC) {
>>>>           error_setg(errp, "multifd: received packet "
>>>>                      "magic %x and expected magic %x",
>>>> -                   packet->magic, MULTIFD_MAGIC);
>>>> +                   hdr->magic, MULTIFD_MAGIC);
>>>>           return -1;
>>>>       }
>>>>
>>>> -    packet->version = be32_to_cpu(packet->version);
>>>> -    if (packet->version != MULTIFD_VERSION) {
>>>> +    hdr->version = be32_to_cpu(hdr->version);
>>>> +    if (hdr->version != MULTIFD_VERSION) {
>>>>           error_setg(errp, "multifd: received packet "
>>>>                      "version %u and expected version %u",
>>>> -                   packet->version, MULTIFD_VERSION);
>>>> +                   hdr->version, MULTIFD_VERSION);
>>>>           return -1;
>>>>       }
>>>>
>>>> -    p->flags = be32_to_cpu(packet->flags);
>>>> +    p->flags = be32_to_cpu(hdr->flags);
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static int 
>>>> multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
>>>> +                                                   Error **errp)
>>>> +{
>>>> +    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
>>>> +
>>>> +    packet->instance_id = be32_to_cpu(packet->instance_id);
>>>> +    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, 
>>>> Error **errp)
>>>> +{
>>>> +    MultiFDPacket_t *packet = p->packet;
>>>> +    int ret = 0;
>>>> +
>>>>       p->next_packet_size = be32_to_cpu(packet->next_packet_size);
>>>>       p->packet_num = be64_to_cpu(packet->packet_num);
>>>> -    p->packets_recved++;
>>>>
>>>>       if (!(p->flags & MULTIFD_FLAG_SYNC)) {
>>>>           ret = multifd_ram_unfill_packet(p, errp);
>>>> @@ -264,6 +283,19 @@ static int 
>>>> multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
>>>>       return ret;
>>>>   }
>>>>
>>>> +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error 
>>>> **errp)
>>>> +{
>>>> +    p->packets_recved++;
>>>> +
>>>> +    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
>>>> +        return multifd_recv_unfill_packet_device_state(p, errp);
>>>> +    } else {
>>>> +        return multifd_recv_unfill_packet_ram(p, errp);
>>>> +    }
>>>> +
>>>> +    g_assert_not_reached();
>>>
>>> We can drop the assert and the "else":
>>> if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
>>>      return multifd_recv_unfill_packet_device_state(p, errp);
>>> }
>>>
>>> return multifd_recv_unfill_packet_ram(p, errp);
>>
>> Ack.
>>
>>>> +}
>>>> +
>>>>   static bool multifd_send_should_exit(void)
>>>>   {
>>>>       return qatomic_read(&multifd_send_state->exiting);
>>>> diff --git a/migration/multifd.h b/migration/multifd.h
>>>> index a3e35196d179..a8f3e4838c01 100644
>>>> --- a/migration/multifd.h
>>>> +++ b/migration/multifd.h
>>>> @@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
>>>>   #define MULTIFD_FLAG_QPL (4 << 1)
>>>>   #define MULTIFD_FLAG_UADK (8 << 1)
>>>>
>>>> +/*
>>>> + * If set it means that this packet contains device state
>>>> + * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
>>>> + */
>>>> +#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
>>>> +
>>>>   /* This value needs to be a multiple of qemu_target_page_size() */
>>>>   #define MULTIFD_PACKET_SIZE (512 * 1024)
>>>>
>>>> @@ -52,6 +58,11 @@ typedef struct {
>>>>       uint32_t magic;
>>>>       uint32_t version;
>>>>       uint32_t flags;
>>>> +} __attribute__((packed)) MultiFDPacketHdr_t;
>>>
>>> Maybe split this patch into two: one that adds the packet header 
>>> concept and another that adds the new device packet?
>>
>> Can do.
>>
>>>> +
>>>> +typedef struct {
>>>> +    MultiFDPacketHdr_t hdr;
>>>> +
>>>>       /* maximum number of allocated pages */
>>>>       uint32_t pages_alloc;
>>>>       /* non zero pages */
>>>> @@ -72,6 +83,16 @@ typedef struct {
>>>>       uint64_t offset[];
>>>>   } __attribute__((packed)) MultiFDPacket_t;
>>>>
>>>> +typedef struct {
>>>> +    MultiFDPacketHdr_t hdr;
>>>> +
>>>> +    char idstr[256] QEMU_NONSTRING;
>>>
>>> idstr should be null terminated, or am I missing something?
>>
>> There's no need to always NULL-terminate a constant-size field,
>> since the strncpy() already stops at the field size, so we can
>> gain another byte for actual string use this way.
>>
>> RAM block idstr also uses the same "trick":
>>> void multifd_ram_fill_packet(MultiFDSendParams *p):
>>> strncpy(packet->ramblock, pages->block->idstr, 256);
>>
> But can idstr actually be 256 bytes long without null byte?
> There are a lot of places where idstr is a parameter for functions that 
> expect null terminated string and it is also printed as such.

Yeah, and I actually don't see the "trick" being used in
RAMBlock. Anyway, it's best to null terminate to be more predictable. We
also had Coverity reports about similar things:

https://lore.kernel.org/r/CAFEAcA_F2qrSAacY=V5Hez1qFGuNW0-XqL2LQ=Y_UKYuHEJWhw@mail.gmail.com

I haven't got the time to send that patch yet.

>
> Thanks.
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index b06a9fab500e..d5a8e5a9c9b5 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -21,6 +21,7 @@ 
 #include "file.h"
 #include "migration.h"
 #include "migration-stats.h"
+#include "savevm.h"
 #include "socket.h"
 #include "tls.h"
 #include "qemu-file.h"
@@ -209,10 +210,10 @@  void multifd_send_fill_packet(MultiFDSendParams *p)
 
     memset(packet, 0, p->packet_len);
 
-    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-    packet->version = cpu_to_be32(MULTIFD_VERSION);
+    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
+    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);
 
-    packet->flags = cpu_to_be32(p->flags);
+    packet->hdr.flags = cpu_to_be32(p->flags);
     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
 
     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
@@ -228,31 +229,49 @@  void multifd_send_fill_packet(MultiFDSendParams *p)
                             p->flags, p->next_packet_size);
 }
 
-static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
+                                             MultiFDPacketHdr_t *hdr,
+                                             Error **errp)
 {
-    MultiFDPacket_t *packet = p->packet;
-    int ret = 0;
-
-    packet->magic = be32_to_cpu(packet->magic);
-    if (packet->magic != MULTIFD_MAGIC) {
+    hdr->magic = be32_to_cpu(hdr->magic);
+    if (hdr->magic != MULTIFD_MAGIC) {
         error_setg(errp, "multifd: received packet "
                    "magic %x and expected magic %x",
-                   packet->magic, MULTIFD_MAGIC);
+                   hdr->magic, MULTIFD_MAGIC);
         return -1;
     }
 
-    packet->version = be32_to_cpu(packet->version);
-    if (packet->version != MULTIFD_VERSION) {
+    hdr->version = be32_to_cpu(hdr->version);
+    if (hdr->version != MULTIFD_VERSION) {
         error_setg(errp, "multifd: received packet "
                    "version %u and expected version %u",
-                   packet->version, MULTIFD_VERSION);
+                   hdr->version, MULTIFD_VERSION);
         return -1;
     }
 
-    p->flags = be32_to_cpu(packet->flags);
+    p->flags = be32_to_cpu(hdr->flags);
+
+    return 0;
+}
+
+static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
+                                                   Error **errp)
+{
+    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
+
+    packet->instance_id = be32_to_cpu(packet->instance_id);
+    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
+
+    return 0;
+}
+
+static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDPacket_t *packet = p->packet;
+    int ret = 0;
+
     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
     p->packet_num = be64_to_cpu(packet->packet_num);
-    p->packets_recved++;
 
     if (!(p->flags & MULTIFD_FLAG_SYNC)) {
         ret = multifd_ram_unfill_packet(p, errp);
@@ -264,6 +283,19 @@  static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
     return ret;
 }
 
+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+    p->packets_recved++;
+
+    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
+        return multifd_recv_unfill_packet_device_state(p, errp);
+    } else {
+        return multifd_recv_unfill_packet_ram(p, errp);
+    }
+
+    g_assert_not_reached();
+}
+
 static bool multifd_send_should_exit(void)
 {
     return qatomic_read(&multifd_send_state->exiting);
@@ -1014,6 +1046,7 @@  static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
     p->packet_len = 0;
     g_free(p->packet);
     p->packet = NULL;
+    g_clear_pointer(&p->packet_dev_state, g_free);
     g_free(p->normal);
     p->normal = NULL;
     g_free(p->zero);
@@ -1126,8 +1159,13 @@  static void *multifd_recv_thread(void *opaque)
     rcu_register_thread();
 
     while (true) {
+        MultiFDPacketHdr_t hdr;
         uint32_t flags = 0;
+        bool is_device_state = false;
         bool has_data = false;
+        uint8_t *pkt_buf;
+        size_t pkt_len;
+
         p->normal_num = 0;
 
         if (use_packets) {
@@ -1135,8 +1173,28 @@  static void *multifd_recv_thread(void *opaque)
                 break;
             }
 
-            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                           p->packet_len, &local_err);
+            ret = qio_channel_read_all_eof(p->c, (void *)&hdr,
+                                           sizeof(hdr), &local_err);
+            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
+                break;
+            }
+
+            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
+            if (ret) {
+                break;
+            }
+
+            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
+            if (is_device_state) {
+                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
+                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
+            } else {
+                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
+                pkt_len = p->packet_len - sizeof(hdr);
+            }
+
+            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
+                                           &local_err);
             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                 break;
             }
@@ -1181,8 +1239,33 @@  static void *multifd_recv_thread(void *opaque)
             has_data = !!p->data->size;
         }
 
-        if (has_data) {
-            ret = multifd_recv_state->ops->recv(p, &local_err);
+        if (!is_device_state) {
+            if (has_data) {
+                ret = multifd_recv_state->ops->recv(p, &local_err);
+                if (ret != 0) {
+                    break;
+                }
+            }
+        } else {
+            g_autofree char *idstr = NULL;
+            g_autofree char *dev_state_buf = NULL;
+
+            assert(use_packets);
+
+            if (p->next_packet_size > 0) {
+                dev_state_buf = g_malloc(p->next_packet_size);
+
+                ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
+                if (ret != 0) {
+                    break;
+                }
+            }
+
+            idstr = g_strndup(p->packet_dev_state->idstr, sizeof(p->packet_dev_state->idstr));
+            ret = qemu_loadvm_load_state_buffer(idstr,
+                                                p->packet_dev_state->instance_id,
+                                                dev_state_buf, p->next_packet_size,
+                                                &local_err);
             if (ret != 0) {
                 break;
             }
@@ -1190,6 +1273,11 @@  static void *multifd_recv_thread(void *opaque)
 
         if (use_packets) {
             if (flags & MULTIFD_FLAG_SYNC) {
+                if (is_device_state) {
+                    error_setg(&local_err, "multifd: received SYNC device state packet");
+                    break;
+                }
+
                 qemu_sem_post(&multifd_recv_state->sem_sync);
                 qemu_sem_wait(&p->sem_sync);
             }
@@ -1258,6 +1346,7 @@  int multifd_recv_setup(Error **errp)
             p->packet_len = sizeof(MultiFDPacket_t)
                 + sizeof(uint64_t) * page_count;
             p->packet = g_malloc0(p->packet_len);
+            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
         }
         p->name = g_strdup_printf("mig/dst/recv_%d", i);
         p->normal = g_new0(ram_addr_t, page_count);
diff --git a/migration/multifd.h b/migration/multifd.h
index a3e35196d179..a8f3e4838c01 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -45,6 +45,12 @@  MultiFDRecvData *multifd_get_recv_data(void);
 #define MULTIFD_FLAG_QPL (4 << 1)
 #define MULTIFD_FLAG_UADK (8 << 1)
 
+/*
+ * If set it means that this packet contains device state
+ * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
+ */
+#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
+
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
 
@@ -52,6 +58,11 @@  typedef struct {
     uint32_t magic;
     uint32_t version;
     uint32_t flags;
+} __attribute__((packed)) MultiFDPacketHdr_t;
+
+typedef struct {
+    MultiFDPacketHdr_t hdr;
+
     /* maximum number of allocated pages */
     uint32_t pages_alloc;
     /* non zero pages */
@@ -72,6 +83,16 @@  typedef struct {
     uint64_t offset[];
 } __attribute__((packed)) MultiFDPacket_t;
 
+typedef struct {
+    MultiFDPacketHdr_t hdr;
+
+    char idstr[256] QEMU_NONSTRING;
+    uint32_t instance_id;
+
+    /* size of the next packet that contains the actual data */
+    uint32_t next_packet_size;
+} __attribute__((packed)) MultiFDPacketDeviceState_t;
+
 typedef struct {
     /* number of used pages */
     uint32_t num;
@@ -89,6 +110,13 @@  struct MultiFDRecvData {
     off_t file_offset;
 };
 
+typedef struct {
+    char *idstr;
+    uint32_t instance_id;
+    char *buf;
+    size_t buf_len;
+} MultiFDDeviceState_t;
+
 typedef enum {
     MULTIFD_PAYLOAD_NONE,
     MULTIFD_PAYLOAD_RAM,
@@ -204,8 +232,9 @@  typedef struct {
 
     /* thread local variables. No locking required */
 
-    /* pointer to the packet */
+    /* pointers to the possible packet types */
     MultiFDPacket_t *packet;
+    MultiFDPacketDeviceState_t *packet_dev_state;
     /* size of the next packet that contains pages */
     uint32_t next_packet_size;
     /* packets received through this channel */