diff mbox series

[08/10] tools/xenstored: Extend restore code to handle multiple input buffer

Message ID 20210616144324.31652-9-julien@xen.org (mailing list archive)
State New, archived
Headers show
Series tools/xenstored: Bug fixes + Improve Live-Update | expand

Commit Message

Julien Grall June 16, 2021, 2:43 p.m. UTC
From: Julien Grall <jgrall@amazon.com>

Currently, the restore code is considering the stream will contain at
most one in-flight request per connection. In a follow-up changes, we
will want to transfer multiple in-flight requests.

The function read_state_buffered() is now extended to restore multiple
in-flight request. Complete requests will be queued as delayed
requests, if there a partial request (only the last one can) then it
will used as the current in-flight request.

Note that we want to bypass the quota check for delayed requests as
the new Xenstore may have a lower limit.

Lastly, there is no need to change the specification as there was
no restriction on the number of in-flight requests preserved.

Signed-off-by: Julien Grall <jgrall@amazon.com>
---
 tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
 1 file changed, 48 insertions(+), 8 deletions(-)

Comments

Luca Fancellu June 21, 2021, 9:21 a.m. UTC | #1
> On 16 Jun 2021, at 15:43, Julien Grall <julien@xen.org> wrote:
> 
> From: Julien Grall <jgrall@amazon.com>
> 
> Currently, the restore code is considering the stream will contain at
> most one in-flight request per connection. In a follow-up changes, we
> will want to transfer multiple in-flight requests.
> 
> The function read_state_buffered() is now extended to restore multiple
> in-flight request. Complete requests will be queued as delayed
> requests, if there a partial request (only the last one can) then it
> will used as the current in-flight request.
> 
> Note that we want to bypass the quota check for delayed requests as
> the new Xenstore may have a lower limit.
> 
> Lastly, there is no need to change the specification as there was
> no restriction on the number of in-flight requests preserved.
> 
> Signed-off-by: Julien Grall <jgrall@amazon.com>

Reviewed-by: Luca Fancellu <luca.fancellu@arm.com>

