diff mbox

[V2,3/3] colo-compare: introduce packet comparison thread

Message ID 1459326950-17708-4-git-send-email-zhangchen.fnst@cn.fujitsu.com (mailing list archive)
State New, archived
Headers show

Commit Message

Zhang Chen March 30, 2016, 8:35 a.m. UTC
If the packets are the same, we send the primary packet and drop the
secondary packet; otherwise we notify COLO to do a checkpoint.

Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
---
 net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 121 insertions(+), 1 deletion(-)

Comments

Dr. David Alan Gilbert March 30, 2016, 11:41 a.m. UTC | #1
* Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
> if packets are same, we send primary packet and drop secondary
> packet, otherwise notify COLO do checkpoint.
> 
> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
> ---
>  net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 121 insertions(+), 1 deletion(-)
> 
> diff --git a/net/colo-compare.c b/net/colo-compare.c
> index 0bb5a51..1debc0e 100644
> --- a/net/colo-compare.c
> +++ b/net/colo-compare.c
> @@ -36,6 +36,7 @@
>  static QTAILQ_HEAD(, CompareState) net_compares =
>         QTAILQ_HEAD_INITIALIZER(net_compares);
>  static ssize_t hashtable_max_size;
> +static int colo_need_checkpoint;
>  
>  typedef struct ReadState {
>      int state; /* 0 = getting length, 1 = getting data */
> @@ -91,6 +92,13 @@ typedef struct CompareState {
>      GQueue unprocessed_connections;
>      /* proxy current hash size */
>      ssize_t hashtable_size;
> +
> +    /* notify compare thread */
> +    QemuEvent event;
> +    /* compare thread, a thread for each NIC */
> +    QemuThread thread;
> +    int thread_status;
> +
>  } CompareState;
>  
>  typedef struct Packet {
> @@ -129,6 +137,15 @@ enum {
>      SECONDARY_IN,
>  };
>  
> +enum {
> +    /* compare thread isn't started */
> +    COMPARE_THREAD_NONE,
> +    /* compare thread is running */
> +    COMPARE_THREAD_RUNNING,
> +    /* compare thread exit */
> +    COMPARE_THREAD_EXIT,
> +};
> +
>  static void packet_destroy(void *opaque, void *user_data);
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
>  
> @@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, void *user_data)
>      qemu_mutex_unlock(&conn->list_lock);
>  }
>  
> +static void colo_notify_checkpoint(void)
> +{
> +    colo_need_checkpoint = true;
> +}
> +
> +/* TODO colo_do_checkpoint() {
> + * we flush the connections and reset 'colo_need_checkpoint'
> + * }
> + */
> +
> +static inline void colo_dump_packet(Packet *pkt)
> +{
> +    int i;
> +    for (i = 0; i < pkt->size; i++) {
> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
> +    }
> +    printf("\n");
> +}
> +
> +/*
> + * The IP packets sent by primary and secondary
> + * will be compared in here
> + * TODO support ip fragment, Out-Of-Order
> + * return:    0  means packet same
> + *            > 0 || < 0 means packet different
> + */
> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
> +{
> +    colo_dump_packet(ppkt);
> +    colo_dump_packet(spkt);

Obviously those need to become conditional on something.

> +    if (ppkt->size == spkt->size) {
> +        return memcmp(ppkt->data, spkt->data, spkt->size);
> +    } else {
> +        return -1;
> +    }
> +}
> +
> +static void colo_compare_connection(void *opaque, void *user_data)
> +{
> +    Connection *conn = opaque;
> +    Packet *pkt = NULL;
> +    GList *result = NULL;
> +    int ret;
> +
> +    qemu_mutex_lock(&conn->list_lock);
> +    while (!g_queue_is_empty(&conn->primary_list) &&
> +           !g_queue_is_empty(&conn->secondary_list)) {
> +        pkt = g_queue_pop_head(&conn->primary_list);
> +        result = g_queue_find_custom(&conn->secondary_list,
> +                              pkt, (GCompareFunc)colo_packet_compare);

I think the order of parameters passed to the colo_packet_compare
is the wrong way around - although it doesn't really matter with your current
simple comparison;  https://developer.gnome.org/glib/stable/glib-Double-ended-Queues.html
says that

    'The function takes two gconstpointer arguments, the GQueue element's data as the
     first argument and the given user data as the second argument'

  so that makes the first argument the element out of the secondary_list and
the second argument the 'pkt' that you popped off the primary.

> +
> +        if (result) {
> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
> +            if (ret < 0) {
> +                error_report("colo_send_primary_packet failed");
> +            }
> +            g_queue_remove(&conn->secondary_list, result);
> +        } else {
> +            g_queue_push_head(&conn->primary_list, pkt);
> +            colo_notify_checkpoint();
> +            break;
> +        }
> +    }
> +    qemu_mutex_unlock(&conn->list_lock);
> +}
> +
> +static void *colo_compare_thread(void *opaque)
> +{
> +    CompareState *s = opaque;
> +
> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
> +        qemu_event_wait(&s->event);
> +        qemu_event_reset(&s->event);
> +        qemu_mutex_lock(&s->conn_list_lock);
> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
> +        qemu_mutex_unlock(&s->conn_list_lock);

Interesting; holding the 'conn_list_lock' around the whole of the comparison
is probably quite expensive: if you've got a lot of packets coming in, then
the lock could be held for most of the time.
I'm not sure of a better solution; maybe use the qemu/rcu_queue.h ?

> +    }
> +
> +    return NULL;
> +}
> +
>  static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>  {
>      int ret = 0;
> @@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>      if (ret == 1) {
>          if (packet_enqueue(s, PRIMARY_IN)) {
>              error_report("primary: unsupported packet in");
> -            compare_chr_send(s->chr_out, buf, size);
> +            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);

Doesn't that change belong in an earlier patch?

> +        } else {
> +            qemu_event_set(&s->event);

Also these - why are these in this patch?

>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
> @@ -449,6 +550,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>      if (ret == 1) {
>          if (packet_enqueue(s, SECONDARY_IN)) {
>              error_report("secondary: unsupported packet in");
> +        } else {
> +            qemu_event_set(&s->event);
>          }
>      } else if (ret == -1) {
>          qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
> @@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>  {
>      CompareState *s = COLO_COMPARE(uc);
>      struct sysinfo si;
> +    char thread_name[64];
> +    static int compare_id;
>  
>      if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>          error_setg(errp, "colo compare needs 'primary_in' ,"
> @@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>      g_queue_init(&s->conn_list);
>      qemu_mutex_init(&s->conn_list_lock);
>  
> +    colo_need_checkpoint = false;
>      s->hashtable_size = 0;
>      /*
>       * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
> @@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>                                                        g_free,
>                                                        connection_destroy);
>  
> +    s->thread_status = COMPARE_THREAD_RUNNING;
> +    sprintf(thread_name, "proxy compare %d", compare_id);

As with my comment from last month; the thread names are limited
to 14 characters on Linux (and most other Unixes) so keep this short;
I use "proxy:%s" and the device name.

> +    qemu_thread_create(&s->thread, thread_name,
> +                       colo_compare_thread, s,
> +                       QEMU_THREAD_JOINABLE);
> +    compare_id++;
> +
>      return;
>  
>  out:
> @@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>          QTAILQ_REMOVE(&net_compares, s, next);
>      }
>      qemu_mutex_destroy(&s->conn_list_lock);
> +
> +    if (s->thread.thread) {
> +        s->thread_status = COMPARE_THREAD_EXIT;
> +        qemu_event_set(&s->event);
> +        qemu_thread_join(&s->thread);
> +    }
> +    qemu_event_destroy(&s->event);
>  }
>  
>  static void colo_compare_init(Object *obj)
> -- 
> 1.9.1
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Li Zhijian March 31, 2016, 2:17 a.m. UTC | #2
On 03/30/2016 07:41 PM, Dr. David Alan Gilbert wrote:
[...]

>> >@@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>> >      if (ret == 1) {
>> >          if (packet_enqueue(s, PRIMARY_IN)) {
>> >              error_report("primary: unsupported packet in");
>> >-            compare_chr_send(s->chr_out, buf, size);
>> >+            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
> Doesn't that change belong in an earlier patch?
>
>> >+        } else {
>> >+            qemu_event_set(&s->event);
> Also these - why are these in this patch?
This event is to wake up the comparison thread to do the compare.
Do you think we should put the event-related code in patch 2?

Thanks
Li
>
>> >          }
Zhang Chen March 31, 2016, 6 a.m. UTC | #3
On 03/30/2016 07:41 PM, Dr. David Alan Gilbert wrote:
> * Zhang Chen (zhangchen.fnst@cn.fujitsu.com) wrote:
>> if packets are same, we send primary packet and drop secondary
>> packet, otherwise notify COLO do checkpoint.
>>
>> Signed-off-by: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
>> Signed-off-by: Li Zhijian <lizhijian@cn.fujitsu.com>
>> Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
>> ---
>>   net/colo-compare.c | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   1 file changed, 121 insertions(+), 1 deletion(-)
>>
>> diff --git a/net/colo-compare.c b/net/colo-compare.c
>> index 0bb5a51..1debc0e 100644
>> --- a/net/colo-compare.c
>> +++ b/net/colo-compare.c
>> @@ -36,6 +36,7 @@
>>   static QTAILQ_HEAD(, CompareState) net_compares =
>>          QTAILQ_HEAD_INITIALIZER(net_compares);
>>   static ssize_t hashtable_max_size;
>> +static int colo_need_checkpoint;
>>   
>>   typedef struct ReadState {
>>       int state; /* 0 = getting length, 1 = getting data */
>> @@ -91,6 +92,13 @@ typedef struct CompareState {
>>       GQueue unprocessed_connections;
>>       /* proxy current hash size */
>>       ssize_t hashtable_size;
>> +
>> +    /* notify compare thread */
>> +    QemuEvent event;
>> +    /* compare thread, a thread for each NIC */
>> +    QemuThread thread;
>> +    int thread_status;
>> +
>>   } CompareState;
>>   
>>   typedef struct Packet {
>> @@ -129,6 +137,15 @@ enum {
>>       SECONDARY_IN,
>>   };
>>   
>> +enum {
>> +    /* compare thread isn't started */
>> +    COMPARE_THREAD_NONE,
>> +    /* compare thread is running */
>> +    COMPARE_THREAD_RUNNING,
>> +    /* compare thread exit */
>> +    COMPARE_THREAD_EXIT,
>> +};
>> +
>>   static void packet_destroy(void *opaque, void *user_data);
>>   static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
>>   
>> @@ -340,6 +357,88 @@ static inline void colo_flush_connection(void *opaque, void *user_data)
>>       qemu_mutex_unlock(&conn->list_lock);
>>   }
>>   
>> +static void colo_notify_checkpoint(void)
>> +{
>> +    colo_need_checkpoint = true;
>> +}
>> +
>> +/* TODO colo_do_checkpoint() {
>> + * we flush the connections and reset 'colo_need_checkpoint'
>> + * }
>> + */
>> +
>> +static inline void colo_dump_packet(Packet *pkt)
>> +{
>> +    int i;
>> +    for (i = 0; i < pkt->size; i++) {
>> +        printf("%02x ", ((uint8_t *)pkt->data)[i]);
>> +    }
>> +    printf("\n");
>> +}
>> +
>> +/*
>> + * The IP packets sent by primary and secondary
>> + * will be compared in here
>> + * TODO support ip fragment, Out-Of-Order
>> + * return:    0  means packet same
>> + *            > 0 || < 0 means packet different
>> + */
>> +static int colo_packet_compare(Packet *ppkt, Packet *spkt)
>> +{
>> +    colo_dump_packet(ppkt);
>> +    colo_dump_packet(spkt);
> Obviously those need to become conditional on something.

OK, I will add tracing in the next version.

>
>> +    if (ppkt->size == spkt->size) {
>> +        return memcmp(ppkt->data, spkt->data, spkt->size);
>> +    } else {
>> +        return -1;
>> +    }
>> +}
>> +
>> +static void colo_compare_connection(void *opaque, void *user_data)
>> +{
>> +    Connection *conn = opaque;
>> +    Packet *pkt = NULL;
>> +    GList *result = NULL;
>> +    int ret;
>> +
>> +    qemu_mutex_lock(&conn->list_lock);
>> +    while (!g_queue_is_empty(&conn->primary_list) &&
>> +           !g_queue_is_empty(&conn->secondary_list)) {
>> +        pkt = g_queue_pop_head(&conn->primary_list);
>> +        result = g_queue_find_custom(&conn->secondary_list,
>> +                              pkt, (GCompareFunc)colo_packet_compare);
> I think the order of parameters passed to the colo_packet_compare
> is the wrong way around - although it doesn't really matter with your current
> simple comparison;  https://developer.gnome.org/glib/stable/glib-Double-ended-Queues.html
> says that
>
>      'The function takes two gconstpointer arguments, the GQueue element's data as the
>       first argument and the given user data as the second argument'
>
>    so that makes the first argument the element out of the secondary_list and
> the second argument the 'pkt' that you popped off the primary.

OK, thanks a lot. Will fix.

>
>> +
>> +        if (result) {
>> +            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
>> +            if (ret < 0) {
>> +                error_report("colo_send_primary_packet failed");
>> +            }
>> +            g_queue_remove(&conn->secondary_list, result);
>> +        } else {
>> +            g_queue_push_head(&conn->primary_list, pkt);
>> +            colo_notify_checkpoint();
>> +            break;
>> +        }
>> +    }
>> +    qemu_mutex_unlock(&conn->list_lock);
>> +}
>> +
>> +static void *colo_compare_thread(void *opaque)
>> +{
>> +    CompareState *s = opaque;
>> +
>> +    while (s->thread_status == COMPARE_THREAD_RUNNING) {
>> +        qemu_event_wait(&s->event);
>> +        qemu_event_reset(&s->event);
>> +        qemu_mutex_lock(&s->conn_list_lock);
>> +        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
>> +        qemu_mutex_unlock(&s->conn_list_lock);
> Interesting; holding the 'conn_list_lock' around the whole of the comparison
> is probably quite expensive if you've got a lot of packets coming in then
> the lock could be held for most of the time.
> I'm not sure of a better solution; maybe use the qemu/rcu_queue.h ?
>
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>>   static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
>>   {
>>       int ret = 0;
>> @@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
>>       if (ret == 1) {
>>           if (packet_enqueue(s, PRIMARY_IN)) {
>>               error_report("primary: unsupported packet in");
>> -            compare_chr_send(s->chr_out, buf, size);
>> +            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
> Doesn't that change belong in an earlier patch?

Yes, will fix

>
>> +        } else {
>> +            qemu_event_set(&s->event);
> Also these - why are these in this patch?

will fix

>
>>           }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
>> @@ -449,6 +550,8 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
>>       if (ret == 1) {
>>           if (packet_enqueue(s, SECONDARY_IN)) {
>>               error_report("secondary: unsupported packet in");
>> +        } else {
>> +            qemu_event_set(&s->event);
>>           }
>>       } else if (ret == -1) {
>>           qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
>> @@ -504,6 +607,8 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>   {
>>       CompareState *s = COLO_COMPARE(uc);
>>       struct sysinfo si;
>> +    char thread_name[64];
>> +    static int compare_id;
>>   
>>       if (!s->pri_indev || !s->sec_indev || !s->outdev) {
>>           error_setg(errp, "colo compare needs 'primary_in' ,"
>> @@ -552,6 +657,7 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>       g_queue_init(&s->conn_list);
>>       qemu_mutex_init(&s->conn_list_lock);
>>   
>> +    colo_need_checkpoint = false;
>>       s->hashtable_size = 0;
>>       /*
>>        * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
>> @@ -572,6 +678,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
>>                                                         g_free,
>>                                                         connection_destroy);
>>   
>> +    s->thread_status = COMPARE_THREAD_RUNNING;
>> +    sprintf(thread_name, "proxy compare %d", compare_id);
> As with my comment from last month; the thread names are limited
> to 14 characters on Linux (and most other Unixes) so keep this short;
> I use "proxy:%s" and the device name.

OK, I will fix it.

>
>> +    qemu_thread_create(&s->thread, thread_name,
>> +                       colo_compare_thread, s,
>> +                       QEMU_THREAD_JOINABLE);
>> +    compare_id++;
>> +
>>       return;
>>   
>>   out:
>> @@ -615,6 +728,13 @@ static void colo_compare_class_finalize(ObjectClass *oc, void *data)
>>           QTAILQ_REMOVE(&net_compares, s, next);
>>       }
>>       qemu_mutex_destroy(&s->conn_list_lock);
>> +
>> +    if (s->thread.thread) {
>> +        s->thread_status = COMPARE_THREAD_EXIT;
>> +        qemu_event_set(&s->event);
>> +        qemu_thread_join(&s->thread);
>> +    }
>> +    qemu_event_destroy(&s->event);
>>   }
>>   
>>   static void colo_compare_init(Object *obj)
>> -- 
>> 1.9.1
>>
>>
>>
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
>
> .
>
Dr. David Alan Gilbert March 31, 2016, 8:50 a.m. UTC | #4
* Li Zhijian (lizhijian@cn.fujitsu.com) wrote:
> 
> 
> On 03/30/2016 07:41 PM, Dr. David Alan Gilbert wrote:
> [...]
> 
> >>>@@ -433,7 +532,9 @@ static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
> >>>      if (ret == 1) {
> >>>          if (packet_enqueue(s, PRIMARY_IN)) {
> >>>              error_report("primary: unsupported packet in");
> >>>-            compare_chr_send(s->chr_out, buf, size);
> >>>+            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
> >Doesn't that change belong in an earlier patch?
> >
> >>>+        } else {
> >>>+            qemu_event_set(&s->event);
> >Also these - why are these in this patch?
> This event is to wakeup comparison thread to do compare.
> Do you think we should put event related code to patch 2 ?

Ah OK; yes the event_set makes sense in this patch.

Dave

> Thanks
> Li
> >
> >>>          }
> 
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
diff mbox

Patch

diff --git a/net/colo-compare.c b/net/colo-compare.c
index 0bb5a51..1debc0e 100644
--- a/net/colo-compare.c
+++ b/net/colo-compare.c
@@ -36,6 +36,7 @@ 
 static QTAILQ_HEAD(, CompareState) net_compares =
        QTAILQ_HEAD_INITIALIZER(net_compares);
 static ssize_t hashtable_max_size;
+static int colo_need_checkpoint;
 
 typedef struct ReadState {
     int state; /* 0 = getting length, 1 = getting data */
@@ -91,6 +92,13 @@  typedef struct CompareState {
     GQueue unprocessed_connections;
     /* proxy current hash size */
     ssize_t hashtable_size;
+
+    /* notify compare thread */
+    QemuEvent event;
+    /* compare thread, a thread for each NIC */
+    QemuThread thread;
+    int thread_status;
+
 } CompareState;
 
 typedef struct Packet {
@@ -129,6 +137,15 @@  enum {
     SECONDARY_IN,
 };
 
+enum {
+    /* compare thread isn't started */
+    COMPARE_THREAD_NONE,
+    /* compare thread is running */
+    COMPARE_THREAD_RUNNING,
+    /* compare thread exit */
+    COMPARE_THREAD_EXIT,
+};
+
 static void packet_destroy(void *opaque, void *user_data);
 static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size);
 
@@ -340,6 +357,88 @@  static inline void colo_flush_connection(void *opaque, void *user_data)
     qemu_mutex_unlock(&conn->list_lock);
 }
 
+static void colo_notify_checkpoint(void)
+{
+    colo_need_checkpoint = true;
+}
+
+/* TODO colo_do_checkpoint() {
+ * we flush the connections and reset 'colo_need_checkpoint'
+ * }
+ */
+
+static inline void colo_dump_packet(Packet *pkt)
+{
+    int i;
+    for (i = 0; i < pkt->size; i++) {
+        printf("%02x ", ((uint8_t *)pkt->data)[i]);
+    }
+    printf("\n");
+}
+
+/*
+ * The IP packets sent by primary and secondary
+ * will be compared in here
+ * TODO support ip fragment, Out-Of-Order
+ * return:    0  means packet same
+ *            > 0 || < 0 means packet different
+ */
+static int colo_packet_compare(Packet *ppkt, Packet *spkt)
+{
+    colo_dump_packet(ppkt);
+    colo_dump_packet(spkt);
+
+    if (ppkt->size == spkt->size) {
+        return memcmp(ppkt->data, spkt->data, spkt->size);
+    } else {
+        return -1;
+    }
+}
+
+static void colo_compare_connection(void *opaque, void *user_data)
+{
+    Connection *conn = opaque;
+    Packet *pkt = NULL;
+    GList *result = NULL;
+    int ret;
+
+    qemu_mutex_lock(&conn->list_lock);
+    while (!g_queue_is_empty(&conn->primary_list) &&
+           !g_queue_is_empty(&conn->secondary_list)) {
+        pkt = g_queue_pop_head(&conn->primary_list);
+        result = g_queue_find_custom(&conn->secondary_list,
+                              pkt, (GCompareFunc)colo_packet_compare);
+
+        if (result) {
+            ret = compare_chr_send(pkt->s->chr_out, pkt->data, pkt->size);
+            if (ret < 0) {
+                error_report("colo_send_primary_packet failed");
+            }
+            g_queue_remove(&conn->secondary_list, result);
+        } else {
+            g_queue_push_head(&conn->primary_list, pkt);
+            colo_notify_checkpoint();
+            break;
+        }
+    }
+    qemu_mutex_unlock(&conn->list_lock);
+}
+
+static void *colo_compare_thread(void *opaque)
+{
+    CompareState *s = opaque;
+
+    while (s->thread_status == COMPARE_THREAD_RUNNING) {
+        qemu_event_wait(&s->event);
+        qemu_event_reset(&s->event);
+        qemu_mutex_lock(&s->conn_list_lock);
+        g_queue_foreach(&s->conn_list, colo_compare_connection, NULL);
+        qemu_mutex_unlock(&s->conn_list_lock);
+    }
+
+    return NULL;
+}
+
 static int compare_chr_send(CharDriverState *out, const uint8_t *buf, int size)
 {
     int ret = 0;
@@ -433,7 +532,9 @@  static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
     if (ret == 1) {
         if (packet_enqueue(s, PRIMARY_IN)) {
             error_report("primary: unsupported packet in");
-            compare_chr_send(s->chr_out, buf, size);
+            compare_chr_send(s->chr_out, s->pri_rs.buf, s->pri_rs.packet_len);
+        } else {
+            qemu_event_set(&s->event);
         }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_pri_in, NULL, NULL, NULL, NULL);
@@ -449,6 +550,8 @@  static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
     if (ret == 1) {
         if (packet_enqueue(s, SECONDARY_IN)) {
             error_report("secondary: unsupported packet in");
+        } else {
+            qemu_event_set(&s->event);
         }
     } else if (ret == -1) {
         qemu_chr_add_handlers(s->chr_sec_in, NULL, NULL, NULL, NULL);
@@ -504,6 +607,8 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
 {
     CompareState *s = COLO_COMPARE(uc);
     struct sysinfo si;
+    char thread_name[64];
+    static int compare_id;
 
     if (!s->pri_indev || !s->sec_indev || !s->outdev) {
         error_setg(errp, "colo compare needs 'primary_in' ,"
@@ -552,6 +657,7 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
     g_queue_init(&s->conn_list);
     qemu_mutex_init(&s->conn_list_lock);
 
+    colo_need_checkpoint = false;
     s->hashtable_size = 0;
     /*
      * Idea from kernel tcp.c: use 1/16384 of memory.  On i386: 32MB
@@ -572,6 +678,13 @@  static void colo_compare_complete(UserCreatable *uc, Error **errp)
                                                       g_free,
                                                       connection_destroy);
 
+    s->thread_status = COMPARE_THREAD_RUNNING;
+    sprintf(thread_name, "proxy compare %d", compare_id);
+    qemu_thread_create(&s->thread, thread_name,
+                       colo_compare_thread, s,
+                       QEMU_THREAD_JOINABLE);
+    compare_id++;
+
     return;
 
 out:
@@ -615,6 +728,13 @@  static void colo_compare_class_finalize(ObjectClass *oc, void *data)
         QTAILQ_REMOVE(&net_compares, s, next);
     }
     qemu_mutex_destroy(&s->conn_list_lock);
+
+    if (s->thread.thread) {
+        s->thread_status = COMPARE_THREAD_EXIT;
+        qemu_event_set(&s->event);
+        qemu_thread_join(&s->thread);
+    }
+    qemu_event_destroy(&s->event);
 }
 
 static void colo_compare_init(Object *obj)