diff mbox

[v19,08/10] Implement new driver for block replication

Message ID 1463729780-31982-9-git-send-email-xiecl.fnst@cn.fujitsu.com (mailing list archive)
State New, archived
Headers show

Commit Message

Changlong Xie May 20, 2016, 7:36 a.m. UTC
From: Wen Congyang <wency@cn.fujitsu.com>

Signed-off-by: Wen Congyang <wency@cn.fujitsu.com>
Signed-off-by: zhanghailiang <zhang.zhanghailiang@huawei.com>
Signed-off-by: Gonglei <arei.gonglei@huawei.com>
Signed-off-by: Changlong Xie <xiecl.fnst@cn.fujitsu.com>
---
 block/Makefile.objs |   1 +
 block/replication.c | 666 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 667 insertions(+)
 create mode 100644 block/replication.c

Comments

Stefan Hajnoczi May 30, 2016, 6:14 p.m. UTC | #1
On Fri, May 20, 2016 at 03:36:18PM +0800, Changlong Xie wrote:
> +        /* start backup job now */
> +        error_setg(&s->blocker,
> +                   "block device is in use by internal backup job");
> +
> +        top_bs = bdrv_lookup_bs(s->top_id, s->top_id, errp);
> +        if (!top_bs || !check_top_bs(top_bs, bs)) {
> +            reopen_backing_file(s, false, NULL);
> +            aio_context_release(aio_context);
> +            return;
> +        }

Missing error_setg() with an error message when check_top_bs() fails.
Changlong Xie May 31, 2016, 1:20 a.m. UTC | #2
On 05/31/2016 02:14 AM, Stefan Hajnoczi wrote:
> On Fri, May 20, 2016 at 03:36:18PM +0800, Changlong Xie wrote:
>> +        /* start backup job now */
>> +        error_setg(&s->blocker,
>> +                   "block device is in use by internal backup job");
>> +
>> +        top_bs = bdrv_lookup_bs(s->top_id, s->top_id, errp);
>> +        if (!top_bs || !check_top_bs(top_bs, bs)) {
>> +            reopen_backing_file(s, false, NULL);
>> +            aio_context_release(aio_context);
>> +            return;
>> +        }
>
> Missing error_setg() with an error message when check_top_bs() fails.
>

Will add.

Thanks
	-Xie
Changlong Xie June 7, 2016, 4:59 a.m. UTC | #3
On 05/20/2016 03:36 PM, Changlong Xie wrote:
> +
> +        /*
> +         * Must protect backup target if backup job was stopped/cancelled
> +         * unexpectedly
> +         */
> +        bdrv_ref(s->hidden_disk->bs);
> +
> +        backup_start(s->secondary_disk->bs, s->hidden_disk->bs, 0,
> +                     MIRROR_SYNC_MODE_NONE, NULL, BLOCKDEV_ON_ERROR_REPORT,
> +                     BLOCKDEV_ON_ERROR_REPORT, backup_job_completed,
> +                     s, NULL, &local_err);
> +        if (local_err) {
> +            error_propagate(errp, local_err);
> +            backup_job_cleanup(s);
> +            bdrv_unref(s->hidden_disk->bs);
> +            aio_context_release(aio_context);
> +            return;
> +        }
> +        break;
> +    default:
> +        aio_context_release(aio_context);
> +        abort();
> +    }

commit 5c438bc6 introduce BB for I/O, so we don't need protect backup 
target by ourself now.