> ---
> tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
> 1 file changed, 48 insertions(+), 8 deletions(-)
> 
> diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c
> index a5084a5b173d..5b7ab7f74013 100644
> --- a/tools/xenstore/xenstored_core.c
> +++ b/tools/xenstore/xenstored_core.c
> @@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in)
> 	enum xsd_sockmsg_type type = in->hdr.msg.type;
> 	int ret;
> 
> +	/* At least send_error() and send_reply() expects conn->in == in */
> +	assert(conn->in == in);
> +	trace_io(conn, in, 0);
> +
> 	if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) {
> 		eprintf("Client unknown operation %i", type);
> 		send_error(conn, ENOSYS);
> @@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in)
> 	conn->transaction = NULL;
> }
> 
> +static bool process_delayed_message(struct delayed_request *req)
> +{
> +	struct connection *conn = req->data;
> +	struct buffered_data *saved_in = conn->in;
> +
> +	/*
> +	 * Part of process_message() expects conn->in to contains the
> +	 * processed response. So save the current conn->in and restore it
> +	 * afterwards.
> +	 */
> +	conn->in = req->in;
> +	process_message(req->data, req->in);
> +	conn->in = saved_in;
> +
> +	return true;
> +}
> +
> static void consider_message(struct connection *conn)
> {
> 	if (verbose)
> @@ -1582,7 +1603,6 @@ static void handle_input(struct connection *conn)
> 	if (in->used != in->hdr.msg.len)
> 		return;
> 
> -	trace_io(conn, in, 0);
> 	consider_message(conn);
> 	return;
> 
> @@ -2611,14 +2631,20 @@ void read_state_buffered_data(const void *ctx, struct connection *conn,
> 	unsigned int len;
> 	bool partial = sc->data_resp_len;
> 
> -	if (sc->data_in_len) {
> +	for (data = sc->data; data < sc->data + sc->data_in_len; data += len) {
> 		bdata = new_buffer(conn);
> 		if (!bdata)
> 			barf("error restoring read data");
> -		if (sc->data_in_len < sizeof(bdata->hdr)) {
> +
> +		/*
> +		 * We don't know yet if there is more than one message
> +		 * to process. So the len is the size of the leftover data.
> +		 */
> +		len = sc->data_in_len - (data - sc->data);
> +		if (len < sizeof(bdata->hdr)) {
> 			bdata->inhdr = true;
> -			memcpy(&bdata->hdr, sc->data, sc->data_in_len);
> -			bdata->used = sc->data_in_len;
> +			memcpy(&bdata->hdr, sc->data, len);
> +			bdata->used = len;
> 		} else {
> 			bdata->inhdr = false;
> 			memcpy(&bdata->hdr, sc->data, sizeof(bdata->hdr));
> @@ -2629,12 +2655,26 @@ void read_state_buffered_data(const void *ctx, struct connection *conn,
> 							bdata->hdr.msg.len);
> 			if (!bdata->buffer)
> 				barf("Error allocating in buffer");
> -			bdata->used = sc->data_in_len - sizeof(bdata->hdr);
> -			memcpy(bdata->buffer, sc->data + sizeof(bdata->hdr),
> +			bdata->used = min_t(unsigned int,
> +					    len - sizeof(bdata->hdr),
> +					    bdata->hdr.msg.len);
> +			memcpy(bdata->buffer, data + sizeof(bdata->hdr),
> 			       bdata->used);
> +			/* Update len to match the size of the message. */
> +			len = bdata->used + sizeof(bdata->hdr);
> 		}
> 
> -		conn->in = bdata;
> +		/*
> +		 * If the message is not complete, then it means this was
> +		 * the current processed message. All the other messages
> +		 * will be queued to be handled after restoring.
> +		 */
> +		if (bdata->inhdr || bdata->used != bdata->hdr.msg.len) {
> +			assert(conn->in == NULL);
> +			conn->in = bdata;
> +		} else if (delay_request(conn, bdata, process_delayed_message,
> +					 conn, true))
> +			barf("Unable to delay the request");
> 	}
> 
> 	for (data = sc->data + sc->data_in_len;
> -- 
> 2.17.1
> 
>
Jürgen Groß June 24, 2021, 8:30 a.m. UTC | #2
On 16.06.21 16:43, Julien Grall wrote:
> From: Julien Grall <jgrall@amazon.com>
> 
> Currently, the restore code is considering the stream will contain at
> most one in-flight request per connection. In a follow-up changes, we
> will want to transfer multiple in-flight requests.
> 
> The function read_state_buffered() is now extended to restore multiple
> in-flight request. Complete requests will be queued as delayed
> requests, if there a partial request (only the last one can) then it
> will used as the current in-flight request.
> 
> Note that we want to bypass the quota check for delayed requests as
> the new Xenstore may have a lower limit.
> 
> Lastly, there is no need to change the specification as there was
> no restriction on the number of in-flight requests preserved.
> 
> Signed-off-by: Julien Grall <jgrall@amazon.com>
> ---
>   tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
>   1 file changed, 48 insertions(+), 8 deletions(-)
> 
> diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c
> index a5084a5b173d..5b7ab7f74013 100644
> --- a/tools/xenstore/xenstored_core.c
> +++ b/tools/xenstore/xenstored_core.c
> @@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in)
>   	enum xsd_sockmsg_type type = in->hdr.msg.type;
>   	int ret;
>   
> +	/* At least send_error() and send_reply() expects conn->in == in */
> +	assert(conn->in == in);
> +	trace_io(conn, in, 0);
> +
>   	if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) 
{
>   		eprintf("Client unknown operation %i", type);
>   		send_error(conn, ENOSYS);
> @@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in)
>   	conn->transaction = NULL;
>   }
>   
> +static bool process_delayed_message(struct delayed_request *req)
> +{
> +	struct connection *conn = req->data;
> +	struct buffered_data *saved_in = conn->in;
> +
> +	/*
> +	 * Part of process_message() expects conn->in to contains the
> +	 * processed response. So save the current conn->in and restore it
> +	 * afterwards.
> +	 */
> +	conn->in = req->in;
> +	process_message(req->data, req->in);
> +	conn->in = saved_in;

This pattern was added to do_lu_start() already, and it will be needed
for each other function being called via call_delayed().

Maybe it would be better to add conn explicitly to struct
delayed_request (or even better: replace data with conn) and to do the
conn->in saving/setting/restoring in call_delayed() instead?

Other than that:

Reviewed-by: Juergen Gross <jgross@suse.com>


Juergen
Julien Grall June 24, 2021, 8:42 a.m. UTC | #3
Hi Juergen,

On 24/06/2021 10:30, Juergen Gross wrote:
> On 16.06.21 16:43, Julien Grall wrote:
>> From: Julien Grall <jgrall@amazon.com>
>>
>> Currently, the restore code is considering the stream will contain at
>> most one in-flight request per connection. In a follow-up changes, we
>> will want to transfer multiple in-flight requests.
>>
>> The function read_state_buffered() is now extended to restore multiple
>> in-flight request. Complete requests will be queued as delayed
>> requests, if there a partial request (only the last one can) then it
>> will used as the current in-flight request.
>>
>> Note that we want to bypass the quota check for delayed requests as
>> the new Xenstore may have a lower limit.
>>
>> Lastly, there is no need to change the specification as there was
>> no restriction on the number of in-flight requests preserved.
>>
>> Signed-off-by: Julien Grall <jgrall@amazon.com>
>> ---
>>   tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
>>   1 file changed, 48 insertions(+), 8 deletions(-)
>>
>> diff --git a/tools/xenstore/xenstored_core.c 
>> b/tools/xenstore/xenstored_core.c
>> index a5084a5b173d..5b7ab7f74013 100644
>> --- a/tools/xenstore/xenstored_core.c
>> +++ b/tools/xenstore/xenstored_core.c
>> @@ -1486,6 +1486,10 @@ static void process_message(struct connection 
>> *conn, struct buffered_data *in)
>>       enum xsd_sockmsg_type type = in->hdr.msg.type;
>>       int ret;
>> +    /* At least send_error() and send_reply() expects conn->in == in */
>> +    assert(conn->in == in);
>> +    trace_io(conn, in, 0);
>> +
>>       if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) 
> {
>>           eprintf("Client unknown operation %i", type);
>>           send_error(conn, ENOSYS);
>> @@ -1515,6 +1519,23 @@ static void process_message(struct connection 
>> *conn, struct buffered_data *in)
>>       conn->transaction = NULL;
>>   }
>> +static bool process_delayed_message(struct delayed_request *req)
>> +{
>> +    struct connection *conn = req->data;
>> +    struct buffered_data *saved_in = conn->in;
>> +
>> +    /*
>> +     * Part of process_message() expects conn->in to contains the
>> +     * processed response. So save the current conn->in and restore it
>> +     * afterwards.
>> +     */
>> +    conn->in = req->in;
>> +    process_message(req->data, req->in);
>> +    conn->in = saved_in;
> 
> This pattern was added to do_lu_start() already, and it will be needed
> for each other function being called via call_delayed().
> 
> Maybe it would be better to add conn explicitly to struct
> delayed_request (or even better: replace data with conn) and to do the
> conn->in saving/setting/restoring in call_delayed() instead?

This was my original approach, but I abandoned it because in the case of 
do_lu_start() we need to save the original conn->in in the stream (see 
patch #3 for more details).

If we overwrite conn->in in call_delayed(), then we need a different way 
to find the original conn->in. I couldn't find a nice way and decided to 
settle with the duplication.

Cheers,
Jürgen Groß June 24, 2021, 9:20 a.m. UTC | #4
On 24.06.21 10:42, Julien Grall wrote:
> Hi Juergen,
> 
> On 24/06/2021 10:30, Juergen Gross wrote:
>> On 16.06.21 16:43, Julien Grall wrote:
>>> From: Julien Grall <jgrall@amazon.com>
>>>
>>> Currently, the restore code is considering the stream will contain at
>>> most one in-flight request per connection. In a follow-up changes, we
>>> will want to transfer multiple in-flight requests.
>>>
>>> The function read_state_buffered() is now extended to restore multiple
>>> in-flight request. Complete requests will be queued as delayed
>>> requests, if there a partial request (only the last one can) then it
>>> will used as the current in-flight request.
>>>
>>> Note that we want to bypass the quota check for delayed requests as
>>> the new Xenstore may have a lower limit.
>>>
>>> Lastly, there is no need to change the specification as there was
>>> no restriction on the number of in-flight requests preserved.
>>>
>>> Signed-off-by: Julien Grall <jgrall@amazon.com>
>>> ---
>>>   tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
>>>   1 file changed, 48 insertions(+), 8 deletions(-)
>>>
>>> diff --git a/tools/xenstore/xenstored_core.c 
>>> b/tools/xenstore/xenstored_core.c
>>> index a5084a5b173d..5b7ab7f74013 100644
>>> --- a/tools/xenstore/xenstored_core.c
>>> +++ b/tools/xenstore/xenstored_core.c
>>> @@ -1486,6 +1486,10 @@ static void process_message(struct connection 
>>> *conn, struct buffered_data *in)
>>>       enum xsd_sockmsg_type type = in->hdr.msg.type;
>>>       int ret;
>>> +    /* At least send_error() and send_reply() expects 
conn->in == in */
>>> +    assert(conn->in == in);
>>> +    trace_io(conn, in, 0);
>>> +
>>>       if ((unsigned int)type >= XS_TYPE_COUNT|| !wire_funcs[type].func) 
>> {
>>>           eprintf("Client unknown operation %i", type);
>>>           send_error(conn, ENOSYS);
>>> @@ -1515,6 +1519,23 @@ static void process_message(struct connection 
>>> *conn, struct buffered_data *in)
>>>       conn->transaction = NULL;
>>>   }
>>> +static bool process_delayed_message(struct delayed_request *req)
>>> +{
>>> +    struct connection *conn = req->data;
>>> +    struct buffered_data *saved_in = conn->in;
>>> +
>>> +    /*
>>> +     * Part of process_message() expects conn->in to contains the
>>> +     * processed response. So save the current conn->in and restore it
>>> +     * afterwards.
>>> +     */
>>> +    conn->in = req->in;
>>> +    process_message(req->data, req->in);
>>> +    conn->in = saved_in;
>>
>> This pattern was added to do_lu_start() already, and it will be needed
>> for each other function being called via call_delayed().
>>
>> Maybe it would be better to add conn explicitly to struct
>> delayed_request (or even better: replace data with conn) and to do the
>> conn->in saving/setting/restoring in call_delayed() instead?
> 
> This was my original approach, but I abandoned it because in the case of 
> do_lu_start() we need to save the original conn->in in the stream (see 
> patch #3 for more details).
> 
> If we overwrite conn->in in call_delayed(), then we need a different way 
> to find the original conn->in. I couldn't find a nice way and decided to 
> settle with the duplication.

Ah, okay, understood. Even in case we'd be able to restore conn->in it
would be the same amount of code again, but harder to follow.


Juergen
diff mbox series

Patch

diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c
index a5084a5b173d..5b7ab7f74013 100644
--- a/tools/xenstore/xenstored_core.c
+++ b/tools/xenstore/xenstored_core.c
@@ -1486,6 +1486,10 @@  static void process_message(struct connection *conn, struct buffered_data *in)
 	enum xsd_sockmsg_type type = in->hdr.msg.type;
 	int ret;
 
+	/* At least send_error() and send_reply() expects conn->in == in */
+	assert(conn->in == in);
+	trace_io(conn, in, 0);
+
 	if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) {
 		eprintf("Client unknown operation %i", type);
 		send_error(conn, ENOSYS);
@@ -1515,6 +1519,23 @@  static void process_message(struct connection *conn, struct buffered_data *in)
 	conn->transaction = NULL;
 }
 
+static bool process_delayed_message(struct delayed_request *req)
+{
+	struct connection *conn = req->data;
+	struct buffered_data *saved_in = conn->in;
+
+	/*
+	 * Part of process_message() expects conn->in to contains the
+	 * processed response. So save the current conn->in and restore it
+	 * afterwards.
+	 */
+	conn->in = req->in;
+	process_message(req->data, req->in);
+	conn->in = saved_in;
+
+	return true;
+}
+
 static void consider_message(struct connection *conn)
 {
 	if (verbose)
@@ -1582,7 +1603,6 @@  static void handle_input(struct connection *conn)
 	if (in->used != in->hdr.msg.len)
 		return;
 
-	trace_io(conn, in, 0);
 	consider_message(conn);
 	return;
 
@@ -2611,14 +2631,20 @@  void read_state_buffered_data(const void *ctx, struct connection *conn,
 	unsigned int len;
 	bool partial = sc->data_resp_len;
 
-	if (sc->data_in_len) {
+	for (data = sc->data; data < sc->data + sc->data_in_len; data += len) {
 		bdata = new_buffer(conn);
 		if (!bdata)
 			barf("error restoring read data");
-		if (sc->data_in_len < sizeof(bdata->hdr)) {
+
+		/*
+		 * We don't know yet if there is more than one message
+		 * to process. So the len is the size of the leftover data.
+		 */
+		len = sc->data_in_len - (data - sc->data);
+		if (len < sizeof(bdata->hdr)) {
 			bdata->inhdr = true;
-			memcpy(&bdata->hdr, sc->data, sc->data_in_len);
-			bdata->used = sc->data_in_len;
+			memcpy(&bdata->hdr, sc->data, len);
+			bdata->used = len;
 		} else {
 			bdata->inhdr = false;
 			memcpy(&bdata->hdr, sc->data, sizeof(bdata->hdr));
@@ -2629,12 +2655,26 @@  void read_state_buffered_data(const void *ctx, struct connection *conn,
 							bdata->hdr.msg.len);
 			if (!bdata->buffer)
 				barf("Error allocating in buffer");
-			bdata->used = sc->data_in_len - sizeof(bdata->hdr);
-			memcpy(bdata->buffer, sc->data + sizeof(bdata->hdr),
+			bdata->used = min_t(unsigned int,
+					    len - sizeof(bdata->hdr),
+					    bdata->hdr.msg.len);
+			memcpy(bdata->buffer, data + sizeof(bdata->hdr),
 			       bdata->used);
+			/* Update len to match the size of the message. */
+			len = bdata->used + sizeof(bdata->hdr);
 		}
 
-		conn->in = bdata;
+		/*
+		 * If the message is not complete, then it means this was
+		 * the current processed message. All the other messages
+		 * will be queued to be handled after restoring.
+		 */
+		if (bdata->inhdr || bdata->used != bdata->hdr.msg.len) {
+			assert(conn->in == NULL);
+			conn->in = bdata;
+		} else if (delay_request(conn, bdata, process_delayed_message,
+					 conn, true))
+			barf("Unable to delay the request");
 	}
 
 	for (data = sc->data + sc->data_in_len;