@@ -1034,6 +1034,7 @@ static void domcreate_bootloader_done(libxl__egc *egc,
dcs->srs.dcs = dcs;
dcs->srs.fd = restore_fd;
dcs->srs.legacy = (dcs->restore_params.stream_version == 1);
+ dcs->srs.back_channel = false;
dcs->srs.completion_callback = domcreate_stream_done;
if (restore_fd >= 0) {
@@ -3444,6 +3444,7 @@ struct libxl__stream_read_state {
libxl__domain_create_state *dcs;
int fd;
bool legacy;
+ bool back_channel;
void (*completion_callback)(libxl__egc *egc,
libxl__stream_read_state *srs,
int rc);
@@ -3455,6 +3456,7 @@ struct libxl__stream_read_state {
bool running;
bool in_checkpoint;
bool sync_teardown; /* Only used to coordinate shutdown on error path. */
+ bool in_checkpoint_state;
libxl__save_helper_state shs;
libxl__conversion_helper_state chs;
@@ -3482,6 +3484,8 @@ _hidden void libxl__stream_read_start(libxl__egc *egc,
libxl__stream_read_state *stream);
_hidden void libxl__stream_read_start_checkpoint(libxl__egc *egc,
libxl__stream_read_state *stream);
+_hidden void libxl__stream_read_checkpoint_state(libxl__egc *egc,
+ libxl__stream_read_state *stream);
_hidden void libxl__stream_read_abort(libxl__egc *egc,
libxl__stream_read_state *stream, int rc);
static inline bool
@@ -118,6 +118,15 @@
* record, and therefore the buffered state is inconsistent. In
* libxl__xc_domain_restore_done(), we just complete the stream and
* stream->completion_callback() will be called to resume the guest
+ *
+ * For back channel stream:
+ * - libxl__stream_read_start()
+ * - Set up the stream to running state
+ *
+ * - libxl__stream_read_continue()
+ * - Set up reading the next record from a started stream.
+ * Add some codes to process_record() to handle the record.
+ * Then call stream->checkpoint_callback() to return.
*/
/* Success/error/cleanup handling. */
@@ -157,6 +166,10 @@ static void write_emulator_done(libxl__egc *egc,
libxl__datacopier_state *dc,
int rc, int onwrite, int errnoval);
+/* Handlers for checkpoint state mini-loop */
+static void checkpoint_state_done(libxl__egc *egc,
+ libxl__stream_read_state *stream, int rc);
+
/*----- Helpers -----*/
/* Helper to set up reading some data from the stream. */
@@ -221,6 +234,14 @@ void libxl__stream_read_start(libxl__egc *egc,
stream->running = true;
stream->phase = SRS_PHASE_NORMAL;
+ dc->ao = stream->ao;
+ dc->copywhat = "restore v2 stream";
+ dc->readfd = stream->fd;
+ dc->writefd = -1;
+
+ if (stream->back_channel)
+ return;
+
if (stream->legacy) {
/* Convert the legacy stream. */
libxl__conversion_helper_state *chs = &stream->chs;
@@ -241,12 +262,6 @@ void libxl__stream_read_start(libxl__egc *egc,
stream->fd = libxl__carefd_fd(stream->chs.v2_carefd);
stream->dcs->libxc_fd = stream->fd;
}
- /* stream->fd is now a v2 stream. */
-
- dc->ao = stream->ao;
- dc->copywhat = "restore v2 stream";
- dc->readfd = stream->fd;
- dc->writefd = -1;
/* Start reading the stream header. */
rc = setup_read(stream, "stream header",
@@ -532,6 +547,7 @@ static bool process_record(libxl__egc *egc,
STATE_AO_GC(stream->ao);
libxl__domain_create_state *dcs = stream->dcs;
libxl__sr_record_buf *rec;
+ libxl_sr_checkpoint_state *srcs;
bool further_action_needed = false;
int rc = 0;
@@ -602,6 +618,17 @@ static bool process_record(libxl__egc *egc,
checkpoint_done(egc, stream, 0);
break;
+ case REC_TYPE_CHECKPOINT_STATE:
+ if (!stream->in_checkpoint_state) {
+ LOG(ERROR, "Unexpected CHECKPOINT_STATE record in stream");
+ rc = ERROR_FAIL;
+ goto err;
+ }
+
+ srcs = rec->body;
+ checkpoint_state_done(egc, stream, srcs->id);
+ break;
+
default:
LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type);
rc = ERROR_FAIL;
@@ -713,6 +740,21 @@ static void stream_complete(libxl__egc *egc,
return;
}
+ if (stream->in_checkpoint_state) {
+ assert(rc);
+
+ /*
+ * If an error is encountered while in a checkpoint, pass it
+ * back to libxc. The failure will come back around to us via
+ * 1. normal stream
+ * libxl__xc_domain_restore_done()
+ * 2. back_channel stream
+ * libxl__stream_read_abort()
+ */
+ checkpoint_state_done(egc, stream, rc);
+ return;
+ }
+
stream_done(egc, stream, rc);
}
@@ -743,6 +785,7 @@ static void stream_done(libxl__egc *egc,
assert(stream->running);
assert(!stream->in_checkpoint);
+ assert(!stream->in_checkpoint_state);
stream->running = false;
if (stream->incoming_record)
@@ -762,7 +805,19 @@ static void stream_done(libxl__egc *egc,
LIBXL_STAILQ_FOREACH_SAFE(rec, &stream->record_queue, entry, trec)
free_record(rec);
- check_all_finished(egc, stream, rc);
+ if (!stream->back_channel) {
+ /*
+ * 1. In stream_done(), stream->running is set to false, so
+ * the stream itself is not in use.
+ * 2. Read stream is a back channel stream, this means it is
+ * only used by primary(save side) to read records sent by
+ * secondary(restore side), so it doesn't have restore helper.
+ * 3. Back channel stream doesn't support legacy stream, so
+ * there is no conversion helper.
+ * So we don't need invoke check_all_finished here
+ */
+ check_all_finished(egc, stream, rc);
+ }
}
void libxl__xc_domain_restore_done(libxl__egc *egc, void *dcs_void,
@@ -862,7 +917,30 @@ static void check_all_finished(libxl__egc *egc,
libxl__conversion_helper_inuse(&stream->chs))
return;
- stream->completion_callback(egc, stream, stream->rc);
+ if (stream->completion_callback)
+ /* back channel stream doesn't have completion_callback() */
+ stream->completion_callback(egc, stream, stream->rc);
+}
+
+/*----- Checkpoint state handlers -----*/
+
+void libxl__stream_read_checkpoint_state(libxl__egc *egc,
+ libxl__stream_read_state *stream)
+{
+ assert(stream->running);
+ assert(!stream->in_checkpoint);
+ assert(!stream->in_checkpoint_state);
+ stream->in_checkpoint_state = true;
+
+ setup_read_record(egc, stream);
+}
+
+static void checkpoint_state_done(libxl__egc *egc,
+ libxl__stream_read_state *stream, int rc)
+{
+ assert(stream->in_checkpoint_state);
+ stream->in_checkpoint_state = false;
+ stream->checkpoint_callback(egc, stream, rc);
}
/*