-        /*
-         * Must protect backup target if backup job was stopped/cancelled
-         * unexpectedly
-         */
-        bdrv_ref(s->hidden_disk->bs);
-
          backup_start(s->secondary_disk->bs, s->hidden_disk->bs, 0,
                       MIRROR_SYNC_MODE_NONE, NULL, 
BLOCKDEV_ON_ERROR_REPORT,
                       BLOCKDEV_ON_ERROR_REPORT, backup_job_completed,
@@ -508,7 +502,6 @@ static void replication_start(ReplicationState *rs, 
ReplicationMode mode,
          if (local_err) {
              error_propagate(errp, local_err);
              backup_job_cleanup(s);
-            bdrv_unref(s->hidden_disk->bs);
              aio_context_release(aio_context);
              return;
          }

will update in next version.
Changlong Xie June 7, 2016, 5:36 a.m. UTC | #4
On 05/20/2016 03:36 PM, Changlong Xie wrote:
> +        if (!failover) {
> +            /*
> +             * This BDS will be closed, and the job should be completed
> +             * before the BDS is closed, because we will access hidden
> +             * disk, secondary disk in backup_job_completed().
> +             */
> +            if (s->secondary_disk->bs->job) {
> +                block_job_cancel_sync(s->secondary_disk->bs->job);
> +            }
> +            secondary_do_checkpoint(s, errp);
> +            s->replication_state = BLOCK_REPLICATION_DONE;
> +            aio_context_release(aio_context);
> +            return;
> +        }
> +
> +        s->replication_state = BLOCK_REPLICATION_FAILOVER;
> +        if (s->secondary_disk->bs->job) {
> +            block_job_cancel(s->secondary_disk->bs->job);
> +        }

Since commit b6d2e599 "block: Convert block job core to BlockBackend", 
blockjob uses BB instead of bdrv_ref(), this introduces unexpected 
Segmentation fault with COLO.

In the below backtrace, you can see that. During failover, s->target was 
changed to an illegal value "0x1e1e1e1e1e1e1e1e" in bakup_complete.
Then the active commit job what also has a pointer that refer to 
s->target will use this illegal pointer. To avoid this, we should use 
"bloc_job_cancel_sync" to ensure backup job complete synchronously.

% MALLOC_PERTURB_=$(($RANDOM % 255 + 1))
% export MALLOC_PERTURB_
% gdb --args ./tests/test-replication
(gdb) b backup_complete
(gdb) r
(gdb) n
(gdb) n
(gdb) watch s->target
(gdb) c
Old value = (BlockBackend *) 0x555555f1d990
New value = (BlockBackend *) 0x1e1e1e1e1e1e1e1e
0x00007ffff5a811eb in memset () from /lib64/libc.so.6
(gdb) bt
#0  0x00007ffff5a811eb in memset () from /lib64/libc.so.6
#1  0x00007ffff5a7500e in _int_free () from /lib64/libc.so.6
#2  0x00007ffff705bf7f in g_free () from /lib64/libglib-2.0.so.0
#3  0x000055555557e924 in block_job_unref (job=0x555555f1d630) at 
blockjob.c:124
#4  0x000055555557e9da in block_job_completed_single 
(job=0x555555f1d630) at blockjob.c:143
#5  0x000055555557ecc8 in block_job_completed (job=0x555555f1d630, 
ret=0) at blockjob.c:215
#6  0x00005555555e6d49 in backup_complete (job=0x555555f1d630, 
opaque=0x555555f1dd50) at block/backup.c:325
#7  0x000055555557f5f4 in block_job_defer_to_main_loop_bh 
(opaque=0x5555596e1dc0) at blockjob.c:500
#8  0x00005555555747d7 in aio_bh_call (bh=0x5555596e1c30) at async.c:66
#9  0x0000555555574899 in aio_bh_poll (ctx=0x555555ce2d60) at async.c:94
#10 0x0000555555581d4d in aio_dispatch (ctx=0x555555ce2d60) at 
aio-posix.c:308
#11 0x00005555555823cb in aio_poll (ctx=0x555555ce2d60, blocking=false) 
at aio-posix.c:479
#12 0x00005555555d639b in bdrv_drain_poll (bs=0x555555dec210) at 
block/io.c:190
#13 0x00005555555d6566 in bdrv_drained_begin (bs=0x555555dec210) at 
block/io.c:240
#14 0x0000555555577261 in bdrv_child_cb_drained_begin 
(child=0x5555596e1ab0) at block.c:665
#15 0x00005555555d5e81 in bdrv_parent_drained_begin (bs=0x555555f0e130) 
at block/io.c:54
#16 0x00005555555d652b in bdrv_drained_begin (bs=0x555555f0e130) at 
block/io.c:232
#17 0x0000555555577261 in bdrv_child_cb_drained_begin 
(child=0x5555596e1810) at block.c:665
#18 0x00005555555d5e81 in bdrv_parent_drained_begin (bs=0x5555596d7030) 
at block/io.c:54
#19 0x00005555555d652b in bdrv_drained_begin (bs=0x5555596d7030) at 
block/io.c:232
#20 0x0000555555577261 in bdrv_child_cb_drained_begin 
(child=0x555555df8830) at block.c:665
#21 0x00005555555d5e81 in bdrv_parent_drained_begin (bs=0x555555df0bb0) 
at block/io.c:54
#22 0x00005555555d66ee in bdrv_drain_all () at block/io.c:301
#23 0x0000555555579f9f in bdrv_reopen_multiple (bs_queue=0x555555f1dd30, 
errp=0x7fffffffd768) at block.c:1953
#24 0x000055555557a169 in bdrv_reopen (bs=0x555555df0bb0, 
bdrv_flags=24578, errp=0x7fffffffd8e0) at block.c:1994
#25 0x00005555555d54a7 in commit_active_start (bs=0x555555f0e130, 
base=0x555555df0bb0, speed=0, on_error=BLOCKDEV_ON_ERROR_REPORT, 
cb=0x5555555e8b97 <replication_done>,
     opaque=0x555555dec210, errp=0x7fffffffd8e0, auto_complete=true) at 
