@@ -3123,9 +3123,7 @@ struct libxl__stream_read_state {
/* Private */
int rc;
bool running;
- bool in_checkpoint;
bool sync_teardown; /* Only used to coordinate shutdown on error path. */
- bool in_checkpoint_state;
libxl__save_helper_state shs;
libxl__conversion_helper_state chs;
@@ -3135,8 +3133,9 @@ struct libxl__stream_read_state {
LIBXL_STAILQ_HEAD(, libxl__sr_record_buf) record_queue; /* NOGC */
enum {
SRS_PHASE_NORMAL,
- SRS_PHASE_BUFFERING,
- SRS_PHASE_UNBUFFERING,
+ SRS_PHASE_CHECKPOINT_BUFFERING,
+ SRS_PHASE_CHECKPOINT_UNBUFFERING,
+ SRS_PHASE_CHECKPOINT_STATE
} phase;
bool recursion_guard;
@@ -29,14 +29,15 @@
* processed, and all records will be processed in queue order.
*
* Internal states:
- * running phase in_ record incoming
- * checkpoint _queue _record
+ * running phase record incoming
+ * _queue _record
*
- * Undefined undef undef undef undef undef
- * Idle false undef false 0 0
- * Active true NORMAL false 0/1 0/partial
- * Active true BUFFERING true any 0/partial
- * Active true UNBUFFERING true any 0
+ * Undefined undef undef undef undef
+ * Idle false undef 0 0
+ * Active true NORMAL 0/1 0/partial
+ * Active true CHECKPOINT_BUFFERING any 0/partial
+ * Active true CHECKPOINT_UNBUFFERING any 0
+ * Active true CHECKPOINT_STATE 0/1 0/partial
*
* While reading data from the stream, 'dc' is active and a callback
* is expected. Most actions in process_record() start a callback of
@@ -48,12 +49,12 @@
* Records are read one at time and immediately processed. (The
* record queue will not contain more than a single record.)
*
- * PHASE_BUFFERING:
+ * PHASE_CHECKPOINT_BUFFERING:
* This phase is used in checkpointed streams, when libxc signals
* the presence of a checkpoint in the stream. Records are read and
* buffered until a CHECKPOINT_END record has been read.
*
- * PHASE_UNBUFFERING:
+ * PHASE_CHECKPOINT_UNBUFFERING:
* Once a CHECKPOINT_END record has been read, all buffered records
* are processed.
*
@@ -172,6 +173,12 @@ static void checkpoint_state_done(libxl__egc *egc,
/*----- Helpers -----*/
+static inline bool stream_in_checkpoint(libxl__stream_read_state *stream)
+{
+ return stream->phase == SRS_PHASE_CHECKPOINT_BUFFERING ||
+ stream->phase == SRS_PHASE_CHECKPOINT_UNBUFFERING;
+}
+
/* Helper to set up reading some data from the stream. */
static int setup_read(libxl__stream_read_state *stream,
const char *what, void *ptr, size_t nr_bytes,
@@ -210,7 +217,6 @@ void libxl__stream_read_init(libxl__stream_read_state *stream)
stream->rc = 0;
stream->running = false;
- stream->in_checkpoint = false;
stream->sync_teardown = false;
FILLZERO(stream->dc);
FILLZERO(stream->hdr);
@@ -297,10 +303,9 @@ void libxl__stream_read_start_checkpoint(libxl__egc *egc,
libxl__stream_read_state *stream)
{
assert(stream->running);
- assert(!stream->in_checkpoint);
+ assert(stream->phase == SRS_PHASE_NORMAL);
- stream->in_checkpoint = true;
- stream->phase = SRS_PHASE_BUFFERING;
+ stream->phase = SRS_PHASE_CHECKPOINT_BUFFERING;
/*
* Libxc has handed control of the fd to us. Start reading some
@@ -392,6 +397,7 @@ static void stream_continue(libxl__egc *egc,
switch (stream->phase) {
case SRS_PHASE_NORMAL:
+ case SRS_PHASE_CHECKPOINT_STATE:
/*
* Normal phase (regular migration or restore from file):
*
@@ -416,9 +422,9 @@ static void stream_continue(libxl__egc *egc,
}
break;
- case SRS_PHASE_BUFFERING: {
+ case SRS_PHASE_CHECKPOINT_BUFFERING: {
/*
- * Buffering phase (checkpointed streams only):
+ * Buffering phase:
*
* logically:
* do { read_record(); } while ( not CHECKPOINT_END );
@@ -431,8 +437,6 @@ static void stream_continue(libxl__egc *egc,
libxl__sr_record_buf *rec = LIBXL_STAILQ_LAST(
&stream->record_queue, libxl__sr_record_buf, entry);
- assert(stream->in_checkpoint);
-
if (!rec || (rec->hdr.type != REC_TYPE_CHECKPOINT_END)) {
setup_read_record(egc, stream);
break;
@@ -442,19 +446,18 @@ static void stream_continue(libxl__egc *egc,
* There are now some number of buffered records, with a
* CHECKPOINT_END at the end. Start processing them all.
*/
- stream->phase = SRS_PHASE_UNBUFFERING;
+ stream->phase = SRS_PHASE_CHECKPOINT_UNBUFFERING;
}
/* FALLTHROUGH */
- case SRS_PHASE_UNBUFFERING:
+ case SRS_PHASE_CHECKPOINT_UNBUFFERING:
/*
- * Unbuffering phase (checkpointed streams only):
+ * Unbuffering phase:
*
* logically:
* do { process_record(); } while ( not CHECKPOINT_END );
*
* Process all records collected during the buffering phase.
*/
- assert(stream->in_checkpoint);
while (process_record(egc, stream))
; /*
@@ -625,7 +628,7 @@ static bool process_record(libxl__egc *egc,
break;
case REC_TYPE_CHECKPOINT_END:
- if (!stream->in_checkpoint) {
+ if (!stream_in_checkpoint(stream)) {
LOG(ERROR, "Unexpected CHECKPOINT_END record in stream");
rc = ERROR_FAIL;
goto err;
@@ -634,7 +637,7 @@ static bool process_record(libxl__egc *egc,
break;
case REC_TYPE_CHECKPOINT_STATE:
- if (!stream->in_checkpoint_state) {
+ if (stream->phase != SRS_PHASE_CHECKPOINT_STATE) {
LOG(ERROR, "Unexpected CHECKPOINT_STATE record in stream");
rc = ERROR_FAIL;
goto err;
@@ -743,7 +746,12 @@ static void stream_complete(libxl__egc *egc,
{
assert(stream->running);
- if (stream->in_checkpoint) {
+ switch (stream->phase) {
+ case SRS_PHASE_NORMAL:
+ stream_done(egc, stream, rc);
+ break;
+ case SRS_PHASE_CHECKPOINT_BUFFERING:
+ case SRS_PHASE_CHECKPOINT_UNBUFFERING:
assert(rc);
/*
@@ -752,10 +760,8 @@ static void stream_complete(libxl__egc *egc,
* libxl__xc_domain_restore_done()
*/
checkpoint_done(egc, stream, rc);
- return;
- }
-
- if (stream->in_checkpoint_state) {
+ break;
+ case SRS_PHASE_CHECKPOINT_STATE:
assert(rc);
/*
@@ -767,10 +773,8 @@ static void stream_complete(libxl__egc *egc,
* libxl__stream_read_abort()
*/
checkpoint_state_done(egc, stream, rc);
- return;
+ break;
}
-
- stream_done(egc, stream, rc);
}
static void checkpoint_done(libxl__egc *egc,
@@ -778,18 +782,17 @@ static void checkpoint_done(libxl__egc *egc,
{
int ret;
- assert(stream->in_checkpoint);
+ assert(stream_in_checkpoint(stream));
if (rc == 0)
ret = XGR_CHECKPOINT_SUCCESS;
- else if (stream->phase == SRS_PHASE_BUFFERING)
+ else if (stream->phase == SRS_PHASE_CHECKPOINT_BUFFERING)
ret = XGR_CHECKPOINT_FAILOVER;
else
ret = XGR_CHECKPOINT_ERROR;
stream->checkpoint_callback(egc, stream, ret);
- stream->in_checkpoint = false;
stream->phase = SRS_PHASE_NORMAL;
}
@@ -799,8 +802,7 @@ static void stream_done(libxl__egc *egc,
libxl__sr_record_buf *rec, *trec;
assert(stream->running);
- assert(!stream->in_checkpoint);
- assert(!stream->in_checkpoint_state);
+ assert(stream->phase == SRS_PHASE_NORMAL);
stream->running = false;
if (stream->incoming_record)
@@ -955,9 +957,8 @@ void libxl__stream_read_checkpoint_state(libxl__egc *egc,
libxl__stream_read_state *stream)
{
assert(stream->running);
- assert(!stream->in_checkpoint);
- assert(!stream->in_checkpoint_state);
- stream->in_checkpoint_state = true;
+ assert(stream->phase == SRS_PHASE_NORMAL);
+ stream->phase = SRS_PHASE_CHECKPOINT_STATE;
setup_read_record(egc, stream);
}
@@ -965,8 +966,8 @@ void libxl__stream_read_checkpoint_state(libxl__egc *egc,
static void checkpoint_state_done(libxl__egc *egc,
libxl__stream_read_state *stream, int rc)
{
- assert(stream->in_checkpoint_state);
- stream->in_checkpoint_state = false;
+ assert(stream->phase == SRS_PHASE_CHECKPOINT_STATE);
+ stream->phase = SRS_PHASE_NORMAL;
stream->checkpoint_callback(egc, stream, rc);
}
As the previous patch did for libxl_stream_write, do for libxl_stream_read. libxl_stream_read already has a notion of phase for its record-buffering behaviour - this is combined with the callback chain phase. Again, this is done to support the addition of a new callback chain for postcopy live migration. No functional change. Signed-off-by: Joshua Otto <jtotto@uwaterloo.ca> --- tools/libxl/libxl_internal.h | 7 ++-- tools/libxl/libxl_stream_read.c | 83 +++++++++++++++++++++-------------------- 2 files changed, 45 insertions(+), 45 deletions(-)