diff mbox series

[RFC,v1,7/7] migration/snapshot: Implementation of qemu-snapshot load path in postcopy mode

Message ID 20210512192619.537268-8-andrey.gruzdev@virtuozzo.com (mailing list archive)
State New, archived
Headers show
Series migration/snapshot: External snapshot utility | expand

Commit Message

Andrey Gruzdev May 12, 2021, 7:26 p.m. UTC
The commit enables asynchronous snapshot loading using standard postcopy
migration mechanism on destination VM.

The point of switchover to postcopy is trivially selected based on
percentage of non-zero pages loaded in precopy.

Signed-off-by: Andrey Gruzdev <andrey.gruzdev@virtuozzo.com>
---
 include/qemu-snapshot.h |  12 +
 qemu-snapshot-vm.c      | 485 +++++++++++++++++++++++++++++++++++++++-
 qemu-snapshot.c         |  16 ++
 3 files changed, 508 insertions(+), 5 deletions(-)
diff mbox series

Patch

diff --git a/include/qemu-snapshot.h b/include/qemu-snapshot.h
index aae730d70e..84a0c38e08 100644
--- a/include/qemu-snapshot.h
+++ b/include/qemu-snapshot.h
@@ -36,10 +36,14 @@ 
 
 /* AIO transfer size */
 #define AIO_TRANSFER_SIZE           BDRV_CLUSTER_SIZE
+/* AIO transfer size for postcopy */
+#define AIO_TRANSFER_SIZE_LOWLAT    (BDRV_CLUSTER_SIZE / 4)
 /* AIO ring size */
 #define AIO_RING_SIZE               64
 /* AIO ring in-flight limit */
 #define AIO_RING_INFLIGHT           16
+/* AIO ring in-flight limit for postcopy */
+#define AIO_RING_INFLIGHT_LOWLAT    4
 
 typedef struct AioRing AioRing;
 