block/mirror.c:901
#26 0x00005555555e8d98 in replication_stop (rs=0x55555746f7b0, 
failover=true, errp=0x7fffffffd8e0) at block/replication.c:623
#27 0x00005555555873d1 in replication_stop_all (failover=true, 
errp=0x7fffffffd928) at replication.c:98
#28 0x000055555557406d in test_secondary_start () at 
tests/test-replication.c:403
#29 0x00007ffff707a5e1 in g_test_run_suite_internal () from 
/lib64/libglib-2.0.so.0
#30 0x00007ffff707a7a6 in g_test_run_suite_internal () from 
/lib64/libglib-2.0.so.0
#31 0x00007ffff707a7a6 in g_test_run_suite_internal () from 
/lib64/libglib-2.0.so.0
#32 0x00007ffff707ab1b in g_test_run_suite () from /lib64/libglib-2.0.so.0
#33 0x00005555555746c8 in main (argc=1, argv=0x7fffffffdda8) at 
tests/test-replication.c:545
(gdb) d breakpoints
(gdb) c
Program received signal SIGSEGV, Segmentation fault.
0x00005555555c7d6c in blk_bs (blk=0x1e1e1e1e1e1e1e1e) at 
block/block-backend.c:389
389         return blk->root ? blk->root->bs : NULL;
(gdb) bt
#0  0x00005555555c7d6c in blk_bs (blk=0x1e1e1e1e1e1e1e1e) at 
block/block-backend.c:389
#1  0x00005555555c79e3 in bdrv_next (it=0x7fffffffd6a0) at 
block/block-backend.c:279
#2  0x00005555555d674d in bdrv_drain_all () at block/io.c:294
#3  0x0000555555579f9f in bdrv_reopen_multiple (bs_queue=0x555555f1dd30, 
errp=0x7fffffffd768) at block.c:1953
#4  0x000055555557a169 in bdrv_reopen (bs=0x555555df0bb0, 
bdrv_flags=24578, errp=0x7fffffffd8e0) at block.c:1994
#5  0x00005555555d54a7 in commit_active_start (bs=0x555555f0e130, 
base=0x555555df0bb0, speed=0, on_error=BLOCKDEV_ON_ERROR_REPORT, 
cb=0x5555555e8b97 <replication_done>,
     opaque=0x555555dec210, errp=0x7fffffffd8e0, auto_complete=true) at 
block/mirror.c:901
#6  0x00005555555e8d98 in replication_stop (rs=0x55555746f7b0, 
failover=true, errp=0x7fffffffd8e0) at block/replication.c:623
#7  0x00005555555873d1 in replication_stop_all (failover=true, 
errp=0x7fffffffd928) at replication.c:98
#8  0x000055555557406d in test_secondary_start () at 
tests/test-replication.c:403
#9  0x00007ffff707a5e1 in g_test_run_suite_internal () from 
/lib64/libglib-2.0.so.0
#10 0x00007ffff707a7a6 in g_test_run_suite_internal () from 
/lib64/libglib-2.0.so.0
#11 0x00007ffff707a7a6 in g_test_run_suite_internal () from 
/lib64/libglib-2.0.so.0
#12 0x00007ffff707ab1b in g_test_run_suite () from /lib64/libglib-2.0.so.0
#13 0x00005555555746c8 in main (argc=1, argv=0x7fffffffdda8) at 
tests/test-replication.c:545

> +
> +        commit_active_start(s->active_disk->bs, s->secondary_disk->bs, 0,
> +                            BLOCKDEV_ON_ERROR_REPORT, replication_done,
> +                            bs, errp, true);
> +        break;
diff mbox

Patch

diff --git a/block/Makefile.objs b/block/Makefile.objs
index fbfe647..5e28b45 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -23,6 +23,7 @@  block-obj-$(CONFIG_LIBSSH2) += ssh.o
 block-obj-y += accounting.o dirty-bitmap.o
 block-obj-y += write-threshold.o
 block-obj-y += backup.o
