
[v4,17/18] migration/rdma: send data for both rdma-pin-all and NOT rdma-pin-all mode

Message ID: 1612339311-114805-18-git-send-email-zhengchuan@huawei.com
State: New, archived
Series: Support Multifd for RDMA migration

Commit Message

Zheng Chuan Feb. 3, 2021, 8:01 a.m. UTC
Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
---
 migration/rdma.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 61 insertions(+), 4 deletions(-)

Comments

Dr. David Alan Gilbert Feb. 4, 2021, 10:18 a.m. UTC | #1
* Chuan Zheng (zhengchuan@huawei.com) wrote:
> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
> ---
>  migration/rdma.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
>  1 file changed, 61 insertions(+), 4 deletions(-)
> 
> diff --git a/migration/rdma.c b/migration/rdma.c
> index 2097839..c19a91f 100644
> --- a/migration/rdma.c
> +++ b/migration/rdma.c
> @@ -2002,6 +2002,20 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
>                                 .repeat = 1,
>                               };
>  
> +    /* use multifd to send data */
> +    if (migrate_use_multifd()) {
> +        int channel = get_multifd_RDMA_channel();
> +        int ret = 0;
> +        MultiFDSendParams *multifd_send_param = NULL;
> +        ret = get_multifd_send_param(channel, &multifd_send_param);
> +        if (ret) {
> +            error_report("rdma: error getting multifd_send_param(%d)", channel);
> +            return -EINVAL;
> +        }
> +        rdma = (RDMAContext *)multifd_send_param->rdma;
> +        block = &(rdma->local_ram_blocks.block[current_index]);
> +    }
> +
>  retry:
>      sge.addr = (uintptr_t)(block->local_host_addr +
>                              (current_addr - block->offset));
> @@ -2197,6 +2211,27 @@ retry:
>      return 0;
>  }
>  
> +static int multifd_rdma_write_flush(void)
> +{
> +    /* The multifd RDMA threads send data */
> +    MultiFDSendParams *multifd_send_param = NULL;
> +    RDMAContext *rdma = NULL;
> +    MigrationState *s = migrate_get_current();
> +    int ret = 0;
> +
> +    ret = get_multifd_send_param(s->rdma_channel,
> +                                 &multifd_send_param);
> +    if (ret) {
> +        error_report("rdma: error getting multifd_send_param(%d)",
> +                     s->rdma_channel);

Do we need these error_report()s for the get_multifd_send_param() calls - how
can they fail in practice?

> +        return ret;
> +    }
> +    rdma = (RDMAContext *)(multifd_send_param->rdma);
> +    rdma->nb_sent++;
> +
> +    return ret;

But this doesn't actually 'flush' anything?

> +}
> +
>  /*
>   * Push out any unwritten RDMA operations.
>   *
> @@ -2219,8 +2254,15 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
>      }
>  
>      if (ret == 0) {
> -        rdma->nb_sent++;
> -        trace_qemu_rdma_write_flush(rdma->nb_sent);
> +        if (migrate_use_multifd()) {
> +            ret = multifd_rdma_write_flush();
> +            if (ret) {
> +                return ret;
> +            }
> +        } else {
> +            rdma->nb_sent++;
> +            trace_qemu_rdma_write_flush(rdma->nb_sent);
> +        }
>      }
>  
>      rdma->current_length = 0;
> @@ -4062,6 +4104,7 @@ wait_reg_complete:
>              }
>  
>              qemu_sem_post(&multifd_send_param->sem_sync);
> +            qemu_sem_wait(&multifd_send_param->sem);

why?

>          }
>      }
>  
> @@ -4443,6 +4486,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>      Error *local_err = NULL;
>      int ret = 0;
>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
> +    RDMAContext *rdma = p->rdma;
>  
>      trace_multifd_send_thread_start(p->id);
>      if (multifd_send_initial_packet(p, &local_err) < 0) {
> @@ -4451,7 +4495,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>  
>      /* wait for semaphore notification to register memory */
>      qemu_sem_wait(&p->sem_sync);
> -    if (qemu_rdma_registration(p->rdma) < 0) {
> +    if (qemu_rdma_registration(rdma) < 0) {
>          goto out;
>      }
>      /*
> @@ -4466,12 +4510,25 @@ static void *multifd_rdma_send_thread(void *opaque)
>                  break;
>              }
>          }
> +        /* To complete polling(CQE) */
> +        while (rdma->nb_sent) {

Where is nb_sent decremented?

> +            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
> +            if (ret < 0) {
> +                error_report("multifd RDMA migration: "
> +                             "complete polling error!");
> +                return NULL;
> +            }
> +        }
>          /* Send FINISHED to the destination */
>          head.type = RDMA_CONTROL_REGISTER_FINISHED;
> -        ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
> +        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
>          if (ret < 0) {
> +            error_report("multifd RDMA migration: "
> +                         "sending remote error!");
>              return NULL;
>          }
> +        /* sync main thread */
> +        qemu_sem_post(&p->sem);
>      }
>  
>  out:
> -- 
> 1.8.3.1
>
Zheng Chuan March 6, 2021, 8:45 a.m. UTC | #2
On 2021/2/4 18:18, Dr. David Alan Gilbert wrote:
> * Chuan Zheng (zhengchuan@huawei.com) wrote:
>> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
>> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
>> ---
>>  migration/rdma.c | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
>>  1 file changed, 61 insertions(+), 4 deletions(-)
>>
>> diff --git a/migration/rdma.c b/migration/rdma.c
>> index 2097839..c19a91f 100644
>> --- a/migration/rdma.c
>> +++ b/migration/rdma.c
>> @@ -2002,6 +2002,20 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
>>                                 .repeat = 1,
>>                               };
>>  
>> +    /* use multifd to send data */
>> +    if (migrate_use_multifd()) {
>> +        int channel = get_multifd_RDMA_channel();
>> +        int ret = 0;
>> +        MultiFDSendParams *multifd_send_param = NULL;
>> +        ret = get_multifd_send_param(channel, &multifd_send_param);
>> +        if (ret) {
>> +            error_report("rdma: error getting multifd_send_param(%d)", channel);
>> +            return -EINVAL;
>> +        }
>> +        rdma = (RDMAContext *)multifd_send_param->rdma;
>> +        block = &(rdma->local_ram_blocks.block[current_index]);
>> +    }
>> +
>>  retry:
>>      sge.addr = (uintptr_t)(block->local_host_addr +
>>                              (current_addr - block->offset));
>> @@ -2197,6 +2211,27 @@ retry:
>>      return 0;
>>  }
>>  
>> +static int multifd_rdma_write_flush(void)
>> +{
>> +    /* The multifd RDMA threads send data */
>> +    MultiFDSendParams *multifd_send_param = NULL;
>> +    RDMAContext *rdma = NULL;
>> +    MigrationState *s = migrate_get_current();
>> +    int ret = 0;
>> +
>> +    ret = get_multifd_send_param(s->rdma_channel,
>> +                                 &multifd_send_param);
>> +    if (ret) {
>> +        error_report("rdma: error getting multifd_send_param(%d)",
>> +                     s->rdma_channel);
> 
> Do we need these error_report()s for the get_multifd_send_param() calls - how
> can they fail in practice?
> 
Maybe we do not need them.
s->rdma_channel should not exceed migrate_multifd_channels and can never be negative.
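
If that invariant holds, a minimal sketch of the call site (just a
suggestion, assuming get_multifd_send_param() can only fail on an
out-of-range channel index) could be:

    /* Hypothetical variant: treat a bad channel index as a programming
     * error rather than a runtime failure. */
    assert(s->rdma_channel >= 0 &&
           s->rdma_channel < migrate_multifd_channels());
    ret = get_multifd_send_param(s->rdma_channel, &multifd_send_param);
    assert(ret == 0);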

>> +        return ret;
>> +    }
>> +    rdma = (RDMAContext *)(multifd_send_param->rdma);
>> +    rdma->nb_sent++;
>> +
>> +    return ret;
> 
> But this doesn't actually 'flush' anything?
> 
Yes, it is just used to increment nb_sent. We need to choose a more suitable function name.
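
For example, a minimal sketch under a rename (multifd_rdma_account_write
is only a suggested name; the body is the same as in the patch, minus the
error_report discussed above):

    static int multifd_rdma_account_write(void)
    {
        /* Account for one more posted write on the selected channel;
         * the actual data transfer is done by the multifd send threads. */
        MultiFDSendParams *multifd_send_param = NULL;
        MigrationState *s = migrate_get_current();
        int ret = get_multifd_send_param(s->rdma_channel, &multifd_send_param);

        if (ret) {
            return ret;
        }
        ((RDMAContext *)multifd_send_param->rdma)->nb_sent++;
        return 0;
    }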

>> +}
>> +
>>  /*
>>   * Push out any unwritten RDMA operations.
>>   *
>> @@ -2219,8 +2254,15 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
>>      }
>>  
>>      if (ret == 0) {
>> -        rdma->nb_sent++;
>> -        trace_qemu_rdma_write_flush(rdma->nb_sent);
>> +        if (migrate_use_multifd()) {
>> +            ret = multifd_rdma_write_flush();
>> +            if (ret) {
>> +                return ret;
>> +            }
>> +        } else {
>> +            rdma->nb_sent++;
>> +            trace_qemu_rdma_write_flush(rdma->nb_sent);
>> +        }
>>      }
>>  
>>      rdma->current_length = 0;
>> @@ -4062,6 +4104,7 @@ wait_reg_complete:
>>              }
>>  
>>              qemu_sem_post(&multifd_send_param->sem_sync);
>> +            qemu_sem_wait(&multifd_send_param->sem);
> 
> why?
> 
The multifd send thread posts the sem signal after it finishes sending data.
The main thread needs to wait for the multifd RDMA send threads to finish polling the CQEs.
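
In outline, the pairing as I read the patch (sem and sem_sync are the
per-channel semaphores in MultiFDSendParams):

    /* main thread, wait_reg_complete path */
    qemu_sem_post(&multifd_send_param->sem_sync); /* wake the send thread */
    qemu_sem_wait(&multifd_send_param->sem);      /* wait for CQE drain   */

    /* multifd_rdma_send_thread() */
    while (rdma->nb_sent) {                       /* drain write CQEs     */
        qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
    }
    /* ... send RDMA_CONTROL_REGISTER_FINISHED ... */
    qemu_sem_post(&p->sem);                       /* release main thread  */
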
>>          }
>>      }
>>  
>> @@ -4443,6 +4486,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>>      Error *local_err = NULL;
>>      int ret = 0;
>>      RDMAControlHeader head = { .len = 0, .repeat = 1 };
>> +    RDMAContext *rdma = p->rdma;
>>  
>>      trace_multifd_send_thread_start(p->id);
>>      if (multifd_send_initial_packet(p, &local_err) < 0) {
>> @@ -4451,7 +4495,7 @@ static void *multifd_rdma_send_thread(void *opaque)
>>  
>>      /* wait for semaphore notification to register memory */
>>      qemu_sem_wait(&p->sem_sync);
>> -    if (qemu_rdma_registration(p->rdma) < 0) {
>> +    if (qemu_rdma_registration(rdma) < 0) {
>>          goto out;
>>      }
>>      /*
>> @@ -4466,12 +4510,25 @@ static void *multifd_rdma_send_thread(void *opaque)
>>                  break;
>>              }
>>          }
>> +        /* To complete polling(CQE) */
>> +        while (rdma->nb_sent) {
> 
> Where is nb_sent decremented?
> 
nb_sent is decremented in qemu_rdma_poll(), which is called by qemu_rdma_block_for_wrid().
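
For reference, the relevant branch in qemu_rdma_poll() looks roughly like
this (simplified from the existing rdma.c code):

    if (wr_id == RDMA_WRID_RDMA_WRITE) {
        if (rdma->nb_sent > 0) {
            rdma->nb_sent--;    /* one posted write has completed */
        }
    }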

>> +            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
>> +            if (ret < 0) {
>> +                error_report("multifd RDMA migration: "
>> +                             "complete polling error!");
>> +                return NULL;
>> +            }
>> +        }
>>          /* Send FINISHED to the destination */
>>          head.type = RDMA_CONTROL_REGISTER_FINISHED;
>> -        ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
>> +        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
>>          if (ret < 0) {
>> +            error_report("multifd RDMA migration: "
>> +                         "sending remote error!");
>>              return NULL;
>>          }
>> +        /* sync main thread */
>> +        qemu_sem_post(&p->sem);
>>      }
>>  
>>  out:
>> -- 
>> 1.8.3.1
>>

Patch

diff --git a/migration/rdma.c b/migration/rdma.c
index 2097839..c19a91f 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -2002,6 +2002,20 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                                .repeat = 1,
                              };
 
+    /* use multifd to send data */
+    if (migrate_use_multifd()) {
+        int channel = get_multifd_RDMA_channel();
+        int ret = 0;
+        MultiFDSendParams *multifd_send_param = NULL;
+        ret = get_multifd_send_param(channel, &multifd_send_param);
+        if (ret) {
+            error_report("rdma: error getting multifd_send_param(%d)", channel);
+            return -EINVAL;
+        }
+        rdma = (RDMAContext *)multifd_send_param->rdma;
+        block = &(rdma->local_ram_blocks.block[current_index]);
+    }
+
 retry:
     sge.addr = (uintptr_t)(block->local_host_addr +
                             (current_addr - block->offset));
@@ -2197,6 +2211,27 @@ retry:
     return 0;
 }
 
+static int multifd_rdma_write_flush(void)
+{
+    /* The multifd RDMA threads send data */
+    MultiFDSendParams *multifd_send_param = NULL;
+    RDMAContext *rdma = NULL;
+    MigrationState *s = migrate_get_current();
+    int ret = 0;
+
+    ret = get_multifd_send_param(s->rdma_channel,
+                                 &multifd_send_param);
+    if (ret) {
+        error_report("rdma: error getting multifd_send_param(%d)",
+                     s->rdma_channel);
+        return ret;
+    }
+    rdma = (RDMAContext *)(multifd_send_param->rdma);
+    rdma->nb_sent++;
+
+    return ret;
+}
+
 /*
  * Push out any unwritten RDMA operations.
  *
@@ -2219,8 +2254,15 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
     }
 
     if (ret == 0) {
-        rdma->nb_sent++;
-        trace_qemu_rdma_write_flush(rdma->nb_sent);
+        if (migrate_use_multifd()) {
+            ret = multifd_rdma_write_flush();
+            if (ret) {
+                return ret;
+            }
+        } else {
+            rdma->nb_sent++;
+            trace_qemu_rdma_write_flush(rdma->nb_sent);
+        }
     }
 
     rdma->current_length = 0;
@@ -4062,6 +4104,7 @@ wait_reg_complete:
             }
 
             qemu_sem_post(&multifd_send_param->sem_sync);
+            qemu_sem_wait(&multifd_send_param->sem);
         }
     }
 
@@ -4443,6 +4486,7 @@ static void *multifd_rdma_send_thread(void *opaque)
     Error *local_err = NULL;
     int ret = 0;
     RDMAControlHeader head = { .len = 0, .repeat = 1 };
+    RDMAContext *rdma = p->rdma;
 
     trace_multifd_send_thread_start(p->id);
     if (multifd_send_initial_packet(p, &local_err) < 0) {
@@ -4451,7 +4495,7 @@ static void *multifd_rdma_send_thread(void *opaque)
 
     /* wait for semaphore notification to register memory */
     qemu_sem_wait(&p->sem_sync);
-    if (qemu_rdma_registration(p->rdma) < 0) {
+    if (qemu_rdma_registration(rdma) < 0) {
         goto out;
     }
     /*
@@ -4466,12 +4510,25 @@ static void *multifd_rdma_send_thread(void *opaque)
                 break;
             }
         }
+        /* To complete polling(CQE) */
+        while (rdma->nb_sent) {
+            ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+            if (ret < 0) {
+                error_report("multifd RDMA migration: "
+                             "complete polling error!");
+                return NULL;
+            }
+        }
         /* Send FINISHED to the destination */
         head.type = RDMA_CONTROL_REGISTER_FINISHED;
-        ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
+        ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
         if (ret < 0) {
+            error_report("multifd RDMA migration: "
+                         "sending remote error!");
             return NULL;
         }
+        /* sync main thread */
+        qemu_sem_post(&p->sem);
     }
 
 out: