diff mbox

[RFC,v3,1/4] separate thread for VM migration

Message ID 6ac256e1f481ea28678bae846a13714302f258db.1313076455.git.udeshpan@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Umesh Deshpande Aug. 11, 2011, 3:32 p.m. UTC
This patch creates a separate thread for the guest migration on the source side.
migrate_cancel request from the iothread is handled asynchronously. That is,
iothread submits migrate_cancel to the migration thread and returns, while the
migration thread attends this request at the next iteration to terminate its
execution.

Signed-off-by: Umesh Deshpande <udeshpan@redhat.com>
---
 buffered_file.c |   85 ++++++++++++++++++++++++++++++++----------------------
 buffered_file.h |    4 ++
 migration.c     |   49 ++++++++++++++-----------------
 migration.h     |    6 ++++
 4 files changed, 82 insertions(+), 62 deletions(-)

Comments

Paolo Bonzini Aug. 11, 2011, 4:18 p.m. UTC | #1
On 08/11/2011 05:32 PM, Umesh Deshpande wrote:
> This patch creates a separate thread for the guest migration on the source side.
> migrate_cancel request from the iothread is handled asynchronously. That is,
> iothread submits migrate_cancel to the migration thread and returns, while the
> migration thread attends this request at the next iteration to terminate its
> execution.

Looks pretty good!  I hope you agree. :)  Just one note inside.

> Signed-off-by: Umesh Deshpande<udeshpan@redhat.com>
> ---
>   buffered_file.c |   85 ++++++++++++++++++++++++++++++++----------------------
>   buffered_file.h |    4 ++
>   migration.c     |   49 ++++++++++++++-----------------
>   migration.h     |    6 ++++
>   4 files changed, 82 insertions(+), 62 deletions(-)
>
> diff --git a/buffered_file.c b/buffered_file.c
> index 41b42c3..19932b6 100644
> --- a/buffered_file.c
> +++ b/buffered_file.c
> @@ -16,6 +16,8 @@
>   #include "qemu-timer.h"
>   #include "qemu-char.h"
>   #include "buffered_file.h"
> +#include "migration.h"
> +#include "qemu-thread.h"
>
>   //#define DEBUG_BUFFERED_FILE
>
> @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
>       void *opaque;
>       QEMUFile *file;
>       int has_error;
> +    int closed;
>       int freeze_output;
>       size_t bytes_xfer;
>       size_t xfer_limit;
>       uint8_t *buffer;
>       size_t buffer_size;
>       size_t buffer_capacity;
> -    QEMUTimer *timer;
> +    QemuThread thread;
>   } QEMUFileBuffered;
>
>   #ifdef DEBUG_BUFFERED_FILE
> @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
>           offset = size;
>       }
>
> -    if (pos == 0&&  size == 0) {
> -        DPRINTF("file is ready\n");
> -        if (s->bytes_xfer<= s->xfer_limit) {
> -            DPRINTF("notifying client\n");
> -            s->put_ready(s->opaque);
> -        }
> -    }
> -
>       return offset;
>   }
>
> @@ -175,20 +170,20 @@ static int buffered_close(void *opaque)
>
>       while (!s->has_error&&  s->buffer_size) {
>           buffered_flush(s);
> -        if (s->freeze_output)
> +        if (s->freeze_output) {
>               s->wait_for_unfreeze(s);
> +        }
>       }

This is racy; you might end up calling buffered_put_buffer twice from 
two different threads.

> -    ret = s->close(s->opaque);
> +    s->closed = 1;
>
> -    qemu_del_timer(s->timer);
> -    qemu_free_timer(s->timer);
> +    ret = s->close(s->opaque);
>       qemu_free(s->buffer);
> -    qemu_free(s);

... similarly, here the migration thread might end up using the buffer. 
  Just set s->closed here and wait for thread completion; the migration 
thread can handle the flushes free the buffer etc.  Let the migration 
thread do as much as possible, it will simplify your life.