+block-obj-y += replication.o
 
 block-obj-y += crypto.o
 
diff --git a/block/replication.c b/block/replication.c
new file mode 100644
index 0000000..5228c42
--- /dev/null
+++ b/block/replication.c
@@ -0,0 +1,666 @@ 
+/*
+ * Replication Block filter
+ *
+ * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
+ * Copyright (c) 2016 Intel Corporation
+ * Copyright (c) 2016 FUJITSU LIMITED
+ *
+ * Author:
+ *   Wen Congyang <wency@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu-common.h"
+#include "block/nbd.h"
+#include "block/blockjob.h"
+#include "block/block_int.h"
+#include "block/block_backup.h"
+#include "sysemu/block-backend.h"
+#include "qapi/error.h"
+#include "replication.h"
+
+typedef struct BDRVReplicationState {
+    ReplicationMode mode;
+    int replication_state;
+    BdrvChild *active_disk;
+    BdrvChild *hidden_disk;
+    BdrvChild *secondary_disk;
+    char *top_id;
+    ReplicationState *rs;
+    Error *blocker;
+    int orig_hidden_flags;
+    int orig_secondary_flags;
+    int error;
+} BDRVReplicationState;
+
+enum {
+    BLOCK_REPLICATION_NONE,             /* block replication is not started */
+    BLOCK_REPLICATION_RUNNING,          /* block replication is running */
+    BLOCK_REPLICATION_FAILOVER,         /* failover is running in background */
+    BLOCK_REPLICATION_FAILOVER_FAILED,  /* failover failed */
+    BLOCK_REPLICATION_DONE,             /* block replication is done */
+};
+
+static void replication_start(ReplicationState *rs, ReplicationMode mode,
+                              Error **errp);
+static void replication_do_checkpoint(ReplicationState *rs, Error **errp);
+static void replication_get_error(ReplicationState *rs, Error **errp);
+static void replication_stop(ReplicationState *rs, bool failover,
+                             Error **errp);
+
+#define REPLICATION_MODE        "mode"
+#define REPLICATION_TOP_ID      "top-id"
+static QemuOptsList replication_runtime_opts = {
+    .name = "replication",
+    .head = QTAILQ_HEAD_INITIALIZER(replication_runtime_opts.head),
+    .desc = {
+        {
+            .name = REPLICATION_MODE,
+            .type = QEMU_OPT_STRING,
+        },
+        {
+            .name = REPLICATION_TOP_ID,
+            .type = QEMU_OPT_STRING,
+        },
+        { /* end of list */ }
+    },
+};
+
+static ReplicationOps replication_ops = {
+    .start = replication_start,
+    .checkpoint = replication_do_checkpoint,
+    .get_error = replication_get_error,
+    .stop = replication_stop,
+};
+
+static int replication_open(BlockDriverState *bs, QDict *options,
+                            int flags, Error **errp)
+{
+    int ret;
+    BDRVReplicationState *s = bs->opaque;
+    Error *local_err = NULL;
+    QemuOpts *opts = NULL;
+    const char *mode;
+    const char *top_id;
+
+    ret = -EINVAL;
+    opts = qemu_opts_create(&replication_runtime_opts, NULL, 0, &error_abort);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (local_err) {
+        goto fail;
+    }
+
+    mode = qemu_opt_get(opts, REPLICATION_MODE);
+    if (!mode) {
+        error_setg(&local_err, "Missing the option mode");
+        goto fail;
+    }
+
+    if (!strcmp(mode, "primary")) {
+        s->mode = REPLICATION_MODE_PRIMARY;
+    } else if (!strcmp(mode, "secondary")) {
+        s->mode = REPLICATION_MODE_SECONDARY;
+        top_id = qemu_opt_get(opts, REPLICATION_TOP_ID);
+        s->top_id = g_strdup(top_id);
+        if (!s->top_id) {
+            error_setg(&local_err, "Missing the option top-id");
+            goto fail;
+        }
+    } else {
+        error_setg(&local_err,
+                   "The option mode's value should be primary or secondary");
+        goto fail;
+    }
+
+    s->rs = replication_new(bs, &replication_ops);
+
+    ret = 0;
+
+fail:
+    qemu_opts_del(opts);
+    error_propagate(errp, local_err);
+
+    return ret;
+}
+
+static void replication_close(BlockDriverState *bs)
+{
+    BDRVReplicationState *s = bs->opaque;
+
+    if (s->replication_state == BLOCK_REPLICATION_RUNNING) {
+        replication_stop(s->rs, false, NULL);
+    }
+
+    if (s->mode == REPLICATION_MODE_SECONDARY) {
+        g_free(s->top_id);
+    }
+
+    replication_remove(s->rs);
+}
+
+static int64_t replication_getlength(BlockDriverState *bs)
+{
+    return bdrv_getlength(bs->file->bs);
+}
+
+static int replication_get_io_status(BDRVReplicationState *s)
+{
+    switch (s->replication_state) {
+    case BLOCK_REPLICATION_NONE:
+        return -EIO;
+    case BLOCK_REPLICATION_RUNNING:
+        return 0;
+    case BLOCK_REPLICATION_FAILOVER:
+        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
+    case BLOCK_REPLICATION_FAILOVER_FAILED:
+        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 1;
+    case BLOCK_REPLICATION_DONE:
+        /*
+         * active commit job completes, and active disk and secondary_disk
+         * is swapped, so we can operate bs->file directly
+         */
+        return s->mode == REPLICATION_MODE_PRIMARY ? -EIO : 0;
+    default:
+        abort();
+    }
+}
+
+static int replication_return_value(BDRVReplicationState *s, int ret)
+{
+    if (s->mode == REPLICATION_MODE_SECONDARY) {
+        return ret;
+    }
+
+    if (ret < 0) {
+        s->error = ret;
+        ret = 0;
+    }
+
+    return ret;
+}
+
+static coroutine_fn int replication_co_readv(BlockDriverState *bs,
+                                             int64_t sector_num,
+                                             int remaining_sectors,
+                                             QEMUIOVector *qiov)
+{
+    BDRVReplicationState *s = bs->opaque;
+    BdrvChild *child = s->secondary_disk;
+    BlockJob *job = NULL;
+    CowRequest req;
+    int ret;
+
+    if (s->mode == REPLICATION_MODE_PRIMARY) {
+        /* We only use it to forward primary write requests */
+        return -EIO;
+    }
+
+    ret = replication_get_io_status(s);
+    if (ret < 0) {
+        return ret;
+    }
+
+    if (child && child->bs) {
+        job = child->bs->job;
+    }
+
+    if (job) {
+        backup_wait_for_overlapping_requests(child->bs->job, sector_num,
+                                             remaining_sectors);
+        backup_cow_request_begin(&req, child->bs->job, sector_num,
+                                 remaining_sectors);
+        ret = bdrv_co_readv(bs->file->bs, sector_num, remaining_sectors,
+                            qiov);
+        backup_cow_request_end(&req);
+        goto out;
+    }
+
+    ret = bdrv_co_readv(bs->file->bs, sector_num, remaining_sectors, qiov);
+out:
+    return replication_return_value(s, ret);
+}
+
+static coroutine_fn int replication_co_writev(BlockDriverState *bs,
+                                              int64_t sector_num,
+                                              int remaining_sectors,
+                                              QEMUIOVector *qiov)
+{
+    BDRVReplicationState *s = bs->opaque;
+    QEMUIOVector hd_qiov;
+    uint64_t bytes_done = 0;
+    BdrvChild *top = bs->file;
+    BdrvChild *base = s->secondary_disk;
+    BlockDriverState *target;
+    int ret, n;
+
+    ret = replication_get_io_status(s);
+    if (ret < 0) {
+        goto out;
+    }
+
+    if (ret == 0) {
+        ret = bdrv_co_writev(top->bs, sector_num,
+                             remaining_sectors, qiov);
+        return replication_return_value(s, ret);
+    }
+
+    /*
+     * Failover failed, only write to active disk if the sectors
+     * have already been allocated in active disk/hidden disk.
+     */
+    qemu_iovec_init(&hd_qiov, qiov->niov);
+    while (remaining_sectors > 0) {
+        ret = bdrv_is_allocated_above(top->bs, base->bs, sector_num,
+                                      remaining_sectors, &n);
+        if (ret < 0) {
+            goto out1;
+        }
+
+        qemu_iovec_reset(&hd_qiov);
+        qemu_iovec_concat(&hd_qiov, qiov, bytes_done, n * BDRV_SECTOR_SIZE);
+
+        target = ret ? (top->bs) : (base->bs);
+        ret = bdrv_co_writev(target, sector_num, n, &hd_qiov);
+        if (ret < 0) {
+            goto out1;
+        }
+
+        remaining_sectors -= n;
+        sector_num += n;
+        bytes_done += n * BDRV_SECTOR_SIZE;
+    }
+
+out1:
+    qemu_iovec_destroy(&hd_qiov);
+out:
+    return ret;
+}
+
+static bool replication_recurse_is_first_non_filter(BlockDriverState *bs,
+                                                    BlockDriverState *candidate)
+{
+    return bdrv_recurse_is_first_non_filter(bs->file->bs, candidate);
+}
+
+static void secondary_do_checkpoint(BDRVReplicationState *s, Error **errp)
+{
+    Error *local_err = NULL;
+    int ret;
+
+    if (!s->secondary_disk->bs->job) {
+        error_setg(errp, "Backup job was cancelled unexpectedly");
+        return;
+    }
+
+    backup_do_checkpoint(s->secondary_disk->bs->job, &local_err);
+    if (local_err) {
+        error_propagate(errp, local_err);
+        return;
+    }
+
+    ret = s->active_disk->bs->drv->bdrv_make_empty(s->active_disk->bs);
+    if (ret < 0) {
+        error_setg(errp, "Cannot make active disk empty");
+        return;
+    }
+
+    ret = s->hidden_disk->bs->drv->bdrv_make_empty(s->hidden_disk->bs);
+    if (ret < 0) {
+        error_setg(errp, "Cannot make hidden disk empty");
+        return;
+    }
+}
+
+static void reopen_backing_file(BDRVReplicationState *s, bool writable,
+                                Error **errp)
+{
+    BlockReopenQueue *reopen_queue = NULL;
+    int orig_hidden_flags, orig_secondary_flags;
+    int new_hidden_flags, new_secondary_flags;
+    Error *local_err = NULL;
+
+    if (writable) {
+        orig_hidden_flags = s->orig_hidden_flags =
+                                bdrv_get_flags(s->hidden_disk->bs);
+        new_hidden_flags = (orig_hidden_flags | BDRV_O_RDWR) &
+                                                    ~BDRV_O_INACTIVE;
+        orig_secondary_flags = s->orig_secondary_flags =
+                                bdrv_get_flags(s->secondary_disk->bs);
+        new_secondary_flags = (orig_secondary_flags | BDRV_O_RDWR) &
+                                                     ~BDRV_O_INACTIVE;
+    } else {
+        orig_hidden_flags = (s->orig_hidden_flags | BDRV_O_RDWR) &
+                                                    ~BDRV_O_INACTIVE;
+        new_hidden_flags = s->orig_hidden_flags;
+        orig_secondary_flags = (s->orig_secondary_flags | BDRV_O_RDWR) &
+                                                    ~BDRV_O_INACTIVE;
+        new_secondary_flags = s->orig_secondary_flags;
+    }
+
+    if (orig_hidden_flags != new_hidden_flags) {
+        reopen_queue = bdrv_reopen_queue(reopen_queue, s->hidden_disk->bs, NULL,
+                                         new_hidden_flags);
+    }
+
+    if (!(orig_secondary_flags & BDRV_O_RDWR)) {
+        reopen_queue = bdrv_reopen_queue(reopen_queue, s->secondary_disk->bs,
+                                         NULL, new_secondary_flags);
+    }
+
+    if (reopen_queue) {
+        bdrv_reopen_multiple(reopen_queue, &local_err);
+        error_propagate(errp, local_err);
+    }
+}
+
+static void backup_job_cleanup(BDRVReplicationState *s)
+{
+    BlockDriverState *top_bs;
+
+    top_bs = bdrv_lookup_bs(s->top_id, s->top_id, NULL);
+    if (!top_bs) {
+        return;
+    }
+    bdrv_op_unblock_all(top_bs, s->blocker);
+    error_free(s->blocker);
+    reopen_backing_file(s, false, NULL);
+}
+
+static void backup_job_completed(void *opaque, int ret)
+{
+    BDRVReplicationState *s = opaque;
+
+    if (s->replication_state != BLOCK_REPLICATION_FAILOVER) {
+        /* The backup job is cancelled unexpectedly */
+        s->error = -EIO;
+    }
+
+    backup_job_cleanup(s);
+}
+
+static bool check_top_bs(BlockDriverState *top_bs, BlockDriverState *bs)
+{
+    BdrvChild *child;
+
+    /* The bs itself is the top_bs */
+    if (top_bs == bs) {
+        return true;
+    }
+
+    /* Iterate over top_bs's children */
+    QLIST_FOREACH(child, &top_bs->children, next) {
+        if (child->bs == bs || check_top_bs(child->bs, bs)) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+static void replication_start(ReplicationState *rs, ReplicationMode mode,
+                              Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    BDRVReplicationState *s;
+    BlockDriverState *top_bs;
+    int64_t active_length, hidden_length, disk_length;
+    AioContext *aio_context;
+    Error *local_err = NULL;
+
+    aio_context = bdrv_get_aio_context(bs);
+    aio_context_acquire(aio_context);
+    s = bs->opaque;
+
+    if (s->replication_state != BLOCK_REPLICATION_NONE) {
+        error_setg(errp, "Block replication is running or done");
+        aio_context_release(aio_context);
+        return;
+    }
+
+    if (s->mode != mode) {
+        error_setg(errp, "The parameter mode's value is invalid, needs %d,"
+                   " but got %d", s->mode, mode);
+        aio_context_release(aio_context);
+        return;
+    }
+
+    switch (s->mode) {
+    case REPLICATION_MODE_PRIMARY:
+        break;
+    case REPLICATION_MODE_SECONDARY:
+        s->active_disk = bs->file;
+        if (!s->active_disk || !s->active_disk->bs ||
+                                    !s->active_disk->bs->backing) {
+            error_setg(errp, "Active disk doesn't have backing file");
+            aio_context_release(aio_context);
+            return;
+        }
+
+        s->hidden_disk = s->active_disk->bs->backing;
+        if (!s->hidden_disk->bs || !s->hidden_disk->bs->backing) {
+            error_setg(errp, "Hidden disk doesn't have backing file");
+            aio_context_release(aio_context);
+            return;
+        }
+
+        s->secondary_disk = s->hidden_disk->bs->backing;
+        if (!s->secondary_disk->bs || !bdrv_has_blk(s->secondary_disk->bs)) {
+            error_setg(errp, "The secondary disk doesn't have block backend");
+            aio_context_release(aio_context);
+            return;
+        }
+
+        /* verify the length */
+        active_length = bdrv_getlength(s->active_disk->bs);
+        hidden_length = bdrv_getlength(s->hidden_disk->bs);
+        disk_length = bdrv_getlength(s->secondary_disk->bs);
+        if (active_length < 0 || hidden_length < 0 || disk_length < 0 ||
+            active_length != hidden_length || hidden_length != disk_length) {
+            error_setg(errp, "active disk, hidden disk, secondary disk's length"
+                       " are not the same");
+            aio_context_release(aio_context);
+            return;
+        }
+
+        if (!s->active_disk->bs->drv->bdrv_make_empty ||
+            !s->hidden_disk->bs->drv->bdrv_make_empty) {
+            error_setg(errp,
+                       "active disk or hidden disk doesn't support make_empty");
+            aio_context_release(aio_context);
+            return;
+        }
+
+        /* reopen the backing file in r/w mode */
+        reopen_backing_file(s, true, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            aio_context_release(aio_context);
+            return;
+        }
+
+        /* start backup job now */
+        error_setg(&s->blocker,
+                   "block device is in use by internal backup job");
+
+        top_bs = bdrv_lookup_bs(s->top_id, s->top_id, errp);
+        if (!top_bs || !check_top_bs(top_bs, bs)) {
+            reopen_backing_file(s, false, NULL);
+            aio_context_release(aio_context);
+            return;
+        }
+        bdrv_op_block_all(top_bs, s->blocker);
+        bdrv_op_unblock(top_bs, BLOCK_OP_TYPE_DATAPLANE, s->blocker);
+
+        /*
+         * Must protect backup target if backup job was stopped/cancelled
+         * unexpectedly
+         */
+        bdrv_ref(s->hidden_disk->bs);
+
+        backup_start(s->secondary_disk->bs, s->hidden_disk->bs, 0,
+                     MIRROR_SYNC_MODE_NONE, NULL, BLOCKDEV_ON_ERROR_REPORT,
+                     BLOCKDEV_ON_ERROR_REPORT, backup_job_completed,
+                     s, NULL, &local_err);
+        if (local_err) {
+            error_propagate(errp, local_err);
+            backup_job_cleanup(s);
+            bdrv_unref(s->hidden_disk->bs);
+            aio_context_release(aio_context);
+            return;
+        }
+        break;
+    default:
+        aio_context_release(aio_context);
+        abort();
+    }
+
+    s->replication_state = BLOCK_REPLICATION_RUNNING;
+
+    if (s->mode == REPLICATION_MODE_SECONDARY) {
+        secondary_do_checkpoint(s, errp);
+    }
+
+    s->error = 0;
+    aio_context_release(aio_context);
+}
+
+static void replication_do_checkpoint(ReplicationState *rs, Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    BDRVReplicationState *s;
+    AioContext *aio_context;
+
+    aio_context = bdrv_get_aio_context(bs);
+    aio_context_acquire(aio_context);
+    s = bs->opaque;
+
+    if (s->mode == REPLICATION_MODE_SECONDARY) {
+        secondary_do_checkpoint(s, errp);
+    }
+    aio_context_release(aio_context);
+}
+
+static void replication_get_error(ReplicationState *rs, Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    BDRVReplicationState *s;
+    AioContext *aio_context;
+
+    aio_context = bdrv_get_aio_context(bs);
+    aio_context_acquire(aio_context);
+    s = bs->opaque;
+
+    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
+        error_setg(errp, "Block replication is not running");
+        aio_context_release(aio_context);
+        return;
+    }
+
+    if (s->error) {
+        error_setg(errp, "I/O error occurred");
+        aio_context_release(aio_context);
+        return;
+    }
+    aio_context_release(aio_context);
+}
+
+static void replication_done(void *opaque, int ret)
+{
+    BlockDriverState *bs = opaque;
+    BDRVReplicationState *s = bs->opaque;
+
+    if (ret == 0) {
+        s->replication_state = BLOCK_REPLICATION_DONE;
+
+        /* refresh top bs's filename */
+        bdrv_refresh_filename(bs);
+        s->active_disk = NULL;
+        s->secondary_disk = NULL;
+        s->hidden_disk = NULL;
+        s->error = 0;
+    } else {
+        s->replication_state = BLOCK_REPLICATION_FAILOVER_FAILED;
+        s->error = -EIO;
+    }
+}
+
+static void replication_stop(ReplicationState *rs, bool failover, Error **errp)
+{
+    BlockDriverState *bs = rs->opaque;
+    BDRVReplicationState *s;
+    AioContext *aio_context;
+
+    aio_context = bdrv_get_aio_context(bs);
+    aio_context_acquire(aio_context);
+    s = bs->opaque;
+
+    if (s->replication_state != BLOCK_REPLICATION_RUNNING) {
+        error_setg(errp, "Block replication is not running");
+        aio_context_release(aio_context);
+        return;
+    }
+
+    switch (s->mode) {
+    case REPLICATION_MODE_PRIMARY:
+        s->replication_state = BLOCK_REPLICATION_DONE;
+        s->error = 0;
+        break;
+    case REPLICATION_MODE_SECONDARY:
+        if (!failover) {
+            /*
+             * This BDS will be closed, and the job should be completed
+             * before the BDS is closed, because we will access hidden
+             * disk, secondary disk in backup_job_completed().
+             */
+            if (s->secondary_disk->bs->job) {
+                block_job_cancel_sync(s->secondary_disk->bs->job);
+            }
+            secondary_do_checkpoint(s, errp);
+            s->replication_state = BLOCK_REPLICATION_DONE;
+            aio_context_release(aio_context);
+            return;
+        }
+
+        s->replication_state = BLOCK_REPLICATION_FAILOVER;
+        if (s->secondary_disk->bs->job) {
+            block_job_cancel(s->secondary_disk->bs->job);
+        }
+
+        commit_active_start(s->active_disk->bs, s->secondary_disk->bs, 0,
+                            BLOCKDEV_ON_ERROR_REPORT, replication_done,
+                            bs, errp, true);
+        break;
+    default:
+        aio_context_release(aio_context);
+        abort();
+    }
+    aio_context_release(aio_context);
+}
+
+BlockDriver bdrv_replication = {
+    .format_name                = "replication",
+    .protocol_name              = "replication",
+    .instance_size              = sizeof(BDRVReplicationState),
+
+    .bdrv_open                  = replication_open,
+    .bdrv_close                 = replication_close,
+
+    .bdrv_getlength             = replication_getlength,
+    .bdrv_co_readv              = replication_co_readv,
+    .bdrv_co_writev             = replication_co_writev,
+
+    .is_filter                  = true,
+    .bdrv_recurse_is_first_non_filter = replication_recurse_is_first_non_filter,
+
+    .has_variable_length        = true,
+};
+
+static void bdrv_replication_init(void)
+{
+    bdrv_register(&bdrv_replication);
+}
+
+block_init(bdrv_replication_init);