@@ -97,12 +101,20 @@  typedef struct StateSaveCtx {
 typedef struct StateLoadCtx {
     BlockBackend *blk;              /* Block backend */
     QEMUFile *f_fd;                 /* QEMUFile for outgoing stream */
+    QEMUFile *f_rp_fd;              /* QEMUFile for return path stream */
     QEMUFile *f_vmstate;            /* QEMUFile for vmstate backing */
 
     QIOChannelBuffer *ioc_leader;   /* vmstate stream leader */
 
     AioRing *aio_ring;              /* AIO ring */
 
+    bool postcopy;                  /* From command-line --postcopy */
+    int postcopy_percent;           /* From command-line --postcopy */
+    bool in_postcopy;               /* In postcopy mode */
+
+    QemuThread rp_listen_thread;    /* Return path listening thread */
+    bool has_rp_listen_thread;      /* Have listening thread */
+
     /* vmstate offset of the section containing list of RAM blocks */
     int64_t ram_list_offset;
     /* vmstate offset of the first non-iterable device section */
diff --git a/qemu-snapshot-vm.c b/qemu-snapshot-vm.c
index dae5f84b80..76980520ea 100644
--- a/qemu-snapshot-vm.c
+++ b/qemu-snapshot-vm.c
@@ -40,7 +40,9 @@  typedef struct RAMBlock {
     int64_t nr_pages;           /* Page count */
     int64_t nr_slices;          /* Number of slices (for bitmap bookkeeping) */
 
-    unsigned long *bitmap;      /* Bitmap of RAM slices */
+    int64_t discard_offset;     /* Last page offset sent in precopy */
+
+    unsigned long *bitmap;      /* Bitmap of transferred RAM slices */
 
     /* Link into ram_list */
     QSIMPLEQ_ENTRY(RAMBlock) next;
@@ -54,17 +56,33 @@  typedef struct RAMPage {
     int64_t offset;             /* Page offset in RAM block */
 } RAMPage;
 
+/* Page request from destination in postcopy */
+typedef struct RAMPageRequest {
+    RAMBlock *block;            /* RAM block */
+    int64_t offset;             /* Offset in RAM block */
+    unsigned size;              /* Size of request */
+
+    /* Link into ram_ctx.page_req */
+    QSIMPLEQ_ENTRY(RAMPageRequest) next;
+} RAMPageRequest;
+
 /* RAM transfer context */
 typedef struct RAMCtx {
     int64_t normal_pages;       /* Total number of normal pages */
+    int64_t precopy_pages;      /* Normal pages to load in precopy */
     int64_t loaded_pages;       /* Number of normal pages loaded */
 
     RAMPage last_page;          /* Last loaded page */
 
     RAMBlock *last_sent_block;  /* RAM block of last sent page */
+    RAMBlock *last_req_block;   /* RAM block of last page request */
 
     /* RAM block list head */
     QSIMPLEQ_HEAD(, RAMBlock) ram_block_list;
+
+    /* Page request queue for postcopy */
+    QemuMutex page_req_mutex;
+    QSIMPLEQ_HEAD(, RAMPageRequest) page_req;
 } RAMCtx;
 
 /* Section handler ops */
@@ -848,6 +866,433 @@  static void load_state_check_errors(StateLoadCtx *s, int *res)
     }
 }
 
+static bool get_queued_page(RAMPage *page)
+{
+    RAMCtx *ram = &ram_ctx;
+
+    if (QSIMPLEQ_EMPTY_ATOMIC(&ram->page_req)) {
+        return false;
+    }
+
+    QEMU_LOCK_GUARD(&ram->page_req_mutex);
+
+    while (!QSIMPLEQ_EMPTY(&ram->page_req)) {
+        RAMPageRequest *entry = QSIMPLEQ_FIRST(&ram->page_req);
+        RAMBlock *block = entry->block;
+        int64_t slice = entry->offset >> slice_bits;
+
+        QSIMPLEQ_REMOVE_HEAD(&ram->page_req, next);
+        g_free(entry);
+
+        /*
+         * Test respective bit in RAM block's slice bitmap to check if
+         * we still haven't read that slice from the image.
+         */
+        if (test_bit(slice, block->bitmap)) {
+            page->block = block;
+            page->offset = slice << slice_bits;
+
+            return true;
+        }
+    }
+
+    return false;
+}
+
+static int queue_page_request(const char *idstr, int64_t offset, unsigned size)
+{
+    RAMCtx *ram = &ram_ctx;
+    RAMBlock *block;
+    RAMPageRequest *new_entry;
+
+    if (!idstr) {
+        block = ram->last_req_block;
+        if (!block) {
+            error_report("RP-REQ_PAGES: no previous block");
+            return -EINVAL;
+        }
+    } else {
+        block = ram_block_by_idstr(idstr);
+        if (!block) {
+            error_report("RP-REQ_PAGES: cannot find block %s", idstr);
+            return -EINVAL;
+        }
+
+        ram->last_req_block = block;
+    }
+
+    if (!ram_offset_in_block(block, offset)) {
+        error_report("RP-REQ_PAGES: offset 0x%" PRIx64 " out of RAM block %s",
+                     offset, idstr);
+        return -EINVAL;
+    }
+
+    new_entry = g_new0(RAMPageRequest, 1);
+    new_entry->block = block;
+    new_entry->offset = offset;
+    new_entry->size = size;
+
+    qemu_mutex_lock(&ram->page_req_mutex);
+    QSIMPLEQ_INSERT_TAIL(&ram->page_req, new_entry, next);
+    qemu_mutex_unlock(&ram->page_req_mutex);
+
+    return 0;
+}
+
+/* QEMU_VM_COMMAND sub-commands */
+typedef enum VmSubCmd {
+    MIG_CMD_OPEN_RETURN_PATH = 1,
+    MIG_CMD_POSTCOPY_ADVISE = 3,
+    MIG_CMD_POSTCOPY_LISTEN = 4,
+    MIG_CMD_POSTCOPY_RUN = 5,
+    MIG_CMD_POSTCOPY_RAM_DISCARD = 6,
+    MIG_CMD_PACKAGED = 7,
+} VmSubCmd;
+
+/* Return-path message types */
+typedef enum RpMsgType {
+    MIG_RP_MSG_INVALID = 0,
+    MIG_RP_MSG_SHUT = 1,
+    MIG_RP_MSG_REQ_PAGES_ID = 3,
+    MIG_RP_MSG_REQ_PAGES = 4,
+    MIG_RP_MSG_MAX = 7,
+} RpMsgType;
+
+typedef struct RpMsgArgs {
+    int len;
+    const char *name;
+} RpMsgArgs;
+
+/*
+ * Return-path message length/name indexed by message type.
+ * -1 value stands for variable message length.
+ */
+static RpMsgArgs rp_msg_args[] = {
+    [MIG_RP_MSG_INVALID]        = { .len = -1, .name = "INVALID" },
+    [MIG_RP_MSG_SHUT]           = { .len =  4, .name = "SHUT" },
+    [MIG_RP_MSG_REQ_PAGES_ID]   = { .len = -1, .name = "REQ_PAGES_ID" },
+    [MIG_RP_MSG_REQ_PAGES]      = { .len = 12, .name = "REQ_PAGES" },
+    [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
+};
+
+/* Return-path message processing thread */
+static void *rp_listen_thread(void *opaque)
+{
+    StateLoadCtx *s = (StateLoadCtx *) opaque;
+    QEMUFile *f = s->f_rp_fd;
+    int res = 0;
+
+    while (!res) {
+        uint8_t h_buf[512];
+        const int h_max_len = sizeof(h_buf);
+        int h_type;
+        int h_len;
+        size_t count;
+
+        h_type = qemu_get_be16(f);
+        h_len = qemu_get_be16(f);
+
+        /* Make early check for input errors */
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+
+        /* Check message type */
+        if (h_type >= MIG_RP_MSG_MAX || h_type == MIG_RP_MSG_INVALID) {
+            error_report("RP: received invalid message type %d length %d",
+                         h_type, h_len);
+            res = -EINVAL;
+            break;
+        }
+
+        /* Check message length */
+        if (rp_msg_args[h_type].len != -1 && h_len != rp_msg_args[h_type].len) {
+            error_report("RP: received %s message len %d expected %d",
+                         rp_msg_args[h_type].name,
+                         h_len, rp_msg_args[h_type].len);
+            res = -EINVAL;
+            break;
+        } else if (h_len > h_max_len) {
+            error_report("RP: received %s message len %d max_len %d",
+                         rp_msg_args[h_type].name, h_len, h_max_len);
+            res = -EINVAL;
+            break;
+        }
+
+        count = qemu_get_buffer(f, h_buf, h_len);
+        if (count != h_len) {
+            break;
+        }
+
+        switch (h_type) {
+        case MIG_RP_MSG_SHUT:
+        {
+            int shut_error;
+
+            shut_error = be32_to_cpu(*(uint32_t *) h_buf);
+            if (shut_error) {
+                error_report("RP: sibling shutdown, error %d", shut_error);
+            }
+
+            /* Exit processing loop */
+            res = 1;
+            break;
+        }
+
+        case MIG_RP_MSG_REQ_PAGES:
+        case MIG_RP_MSG_REQ_PAGES_ID:
+        {
+            uint64_t offset;
+            uint32_t size;
+            char *id_str = NULL;
+
+            offset = be64_to_cpu(*(uint64_t *) (h_buf + 0));
+            size = be32_to_cpu(*(uint32_t *) (h_buf + 8));
+
+            if (h_type == MIG_RP_MSG_REQ_PAGES_ID) {
+                int h_parsed_len = rp_msg_args[MIG_RP_MSG_REQ_PAGES].len;
+
+                if (h_len > h_parsed_len) {
+                    int id_len;
+
+                    /* RAM block id string */
+                    id_len = h_buf[h_parsed_len];
+                    id_str = (char *) &h_buf[h_parsed_len + 1];
+                    id_str[id_len] = 0;
+
+                    h_parsed_len += id_len + 1;
+                }
+
+                if (h_parsed_len != h_len) {
+                    error_report("RP: received %s message len %d expected %d",
+                                 rp_msg_args[MIG_RP_MSG_REQ_PAGES_ID].name,
+                                 h_len, h_parsed_len);
+                    res = -EINVAL;
+                    break;
+                }
+            }
+
+            res = queue_page_request(id_str, offset, size);
+            break;
+        }
+
+        default:
+            error_report("RP: received unexpected message type %d len %d",
+                         h_type, h_len);
+            res = -EINVAL;
+        }
+    }
+
+    if (res >= 0) {
+        res = qemu_file_get_error(f);
+    }
+    if (res) {
+        error_report("RP: listen thread exit, error %d", res);
+    }
+
+    return NULL;
+}
+
+static void send_command(QEMUFile *f, int cmd, uint16_t len, uint8_t *data)
+{
+    qemu_put_byte(f, QEMU_VM_COMMAND);
+    qemu_put_be16(f, (uint16_t) cmd);
+    qemu_put_be16(f, len);
+
+    qemu_put_buffer_async(f, data, len, false);
+    qemu_fflush(f);
+}
+
+static void send_ram_block_discard(QEMUFile *f, RAMBlock *block)
+{
+    int id_len;
+    int msg_len;
+    uint8_t msg_buf[512];
+
+    id_len = strlen(block->idstr);
+    assert(id_len < 256);
+
+    /* Version, always 0 */
+    msg_buf[0] = 0;
+    /* RAM block ID string length, not including terminating 0 */
+    msg_buf[1] = id_len;
+    /* RAM block ID string with terminating zero */
+    memcpy(msg_buf + 2, block->idstr, (id_len + 1));
+    msg_len = 2 + id_len + 1;
+    /* Discard range offset */
+    stq_be_p(msg_buf + msg_len, block->discard_offset);
+    msg_len += 8;
+    /* Discard range length */
+    stq_be_p(msg_buf + msg_len, (block->length - block->discard_offset));
+    msg_len += 8;
+
+    send_command(f, MIG_CMD_POSTCOPY_RAM_DISCARD, msg_len, msg_buf);
+}
+
+static int send_each_ram_block_discard(QEMUFile *f)
+{
+    RAMBlock *block;
+    int res = 0;
+
+    QSIMPLEQ_FOREACH(block, &ram_ctx.ram_block_list, next) {
+        send_ram_block_discard(f, block);
+
+        res = qemu_file_get_error(f);
+        if (res) {
+            break;
+        }
+    }
+
+    return res;
+}
+
+static int prepare_postcopy(StateLoadCtx *s)
+{
+    QEMUFile *f = s->f_fd;
+    uint64_t tmp[2];
+    int res;
+
+    /* Number of pages to load in precopy before switching to postcopy */
+    ram_ctx.precopy_pages = ram_ctx.normal_pages * s->postcopy_percent / 100;
+
+    /* Send POSTCOPY_ADVISE */
+    tmp[0] = cpu_to_be64(page_size);
+    tmp[1] = cpu_to_be64(page_size);
+    send_command(f, MIG_CMD_POSTCOPY_ADVISE, 16, (uint8_t *) tmp);
+
+    /* Open return path on destination */
+    send_command(f, MIG_CMD_OPEN_RETURN_PATH, 0, NULL);
+
+    /*
+     * Check for file errors after sending POSTCOPY_ADVISE command
+     * since destination may already have closed input pipe in case
+     * postcopy had not been enabled in advance.
+     */
+    res = qemu_file_get_error(f);
+    if (!res) {
+        qemu_thread_create(&s->rp_listen_thread, "rp_thread",
+                           rp_listen_thread, s, QEMU_THREAD_JOINABLE);
+        s->has_rp_listen_thread = true;
+    }
+
+    return res;
+}
+
+static int start_postcopy(StateLoadCtx *s)
+{
+    QIOChannelBuffer *bioc;
+    QEMUFile *fb;
+    int eof_pos;
+    uint32_t length;
+    int res = 0;
+
+    /*
+     * Send RAM discards for each block's unsent part. Without discards,
+     * the userfault_fd code on destination will not trigger page requests
+     * as expected. Also, the UFFDIO_COPY ioctl that is used to place incoming
+     * page in postcopy would give an error if that page has not faulted
+     * with MISSING reason.
+     */
+    res = send_each_ram_block_discard(s->f_fd);
+    if (res) {
+        return res;
+    }
+
+    /*
+     * To perform a switch to postcopy on destination, we need to send
+     * commands and the device state data in the following order:
+     *   * MIG_CMD_POSTCOPY_LISTEN
+     *   * Non-iterable device state sections
+     *   * MIG_CMD_POSTCOPY_RUN
+     *
+     * All this has to be packaged into a single blob using MIG_CMD_PACKAGED
+     * command. While loading the device state we may trigger page transfer
+     * requests and the fd must be free to process those, thus the destination
+     * must read the whole device state off the fd before it starts
+     * processing it. To wrap it up in a package, QEMU buffer channel is used.
+     */
+    bioc = qio_channel_buffer_new(512 * 1024);
+    qio_channel_set_name(QIO_CHANNEL(bioc), "migration-postcopy-buffer");
+    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
+    object_unref(OBJECT(bioc));
+
+    /* MIG_CMD_POSTCOPY_LISTEN command */
+    send_command(fb, MIG_CMD_POSTCOPY_LISTEN, 0, NULL);
+
+    /* The rest of non-iterable device state with an optional vmdesc section */
+    qemu_fsplice(fb, s->f_vmstate, s->eof_offset - s->device_offset);
+    qemu_fflush(fb);
+
+    /*
+     * vmdesc section may optionally be present at the end of the stream
+     * so we'll try to locate it and truncate the trailer.
+     */
+    eof_pos = bioc->usage - 1;
+
+    for (int offset = (bioc->usage - 11); offset >= 0; offset--) {
+        if (bioc->data[offset] == QEMU_VM_SECTION_FOOTER &&
+                bioc->data[offset + 5] == QEMU_VM_EOF &&
+                bioc->data[offset + 6] == QEMU_VM_VMDESCRIPTION) {
+            uint32_t expected_length = bioc->usage - (offset + 11);
+            uint32_t json_length;
+
+            json_length = be32_to_cpu(*(uint32_t  *) &bioc->data[offset + 7]);
+            if (json_length != expected_length) {
+                error_report("Corrupted vmdesc trailer: length %" PRIu32
+                             " expected %" PRIu32,
+                             json_length, expected_length);
+                res = -EINVAL;
+                goto fail;
+            }
+
+            eof_pos = offset + 5;
+            break;
+        }
+    }
+
+    /*
+     * When switching to postcopy we need to skip QEMU_VM_EOF token which
+     * normally is placed after the last non-iterable device state section
+     * (but before the vmdesc section).
+     *
+     * Skipping QEMU_VM_EOF is required to allow migration process to
+     * continue in postcopy. Vmdesc section also has to be skipped here.
+     */
+    if (eof_pos >= 0 && bioc->data[eof_pos] == QEMU_VM_EOF) {
+        bioc->usage = eof_pos;
+        bioc->offset = eof_pos;
+    }
+
+    /* Finally is the MIG_CMD_POSTCOPY_RUN command */
+    send_command(fb, MIG_CMD_POSTCOPY_RUN, 0, NULL);
+
+    /* Now send that blob */
+    length = cpu_to_be32(bioc->usage);
+    send_command(s->f_fd, MIG_CMD_PACKAGED, sizeof(length), (uint8_t *) &length);
+    qemu_put_buffer_async(s->f_fd, bioc->data, bioc->usage, false);
+    qemu_fflush(s->f_fd);
+
+    /*
+     * Switch to lower setting of in-flight requests limit
+     * to reduce page request latencies.
+     */
+    aio_ring_set_max_inflight(s->aio_ring, AIO_RING_INFLIGHT_LOWLAT);
+
+    s->in_postcopy = true;
+
+fail:
+    qemu_fclose(fb);
+    load_state_check_errors(s, &res);
+
+    return res;
+}
+
+static bool is_postcopy_switchover(StateLoadCtx *s)
+{
+    return ram_ctx.loaded_pages > ram_ctx.precopy_pages;
+}
+
 static void send_section_header_part_end(QEMUFile *f, SectionHandlersEntry *se,
         uint8_t section_type)
 {
@@ -987,10 +1432,13 @@  static void coroutine_fn ram_load_submit_aio(StateLoadCtx *s)
 
     while ((req = aio_ring_get_request(s->aio_ring))) {
         RAMPage page;
-        unsigned max_length = AIO_TRANSFER_SIZE;
+        unsigned max_length = s->in_postcopy ? AIO_TRANSFER_SIZE_LOWLAT :
+                AIO_TRANSFER_SIZE;
         unsigned length;
+        bool urgent;
 
-        if (!find_next_page(&page)) {
+        urgent = get_queued_page(&page);
+        if (!urgent && !find_next_page(&page)) {
             break;
         }
         
@@ -1003,6 +1451,9 @@  static void coroutine_fn ram_load_submit_aio(StateLoadCtx *s)
         ram->last_page.block = page.block;
         ram->last_page.offset = page.offset + length;
 
+        /* Used by send_ram_block_discard() */
+        page.block->discard_offset = ram->last_page.offset;
+
         /* Setup I/O request */
         req->opaque = s->blk;
         req->data = qemu_blockalign(blk_bs(s->blk), length);
@@ -1284,8 +1735,13 @@  static int load_state_ramlist(StateLoadCtx *s)
 
 static int load_state_complete(StateLoadCtx *s)
 {
-    /* Forward non-iterable device state */
-    qemu_fsplice(s->f_fd, s->f_vmstate, s->eof_offset - s->device_offset);
+    if (!s->in_postcopy) {
+        /* Forward non-iterable device state */
+        qemu_fsplice(s->f_fd, s->f_vmstate, s->eof_offset - s->device_offset);
+    } else {
+        /* Send terminating QEMU_VM_EOF if in postcopy */
+        qemu_put_byte(s->f_fd, QEMU_VM_EOF);
+    }
 
     qemu_fflush(s->f_fd);
 
@@ -1364,10 +1820,22 @@  int coroutine_fn load_state_main(StateLoadCtx *s)
         goto fail;
     }
 
+    if (s->postcopy) {
+        res = prepare_postcopy(s);
+        if (res) {
+            goto fail;
+        }
+    }
+
     do {
         res = ram_load_memory(s);
         /* Check for file errors */
         load_state_check_errors(s, &res);
+
+        /* Switch to postcopy? */
+        if (!res && s->postcopy && !s->in_postcopy && is_postcopy_switchover(s)) {
+            res = start_postcopy(s);
+        }
     } while (!res);
 
     if (res == 1) {
@@ -1390,6 +1858,10 @@  void ram_init_state(void)
 
     /* Initialize RAM block list head */
     QSIMPLEQ_INIT(&ram->ram_block_list);
+
+    /* Initialize postcopy page request queue */
+    qemu_mutex_init(&ram->page_req_mutex);
+    QSIMPLEQ_INIT(&ram->page_req);
 }
 
 /* Destroy snapshot RAM state */
@@ -1403,4 +1875,7 @@  void ram_destroy_state(void)
         g_free(block->bitmap);
         g_free(block);
     }
+
+    /* Destroy page request mutex */
+    qemu_mutex_destroy(&ram_ctx.page_req_mutex);
 }
diff --git a/qemu-snapshot.c b/qemu-snapshot.c
index 92956623f7..29d954c5d6 100644
--- a/qemu-snapshot.c
+++ b/qemu-snapshot.c
@@ -123,6 +123,10 @@  static void destroy_load_context(void)
 {
     StateLoadCtx *s = get_load_context();
 
+    if (s->has_rp_listen_thread) {
+        qemu_thread_join(&s->rp_listen_thread);
+    }
+
     if (s->f_vmstate) {
         qemu_fclose(s->f_vmstate);
     }
@@ -226,12 +230,16 @@  static void coroutine_fn snapshot_load_co(void *opaque)
 {
     StateLoadCtx *s = get_load_context();
     QIOChannel *ioc_fd;
+    QIOChannel *ioc_rp_fd;
     uint8_t *buf;
     size_t count;
     int res = -1;
 
     init_load_context();
 
+    s->postcopy = params.postcopy;
+    s->postcopy_percent = params.postcopy_percent;
+
     /* Block backend */
     s->blk = image_open_opts(params.blk_optstr, params.blk_options,
                              params.blk_flags);
@@ -250,6 +258,14 @@  static void coroutine_fn snapshot_load_co(void *opaque)
     object_unref(OBJECT(ioc_fd));
     qemu_file_set_blocking(s->f_fd, false);
 
+    /* QEMUFile on return path fd if we are going to use postcopy */
+    if (params.postcopy) {
+        ioc_rp_fd = qio_channel_new_fd(params.rp_fd, NULL);
+        qio_channel_set_name(QIO_CHANNEL(ioc_fd), "migration-channel-rp");
+        s->f_rp_fd = qemu_fopen_channel_input(ioc_rp_fd);
+        object_unref(OBJECT(ioc_rp_fd));
+    }
+
     /* Buffer channel to store leading part of migration stream */
     s->ioc_leader = qio_channel_buffer_new(INPLACE_READ_MAX);
     qio_channel_set_name(QIO_CHANNEL(s->ioc_leader), "migration-leader-buffer");