>       return ret;
>   }
>
> +
>   static int buffered_rate_limit(void *opaque)
>   {
>       QEMUFileBuffered *s = opaque;
> @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque)
>       return s->xfer_limit;
>   }
>
> -static void buffered_rate_tick(void *opaque)
> +static void *migrate_vm(void *opaque)
>   {
>       QEMUFileBuffered *s = opaque;
> +    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
> +    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
>
> -    if (s->has_error) {
> -        buffered_close(s);
> -        return;
> -    }
> +    qemu_mutex_lock_iothread();
>
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    while (!s->closed) {

... This can be in fact

     while (!s->closed || s->buffered_size)

and that alone will subsume the loop in buffered_close, no?

> +        if (s->freeze_output) {
> +            s->wait_for_unfreeze(s);
> +            s->freeze_output = 0;
> +            continue;
> +        }
>
> -    if (s->freeze_output)
> -        return;
> +        if (s->has_error) {
> +            break;
> +        }
> +
> +        current_time = qemu_get_clock_ms(rt_clock);
> +        if (!s->closed&&  (expire_time>  current_time)) {
> +            tv.tv_usec = 1000 * (expire_time - current_time);
> +            select(0, NULL, NULL, NULL,&tv);
> +            continue;
> +        }
>
> -    s->bytes_xfer = 0;
> +        s->bytes_xfer = 0;
> +        buffered_flush(s);
>
> -    buffered_flush(s);
> +        expire_time = qemu_get_clock_ms(rt_clock) + 100;
> +        s->put_ready(s->opaque);
> +    }
>
> -    /* Add some checks around this */
> -    s->put_ready(s->opaque);
> +    if (s->has_error) {
> +        buffered_close(s);
> +    }
> +    qemu_free(s);
> +
> +    qemu_mutex_unlock_iothread();
> +
> +    return NULL;
>   }
>
>   QEMUFile *qemu_fopen_ops_buffered(void *opaque,
> -                                  size_t bytes_per_sec,
> -                                  BufferedPutFunc *put_buffer,
> -                                  BufferedPutReadyFunc *put_ready,
> -                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> -                                  BufferedCloseFunc *close)
> +        size_t bytes_per_sec,
> +        BufferedPutFunc *put_buffer,
> +        BufferedPutReadyFunc *put_ready,
> +        BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
> +        BufferedCloseFunc *close)
>   {
>       QEMUFileBuffered *s;
>
> @@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
>       s->put_ready = put_ready;
>       s->wait_for_unfreeze = wait_for_unfreeze;
>       s->close = close;
> +    s->closed = 0;
>
>       s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
>                                buffered_close, buffered_rate_limit,
>                                buffered_set_rate_limit,
> -			     buffered_get_rate_limit);
> -
> -    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
> +                             buffered_get_rate_limit);
>
> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
> +    qemu_thread_create(&s->thread, migrate_vm, s);
>
>       return s->file;
>   }
> diff --git a/buffered_file.h b/buffered_file.h
> index 98d358b..477bf7c 100644
> --- a/buffered_file.h
> +++ b/buffered_file.h
> @@ -17,9 +17,13 @@
>   #include "hw/hw.h"
>
>   typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
> +typedef void (BufferedBeginFunc)(void *opaque);

Unused typedef.

>   typedef void (BufferedPutReadyFunc)(void *opaque);
>   typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
>   typedef int (BufferedCloseFunc)(void *opaque);
> +typedef void (BufferedWaitForCancelFunc)(void *opaque);
> +
> +void wait_for_cancel(void *opaque);

BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused.

>   QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
>                                     BufferedPutFunc *put_buffer,
> diff --git a/migration.c b/migration.c
> index af3a1f2..d8a0abb 100644
> --- a/migration.c
> +++ b/migration.c
> @@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
>   {
>       int ret = 0;
>
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> -
>       if (s->file) {
>           DPRINTF("closing file\n");
>           if (qemu_fclose(s->file) != 0) {
> @@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
>       return ret;
>   }
>
> -void migrate_fd_put_notify(void *opaque)
> -{
> -    FdMigrationState *s = opaque;
> -
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
> -    qemu_file_put_notify(s->file);
> -}
> -

qemu_file_put_notify is also unused now.

>   ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>   {
>       FdMigrationState *s = opaque;
> @@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>       if (ret == -1)
>           ret = -(s->get_error(s));
>
> -    if (ret == -EAGAIN) {
> -        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
> -    } else if (ret<  0) {
> +    if (ret<  0&&  ret != -EAGAIN) {
>           if (s->mon) {
>               monitor_resume(s->mon);
>           }
> @@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
>
>   void migrate_fd_connect(FdMigrationState *s)
>   {
> -    int ret;
> -
> +    s->begin = 1;
>       s->file = qemu_fopen_ops_buffered(s,
>                                         s->bandwidth_limit,
>                                         migrate_fd_put_buffer,
>                                         migrate_fd_put_ready,
>                                         migrate_fd_wait_for_unfreeze,
>                                         migrate_fd_close);
> -
> -    DPRINTF("beginning savevm\n");
> -    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> -                                  s->mig_state.shared);
> -    if (ret<  0) {
> -        DPRINTF("failed, %d\n", ret);
> -        migrate_fd_error(s);
> -        return;
> -    }
> -
> -    migrate_fd_put_ready(s);
>   }
>
>   void migrate_fd_put_ready(void *opaque)
>   {
>       FdMigrationState *s = opaque;
> +    int ret;
>
>       if (s->state != MIG_STATE_ACTIVE) {
>           DPRINTF("put_ready returning because of non-active state\n");
> +        if (s->state == MIG_STATE_CANCELLED) {
> +            migrate_fd_terminate(s);
> +        }
>           return;
>       }
>
> +    if (s->begin) {
> +        DPRINTF("beginning savevm\n");
> +        ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
> +                s->mig_state.shared);
> +        if (ret<  0) {
> +            DPRINTF("failed, %d\n", ret);
> +            migrate_fd_error(s);
> +            return;
> +        }
> +        s->begin = 0;
> +    }
> +
>       DPRINTF("iterate\n");
>       if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
>           int state;
> @@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state)
>       DPRINTF("cancelling migration\n");
>
>       s->state = MIG_STATE_CANCELLED;
> +}
> +
> +void migrate_fd_terminate(FdMigrationState *s)
> +{
>       notifier_list_notify(&migration_state_notifiers);
>       qemu_savevm_state_cancel(s->mon, s->file);
>
> @@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque)
>   {
>       FdMigrationState *s = opaque;
>
> -    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
>       return s->close(s);
>   }
>
> diff --git a/migration.h b/migration.h
> index 050c56c..887f84c 100644
> --- a/migration.h
> +++ b/migration.h
> @@ -45,9 +45,11 @@ struct FdMigrationState
>       int fd;
>       Monitor *mon;
>       int state;
> +    int begin;
>       int (*get_error)(struct FdMigrationState*);
>       int (*close)(struct FdMigrationState*);
>       int (*write)(struct FdMigrationState*, const void *, size_t);
> +    void (*callback)(void *);
>       void *opaque;
>   };
>
> @@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
>
>   void migrate_fd_connect(FdMigrationState *s);
>
> +void migrate_fd_begin(void *opaque);
> +
>   void migrate_fd_put_ready(void *opaque);
>
>   int migrate_fd_get_status(MigrationState *mig_state);
>
>   void migrate_fd_cancel(MigrationState *mig_state);
>
> +void migrate_fd_terminate(FdMigrationState *s);
> +
>   void migrate_fd_release(MigrationState *mig_state);
>
>   void migrate_fd_wait_for_unfreeze(void *opaque);

--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Umesh Deshpande Aug. 11, 2011, 5:36 p.m. UTC | #2
On 08/11/2011 12:18 PM, Paolo Bonzini wrote:
>> @@ -175,20 +170,20 @@ static int buffered_close(void *opaque)
>>
>>       while (!s->has_error&&  s->buffer_size) {
>>           buffered_flush(s);
>> -        if (s->freeze_output)
>> +        if (s->freeze_output) {
>>               s->wait_for_unfreeze(s);
>> +        }
>>       }
>
> This is racy; you might end up calling buffered_put_buffer twice from 
> two different threads.
Now, migrate_fd_cleanup, buffured_close is just executed by the 
migration thread.
I am not letting iothread call any migration cancellation related 
functions. In stead it just submits the request and waits for the 
migration thread to terminate itself in the next iteration.
The reason is to avoid the call to qemu_fflush,  
qemu_savevm_state_cancel (to carry out migrate_cancel) from iothread 
while migration thread is transferring data without holding the locks.

>
>> -    ret = s->close(s->opaque);
>> +    s->closed = 1;
>>
>> -    qemu_del_timer(s->timer);
>> -    qemu_free_timer(s->timer);
>> +    ret = s->close(s->opaque);
>>       qemu_free(s->buffer);
>> -    qemu_free(s);
>
> ... similarly, here the migration thread might end up using the 
> buffer.  Just set s->closed here and wait for thread completion; the 
> migration thread can handle the flushes free the buffer etc.  Let the 
> migration thread do as much as possible, it will simplify your life.
>
>>       return ret;
>>   }
>>
>> +
>>   static int buffered_rate_limit(void *opaque)
>>   {
>>       QEMUFileBuffered *s = opaque;
>> @@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void 
>> *opaque)
>>       return s->xfer_limit;
>>   }
>>
>> -static void buffered_rate_tick(void *opaque)
>> +static void *migrate_vm(void *opaque)
>>   {
>>       QEMUFileBuffered *s = opaque;
>> +    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) 
>> + 100;
>> +    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
>>
>> -    if (s->has_error) {
>> -        buffered_close(s);
>> -        return;
>> -    }
>> +    qemu_mutex_lock_iothread();
>>
>> -    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
>> +    while (!s->closed) {
>
> ... This can be in fact
>
>     while (!s->closed || s->buffered_size)
>
> and that alone will subsume the loop in buffered_close, no?
s->fd is closed in migrate_fd_cleanup (which calls buffered_close). So I 
flush the buffer in buffered_close before closing the descriptor, and 
then migration thread simply exits because s->closed is set.

- Umesh
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Paolo Bonzini Aug. 12, 2011, 6:40 a.m. UTC | #3
On 08/11/2011 07:36 PM, Umesh Deshpande wrote:
>>
> Now, migrate_fd_cleanup, buffured_close is just executed by the
> migration thread.
> I am not letting iothread call any migration cancellation related
> functions. In stead it just submits the request and waits for the
> migration thread to terminate itself in the next iteration.
> The reason is to avoid the call to qemu_fflush,
> qemu_savevm_state_cancel (to carry out migrate_cancel) from iothread
> while migration thread is transferring data without holding the locks.

Got it now, thanks for explaining.

Paolo
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..19932b6 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -16,6 +16,8 @@ 
 #include "qemu-timer.h"
 #include "qemu-char.h"
 #include "buffered_file.h"
+#include "migration.h"
+#include "qemu-thread.h"
 
 //#define DEBUG_BUFFERED_FILE
 
@@ -28,13 +30,14 @@  typedef struct QEMUFileBuffered
     void *opaque;
     QEMUFile *file;
     int has_error;
+    int closed;
     int freeze_output;
     size_t bytes_xfer;
     size_t xfer_limit;
     uint8_t *buffer;
     size_t buffer_size;
     size_t buffer_capacity;
-    QEMUTimer *timer;
+    QemuThread thread;
 } QEMUFileBuffered;
 
 #ifdef DEBUG_BUFFERED_FILE
@@ -155,14 +158,6 @@  static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
         offset = size;
     }
 
-    if (pos == 0 && size == 0) {
-        DPRINTF("file is ready\n");
-        if (s->bytes_xfer <= s->xfer_limit) {
-            DPRINTF("notifying client\n");
-            s->put_ready(s->opaque);
-        }
-    }
-
     return offset;
 }
 
@@ -175,20 +170,20 @@  static int buffered_close(void *opaque)
 
     while (!s->has_error && s->buffer_size) {
         buffered_flush(s);
-        if (s->freeze_output)
+        if (s->freeze_output) {
             s->wait_for_unfreeze(s);
+        }
     }
 
-    ret = s->close(s->opaque);
+    s->closed = 1;
 
-    qemu_del_timer(s->timer);
-    qemu_free_timer(s->timer);
+    ret = s->close(s->opaque);
     qemu_free(s->buffer);
-    qemu_free(s);
 
     return ret;
 }
 
+
 static int buffered_rate_limit(void *opaque)
 {
     QEMUFileBuffered *s = opaque;
@@ -228,34 +223,55 @@  static int64_t buffered_get_rate_limit(void *opaque)
     return s->xfer_limit;
 }
 
-static void buffered_rate_tick(void *opaque)
+static void *migrate_vm(void *opaque)
 {
     QEMUFileBuffered *s = opaque;
+    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
+    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
 
-    if (s->has_error) {
-        buffered_close(s);
-        return;
-    }
+    qemu_mutex_lock_iothread();
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    while (!s->closed) {
+        if (s->freeze_output) {
+            s->wait_for_unfreeze(s);
+            s->freeze_output = 0;
+            continue;
+        }
 
-    if (s->freeze_output)
-        return;
+        if (s->has_error) {
+            break;
+        }
+
+        current_time = qemu_get_clock_ms(rt_clock);
+        if (!s->closed && (expire_time > current_time)) {
+            tv.tv_usec = 1000 * (expire_time - current_time);
+            select(0, NULL, NULL, NULL, &tv);
+            continue;
+        }
 
-    s->bytes_xfer = 0;
+        s->bytes_xfer = 0;
+        buffered_flush(s);
 
-    buffered_flush(s);
+        expire_time = qemu_get_clock_ms(rt_clock) + 100;
+        s->put_ready(s->opaque);
+    }
 
-    /* Add some checks around this */
-    s->put_ready(s->opaque);
+    if (s->has_error) {
+        buffered_close(s);
+    }
+    qemu_free(s);
+
+    qemu_mutex_unlock_iothread();
+
+    return NULL;
 }
 
 QEMUFile *qemu_fopen_ops_buffered(void *opaque,
-                                  size_t bytes_per_sec,
-                                  BufferedPutFunc *put_buffer,
-                                  BufferedPutReadyFunc *put_ready,
-                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
-                                  BufferedCloseFunc *close)
+        size_t bytes_per_sec,
+        BufferedPutFunc *put_buffer,
+        BufferedPutReadyFunc *put_ready,
+        BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
+        BufferedCloseFunc *close)
 {
     QEMUFileBuffered *s;
 
@@ -267,15 +283,14 @@  QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->put_ready = put_ready;
     s->wait_for_unfreeze = wait_for_unfreeze;
     s->close = close;
+    s->closed = 0;
 
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
-			     buffered_get_rate_limit);
-
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+                             buffered_get_rate_limit);
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_thread_create(&s->thread, migrate_vm, s);
 
     return s->file;
 }
diff --git a/buffered_file.h b/buffered_file.h
index 98d358b..477bf7c 100644
--- a/buffered_file.h
+++ b/buffered_file.h
@@ -17,9 +17,13 @@ 
 #include "hw/hw.h"
 
 typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t size);
+typedef void (BufferedBeginFunc)(void *opaque);
 typedef void (BufferedPutReadyFunc)(void *opaque);
 typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
 typedef int (BufferedCloseFunc)(void *opaque);
+typedef void (BufferedWaitForCancelFunc)(void *opaque);
+
+void wait_for_cancel(void *opaque);
 
 QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
                                   BufferedPutFunc *put_buffer,
diff --git a/migration.c b/migration.c
index af3a1f2..d8a0abb 100644
--- a/migration.c
+++ b/migration.c
@@ -284,8 +284,6 @@  int migrate_fd_cleanup(FdMigrationState *s)
 {
     int ret = 0;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (s->file) {
         DPRINTF("closing file\n");
         if (qemu_fclose(s->file) != 0) {
@@ -307,14 +305,6 @@  int migrate_fd_cleanup(FdMigrationState *s)
     return ret;
 }
 
-void migrate_fd_put_notify(void *opaque)
-{
-    FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-    qemu_file_put_notify(s->file);
-}
-
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 {
     FdMigrationState *s = opaque;
@@ -327,9 +317,7 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
     if (ret == -1)
         ret = -(s->get_error(s));
 
-    if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
-    } else if (ret < 0) {
+    if (ret < 0 && ret != -EAGAIN) {
         if (s->mon) {
             monitor_resume(s->mon);
         }
@@ -342,36 +330,40 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 
 void migrate_fd_connect(FdMigrationState *s)
 {
-    int ret;
-
+    s->begin = 1;
     s->file = qemu_fopen_ops_buffered(s,
                                       s->bandwidth_limit,
                                       migrate_fd_put_buffer,
                                       migrate_fd_put_ready,
                                       migrate_fd_wait_for_unfreeze,
                                       migrate_fd_close);
-
-    DPRINTF("beginning savevm\n");
-    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
-                                  s->mig_state.shared);
-    if (ret < 0) {
-        DPRINTF("failed, %d\n", ret);
-        migrate_fd_error(s);
-        return;
-    }
-    
-    migrate_fd_put_ready(s);
 }
 
 void migrate_fd_put_ready(void *opaque)
 {
     FdMigrationState *s = opaque;
+    int ret;
 
     if (s->state != MIG_STATE_ACTIVE) {
         DPRINTF("put_ready returning because of non-active state\n");
+        if (s->state == MIG_STATE_CANCELLED) {
+            migrate_fd_terminate(s);
+        }
         return;
     }
 
+    if (s->begin) {
+        DPRINTF("beginning savevm\n");
+        ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+                s->mig_state.shared);
+        if (ret < 0) {
+            DPRINTF("failed, %d\n", ret);
+            migrate_fd_error(s);
+            return;
+        }
+        s->begin = 0;
+    }
+
     DPRINTF("iterate\n");
     if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
         int state;
@@ -415,6 +407,10 @@  void migrate_fd_cancel(MigrationState *mig_state)
     DPRINTF("cancelling migration\n");
 
     s->state = MIG_STATE_CANCELLED;
+}
+
+void migrate_fd_terminate(FdMigrationState *s)
+{
     notifier_list_notify(&migration_state_notifiers);
     qemu_savevm_state_cancel(s->mon, s->file);
 
@@ -458,7 +454,6 @@  int migrate_fd_close(void *opaque)
 {
     FdMigrationState *s = opaque;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
     return s->close(s);
 }
 
diff --git a/migration.h b/migration.h
index 050c56c..887f84c 100644
--- a/migration.h
+++ b/migration.h
@@ -45,9 +45,11 @@  struct FdMigrationState
     int fd;
     Monitor *mon;
     int state;
+    int begin;
     int (*get_error)(struct FdMigrationState*);
     int (*close)(struct FdMigrationState*);
     int (*write)(struct FdMigrationState*, const void *, size_t);
+    void (*callback)(void *);
     void *opaque;
 };
 
@@ -118,12 +120,16 @@  ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
 
 void migrate_fd_connect(FdMigrationState *s);
 
+void migrate_fd_begin(void *opaque);
+
 void migrate_fd_put_ready(void *opaque);
 
 int migrate_fd_get_status(MigrationState *mig_state);
 
 void migrate_fd_cancel(MigrationState *mig_state);
 
+void migrate_fd_terminate(FdMigrationState *s);
+
 void migrate_fd_release(MigrationState *mig_state);
 
 void migrate_fd_wait_for_unfreeze(void *opaque);