diff mbox series

[v3,09/13] python/aqmp-tui: Add QMP connection manager

Message ID 20210730201846.5147-10-niteesh.gs@gmail.com (mailing list archive)
State New, archived
Headers show
Series AQMP TUI Draft | expand

Commit Message

Niteesh G. S. July 30, 2021, 8:18 p.m. UTC
Instead of manually connecting to and disconnecting from the
server, we now rely on the runstate to manage the QMP
connection.

Along with this, the ability to reconnect on certain exceptions
has also been added.

Signed-off-by: G S Niteesh Babu <niteesh.gs@gmail.com>
---
 python/qemu/aqmp/aqmp_tui.py | 109 ++++++++++++++++++++++++++++++-----
 1 file changed, 94 insertions(+), 15 deletions(-)

Comments

John Snow Aug. 17, 2021, 4:50 a.m. UTC | #1
On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu <niteesh.gs@gmail.com>
wrote:

> Instead of manually connecting and disconnecting from the
> server. We now rely on the runstate to manage the QMP
> connection.
>
> Along with this the ability to reconnect on certain exceptions
> has also been added.
>
> Signed-off-by: G S Niteesh Babu <niteesh.gs@gmail.com>
> ---
>  python/qemu/aqmp/aqmp_tui.py | 109 ++++++++++++++++++++++++++++++-----
>  1 file changed, 94 insertions(+), 15 deletions(-)
>
> diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
> index 0d5ec62cb7..ef91883fa5 100644
> --- a/python/qemu/aqmp/aqmp_tui.py
> +++ b/python/qemu/aqmp/aqmp_tui.py
> @@ -25,8 +25,9 @@
>  import urwid_readline
>
>  from ..qmp import QEMUMonitorProtocol, QMPBadPortError
> +from .error import ProtocolError
>  from .message import DeserializationError, Message, UnexpectedTypeError
> -from .protocol import ConnectError
> +from .protocol import ConnectError, Runstate
>  from .qmp_client import ExecInterruptedError, QMPClient
>  from .util import create_task, pretty_traceback
>
> @@ -67,12 +68,24 @@ def format_json(msg: str) -> str:
>      return ' '.join(words)
>
>
> +def type_name(mtype: Any) -> str:
> +    """
> +    Returns the type name
> +    """
> +    return type(mtype).__name__
>

This is a lot of lines for something that doesn't do very much -- do we
really need it?


> +
> +
>  class App(QMPClient):
> -    def __init__(self, address: Union[str, Tuple[str, int]]) -> None:
> +    def __init__(self, address: Union[str, Tuple[str, int]], num_retries:
> int,
> +                 retry_delay: Optional[int]) -> None:
>          urwid.register_signal(type(self), UPDATE_MSG)
>          self.window = Window(self)
>          self.address = address
>          self.aloop: Optional[Any] = None  # FIXME: Use more concrete type.
> +        self.num_retries = num_retries
> +        self.retry_delay = retry_delay
> +        self.retry: bool = False
> +        self.disconnecting: bool = False
>

Why is this one needed again ? ...


>          super().__init__()
>
>      def add_to_history(self, msg: str, level: Optional[str] = None) ->
> None:
> @@ -119,7 +132,7 @@ def _cb_inbound(self, msg: Message) -> Message:
>              LOGGER.info('Error server disconnected before reply')
>              urwid.emit_signal(self, UPDATE_MSG,
>                                '{"error": "Server disconnected before
> reply"}')
> -            self._set_status("Server disconnected")
> +            await self.disconnect()
>          except Exception as err:
>              LOGGER.error('Exception from _send_to_server: %s', str(err))
>              raise err
> @@ -136,15 +149,29 @@ def kill_app(self) -> None:
>          create_task(self._kill_app())
>
>      async def _kill_app(self) -> None:
> -        # It is ok to call disconnect even in disconnect state
> +        await self.disconnect()
> +        LOGGER.debug('Disconnect finished. Exiting app')
> +        raise urwid.ExitMainLoop()
> +
> +    async def disconnect(self) -> None:
> +        if self.disconnecting:
> +            return
>          try:
> -            await self.disconnect()
> -            LOGGER.debug('Disconnect finished. Exiting app')
> +            self.disconnecting = True
> +            await super().disconnect()
> +            self.retry = True
> +        except EOFError as err:
> +            LOGGER.info('disconnect: %s', type_name(err))
> +            self.retry = True
> +        except ProtocolError as err:
> +            LOGGER.info('disconnect: %s', type_name(err))
> +            self.retry = False
>          except Exception as err:
> -            LOGGER.info('_kill_app: %s', str(err))
> -            # Let the app crash after providing a proper stack trace
> +            LOGGER.error('disconnect: Unhandled exception %s', str(err))
> +            self.retry = False
>              raise err
> -        raise urwid.ExitMainLoop()
> +        finally:
> +            self.disconnecting = False
>
>      def handle_event(self, event: Message) -> None:
>          # FIXME: Consider all states present in qapi/run-state.json
> @@ -161,14 +188,61 @@ def _get_formatted_address(self) -> str:
>              addr = f'{host}:{port}'
>          return addr
>
> -    async def connect_server(self) -> None:
> +    async def _retry_connection(self) -> Optional[str]:
> +        current_retries = 0
> +        err = None
> +        # Increase in power sequence of 2 if no delay is provided
> +        cur_delay = 1
> +        inc_delay = 2
> +        if self.retry_delay:
> +            inc_delay = 1
> +            cur_delay = self.retry_delay
> +        # initial try
> +        await self.connect_server()
> +        while self.retry and current_retries < self.num_retries:
> +            LOGGER.info('Connection Failed, retrying in %d', cur_delay)
> +            status = f'[Retry #{current_retries} ({cur_delay}s)]'
> +            self._set_status(status)
> +
> +            await asyncio.sleep(cur_delay)
> +
> +            err = await self.connect_server()
> +            cur_delay *= inc_delay
> +            # Cap delay to 5mins
> +            cur_delay = min(cur_delay, 5 * 60)
> +            current_retries += 1
> +        # If all retries failed report the last error
> +        LOGGER.info('All retries failed: %s', str(err))
> +        return type_name(err)
>

I had suggested something like an exponential backoff, but maybe a constant
delay would be a little cleaner to implement for right now without getting
too fancy over it. If you go with a simpler retry algorithm, do you think
you could clean up the logic in the retry loop here a bit more?

Something like:

for _ in range(num_retries):
    try:
        whatever_you_have_to_do_to_connect()
        return
    except ConnectError as err:
        LOGGER.info(...etc)
    await asyncio.sleep(whatever_the_delay_is)
# ran out of retries here, presumably the connection manager will just go
idle until the user interferes some other way.

In particular, I think passing around the name of the exception is a little
dubious -- we should be logging with the actual Exception we've received.


> +
> +    async def manage_connection(self) -> None:
> +        while True:
> +            if self.runstate == Runstate.IDLE:
> +                LOGGER.info('Trying to reconnect')
>

But will this be true upon the very first boot? This message might not be
right.


> +                err = await self._retry_connection()
>

This seems named oddly too, since it might be the initial attempt and not
necessarily a reconnection or a retry.


> +                # If retry is still true then, we have exhausted all our
> tries.
> +                if self.retry:
> +                    self._set_status(f'Error: {err}')
>
> +                else:
> +                    addr = self._get_formatted_address()
> +                    self._set_status(f'[Connected {addr}]')
> +            elif self.runstate == Runstate.DISCONNECTING:
> +                self._set_status('[Disconnected]')
> +                await self.disconnect()
> +                # check if a retry is needed
>

Is this required? I would have hoped that after calling disconnect that the
state would have again changed to IDLE and you wouldn't need this clause
here.


> +                if self.runstate == Runstate.IDLE:
> +                    continue
> +            await self.runstate_changed()
> +
> +    async def connect_server(self) -> Optional[str]:
>          try:
>              await self.connect(self.address)
> -            addr = self._get_formatted_address()
> -            self._set_status(f'Connected to {addr}')
> +            self.retry = False
>          except ConnectError as err:
>              LOGGER.info('connect_server: ConnectError %s', str(err))
> -            self._set_status('Server shutdown')
> +            self.retry = True
> +            return type_name(err)
> +        return None
>
>      def run(self, debug: bool = False) -> None:
>          screen = urwid.raw_display.Screen()
> @@ -191,7 +265,7 @@ def run(self, debug: bool = False) -> None:
>                                     event_loop=event_loop)
>
>          create_task(self.wait_for_events(), self.aloop)
> -        create_task(self.connect_server(), self.aloop)
> +        create_task(self.manage_connection(), self.aloop)
>          try:
>              main_loop.run()
>          except Exception as err:
> @@ -333,6 +407,11 @@ def main() -> None:
>      parser = argparse.ArgumentParser(description='AQMP TUI')
>      parser.add_argument('qmp_server', help='Address of the QMP server'
>                          '< UNIX socket path | TCP addr:port >')
> +    parser.add_argument('--num-retries', type=int, default=10,
> +                        help='Number of times to reconnect before giving
> up')
> +    parser.add_argument('--retry-delay', type=int,
> +                        help='Time(s) to wait before next retry.'
> +                        'Default action is to increase delay in powers of
> 2')
>      parser.add_argument('--log-file', help='The Log file name')
>      parser.add_argument('--log-level', default='WARNING',
>                          help='Log level
> <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
> @@ -348,7 +427,7 @@ def main() -> None:
>      except QMPBadPortError as err:
>          parser.error(str(err))
>
> -    app = App(address)
> +    app = App(address, args.num_retries, args.retry_delay)
>
>      if args.log_file:
>          LOGGER.addHandler(logging.FileHandler(args.log_file))
> --
> 2.17.1
>
>
Right idea overall - possibly needs some polish and to be integrated with
an earlier patch to avoid the intermediate FIXMEs.

Thanks,
--js
Niteesh G. S. Aug. 17, 2021, 7:06 p.m. UTC | #2
On Tue, Aug 17, 2021 at 10:21 AM John Snow <jsnow@redhat.com> wrote:

>
>
> On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu <niteesh.gs@gmail.com>
> wrote:
>
>> Instead of manually connecting and disconnecting from the
>> server. We now rely on the runstate to manage the QMP
>> connection.
>>
>> Along with this the ability to reconnect on certain exceptions
>> has also been added.
>>
>> Signed-off-by: G S Niteesh Babu <niteesh.gs@gmail.com>
>> ---
>>  python/qemu/aqmp/aqmp_tui.py | 109 ++++++++++++++++++++++++++++++-----
>>  1 file changed, 94 insertions(+), 15 deletions(-)
>>
>> diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
>> index 0d5ec62cb7..ef91883fa5 100644
>> --- a/python/qemu/aqmp/aqmp_tui.py
>> +++ b/python/qemu/aqmp/aqmp_tui.py
>> @@ -25,8 +25,9 @@
>>  import urwid_readline
>>
>>  from ..qmp import QEMUMonitorProtocol, QMPBadPortError
>> +from .error import ProtocolError
>>  from .message import DeserializationError, Message, UnexpectedTypeError
>> -from .protocol import ConnectError
>> +from .protocol import ConnectError, Runstate
>>  from .qmp_client import ExecInterruptedError, QMPClient
>>  from .util import create_task, pretty_traceback
>>
>> @@ -67,12 +68,24 @@ def format_json(msg: str) -> str:
>>      return ' '.join(words)
>>
>>
>> +def type_name(mtype: Any) -> str:
>> +    """
>> +    Returns the type name
>> +    """
>> +    return type(mtype).__name__
>>
>
> This is a lot of lines for something that doesn't do very much -- do we
> really need it?
>
No. This has been removed in v4.

>
>
>> +
>> +
>>  class App(QMPClient):
>> -    def __init__(self, address: Union[str, Tuple[str, int]]) -> None:
>> +    def __init__(self, address: Union[str, Tuple[str, int]],
>> num_retries: int,
>> +                 retry_delay: Optional[int]) -> None:
>>          urwid.register_signal(type(self), UPDATE_MSG)
>>          self.window = Window(self)
>>          self.address = address
>>          self.aloop: Optional[Any] = None  # FIXME: Use more concrete
>> type.
>> +        self.num_retries = num_retries
>> +        self.retry_delay = retry_delay
>> +        self.retry: bool = False
>> +        self.disconnecting: bool = False
>>
>
> Why is this one needed again ? ...
>
A race condition occurs at protocol.py line 597.
The reason is that two disconnect calls are initiated: the
first one via kill_app,
and the second one via manage_connection when the state is set to
DISCONNECTING by the first call.
One of the calls sets the state to IDLE (protocol.py:584) after it has
finished disconnecting; meanwhile,
the second call is still in the process of disconnecting and asserts the
state to be DISCONNECTING
in protocol.py:597, which it is not, since it has already been set to IDLE by the
first call.

If I don't guard against the second call, I get the following exception:
------------------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/niteesh/development/qemu/python/.venv/bin/aqmp-tui", line 33,
in <module>
    sys.exit(load_entry_point('qemu', 'console_scripts', 'aqmp-tui')())
  File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
695, in main
    app.run(args.asyncio_debug)
  File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
444, in run
    raise err
  File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
441, in run
    main_loop.run()
  File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
line 287, in run
    self._run()
  File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
line 385, in _run
    self.event_loop.run()
  File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
line 1494, in run
    reraise(*exc_info)
  File
"/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/compat.py",
line 58, in reraise
    raise value
  File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
391, in manage_connection
    await self.disconnect()
  File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
312, in disconnect
    raise err
  File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
300, in disconnect
    await super().disconnect()
  File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line
302, in disconnect
    await self._wait_disconnect()
  File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line
583, in _wait_disconnect
    self._cleanup()
  File "/home/niteesh/development/qemu/python/qemu/aqmp/qmp_client.py",
line 331, in _cleanup
    super()._cleanup()
  File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line
597, in _cleanup
    assert self.runstate == Runstate.DISCONNECTING
AssertionError
-------------------------------------------------------------------------------------------


>
>>          super().__init__()
>>
>>      def add_to_history(self, msg: str, level: Optional[str] = None) ->
>> None:
>> @@ -119,7 +132,7 @@ def _cb_inbound(self, msg: Message) -> Message:
>>              LOGGER.info('Error server disconnected before reply')
>>              urwid.emit_signal(self, UPDATE_MSG,
>>                                '{"error": "Server disconnected before
>> reply"}')
>> -            self._set_status("Server disconnected")
>> +            await self.disconnect()
>>          except Exception as err:
>>              LOGGER.error('Exception from _send_to_server: %s', str(err))
>>              raise err
>> @@ -136,15 +149,29 @@ def kill_app(self) -> None:
>>          create_task(self._kill_app())
>>
>>      async def _kill_app(self) -> None:
>> -        # It is ok to call disconnect even in disconnect state
>> +        await self.disconnect()
>> +        LOGGER.debug('Disconnect finished. Exiting app')
>> +        raise urwid.ExitMainLoop()
>> +
>> +    async def disconnect(self) -> None:
>> +        if self.disconnecting:
>> +            return
>>          try:
>> -            await self.disconnect()
>> -            LOGGER.debug('Disconnect finished. Exiting app')
>> +            self.disconnecting = True
>> +            await super().disconnect()
>> +            self.retry = True
>> +        except EOFError as err:
>> +            LOGGER.info('disconnect: %s', type_name(err))
>> +            self.retry = True
>> +        except ProtocolError as err:
>> +            LOGGER.info('disconnect: %s', type_name(err))
>> +            self.retry = False
>>          except Exception as err:
>> -            LOGGER.info('_kill_app: %s', str(err))
>> -            # Let the app crash after providing a proper stack trace
>> +            LOGGER.error('disconnect: Unhandled exception %s', str(err))
>> +            self.retry = False
>>              raise err
>> -        raise urwid.ExitMainLoop()
>> +        finally:
>> +            self.disconnecting = False
>>
>>      def handle_event(self, event: Message) -> None:
>>          # FIXME: Consider all states present in qapi/run-state.json
>> @@ -161,14 +188,61 @@ def _get_formatted_address(self) -> str:
>>              addr = f'{host}:{port}'
>>          return addr
>>
>> -    async def connect_server(self) -> None:
>> +    async def _retry_connection(self) -> Optional[str]:
>> +        current_retries = 0
>> +        err = None
>> +        # Increase in power sequence of 2 if no delay is provided
>> +        cur_delay = 1
>> +        inc_delay = 2
>> +        if self.retry_delay:
>> +            inc_delay = 1
>> +            cur_delay = self.retry_delay
>> +        # initial try
>> +        await self.connect_server()
>> +        while self.retry and current_retries < self.num_retries:
>> +            LOGGER.info('Connection Failed, retrying in %d', cur_delay)
>> +            status = f'[Retry #{current_retries} ({cur_delay}s)]'
>> +            self._set_status(status)
>> +
>> +            await asyncio.sleep(cur_delay)
>> +
>> +            err = await self.connect_server()
>> +            cur_delay *= inc_delay
>> +            # Cap delay to 5mins
>> +            cur_delay = min(cur_delay, 5 * 60)
>> +            current_retries += 1
>> +        # If all retries failed report the last error
>> +        LOGGER.info('All retries failed: %s', str(err))
>> +        return type_name(err)
>>
>
> I had suggested something like an exponential backoff, but maybe a
> constant delay would be a little cleaner to implement for right now without
> getting too fancy over it. If you go with a simpler retry algorithm, do you
> think you could clean up the logic in the retry loop here a bit more?
>
Yes, we can. I'll refactor it to constant delay.


> Something like:
>
> for _ in range(num_retries):
>     try:
>         whatever_you_have_to_do_to_connect()
>         return
>     except ConnectError as err:
>         LOGGER.info(...etc)
>     await asyncio.sleep(whatever_the_delay_is)
> # ran out of retries here, presumably the connection manager will just go
> idle until the user interferes some other way.
>

> In particular, I think passing around the name of the exception is a
> little dubious -- we should be logging with the actual Exception we've
> received.
>
This has been fixed in V4. We pass the exception now instead of just
passing around the name.

>
>
>> +
>> +    async def manage_connection(self) -> None:
>> +        while True:
>> +            if self.runstate == Runstate.IDLE:
>> +                LOGGER.info('Trying to reconnect')
>>
>
> But will this be true upon the very first boot? This message might not be
> right.
>
Yes, it also occurs on the first boot. I'll fix this in v4.

>
>
>> +                err = await self._retry_connection()
>>
>
> This seems named oddly too, since it might be the initial attempt and not
> necessarily a reconnection or a retry.
>
Will fix that.

>
>
>> +                # If retry is still true then, we have exhausted all our
>> tries.
>> +                if self.retry:
>> +                    self._set_status(f'Error: {err}')
>>
>> +                else:
>> +                    addr = self._get_formatted_address()
>> +                    self._set_status(f'[Connected {addr}]')
>> +            elif self.runstate == Runstate.DISCONNECTING:
>> +                self._set_status('[Disconnected]')
>> +                await self.disconnect()
>> +                # check if a retry is needed
>>
>
> Is this required? I would have hoped that after calling disconnect that
> the state would have again changed to IDLE and you wouldn't need this
> clause here.
>
After you mentioned it, I too felt it was redundant. But on removing it, the
whole app freezes when trying to exit.
I logged the state after the call to disconnect: instead of being in the
IDLE state, it is still in the DISCONNECTING state.
I suspect this results in an infinite loop which doesn't give
other coroutines a chance to run and blocks
the event loop, thus freezing the app. But I am not sure
why the state isn't changing to IDLE.

>
>
>> +                if self.runstate == Runstate.IDLE:
>> +                    continue
>> +            await self.runstate_changed()
>> +
>> +    async def connect_server(self) -> Optional[str]:
>>          try:
>>              await self.connect(self.address)
>> -            addr = self._get_formatted_address()
>> -            self._set_status(f'Connected to {addr}')
>> +            self.retry = False
>>          except ConnectError as err:
>>              LOGGER.info('connect_server: ConnectError %s', str(err))
>> -            self._set_status('Server shutdown')
>> +            self.retry = True
>> +            return type_name(err)
>> +        return None
>>
>>      def run(self, debug: bool = False) -> None:
>>          screen = urwid.raw_display.Screen()
>> @@ -191,7 +265,7 @@ def run(self, debug: bool = False) -> None:
>>                                     event_loop=event_loop)
>>
>>          create_task(self.wait_for_events(), self.aloop)
>> -        create_task(self.connect_server(), self.aloop)
>> +        create_task(self.manage_connection(), self.aloop)
>>          try:
>>              main_loop.run()
>>          except Exception as err:
>> @@ -333,6 +407,11 @@ def main() -> None:
>>      parser = argparse.ArgumentParser(description='AQMP TUI')
>>      parser.add_argument('qmp_server', help='Address of the QMP server'
>>                          '< UNIX socket path | TCP addr:port >')
>> +    parser.add_argument('--num-retries', type=int, default=10,
>> +                        help='Number of times to reconnect before giving
>> up')
>> +    parser.add_argument('--retry-delay', type=int,
>> +                        help='Time(s) to wait before next retry.'
>> +                        'Default action is to increase delay in powers
>> of 2')
>>      parser.add_argument('--log-file', help='The Log file name')
>>      parser.add_argument('--log-level', default='WARNING',
>>                          help='Log level
>> <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
>> @@ -348,7 +427,7 @@ def main() -> None:
>>      except QMPBadPortError as err:
>>          parser.error(str(err))
>>
>> -    app = App(address)
>> +    app = App(address, args.num_retries, args.retry_delay)
>>
>>      if args.log_file:
>>          LOGGER.addHandler(logging.FileHandler(args.log_file))
>> --
>> 2.17.1
>>
>>
> Right idea overall - possibly needs some polish and to be integrated with
> an earlier patch to avoid the intermediate FIXMEs.
>
> Thanks,
> --js
>
John Snow Aug. 18, 2021, 7:36 p.m. UTC | #3
On Tue, Aug 17, 2021 at 3:07 PM Niteesh G. S. <niteesh.gs@gmail.com> wrote:

>
>
> On Tue, Aug 17, 2021 at 10:21 AM John Snow <jsnow@redhat.com> wrote:
>
>>
>>
>> On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu <niteesh.gs@gmail.com>
>> wrote:
>>
>>
[...]


>
>>
>>> +
>>> +
>>>  class App(QMPClient):
>>> -    def __init__(self, address: Union[str, Tuple[str, int]]) -> None:
>>> +    def __init__(self, address: Union[str, Tuple[str, int]],
>>> num_retries: int,
>>> +                 retry_delay: Optional[int]) -> None:
>>>          urwid.register_signal(type(self), UPDATE_MSG)
>>>          self.window = Window(self)
>>>          self.address = address
>>>          self.aloop: Optional[Any] = None  # FIXME: Use more concrete
>>> type.
>>> +        self.num_retries = num_retries
>>> +        self.retry_delay = retry_delay
>>> +        self.retry: bool = False
>>> +        self.disconnecting: bool = False
>>>
>>
>> Why is this one needed again ? ...
>>
>

> A race condition occurs in protocol.py line 597
> The reason behind this is there are two disconnect calls initiated. The
> first one via kill_app
> and the second one via manage_connection when the state is set to
> disconnecting by the first call.
> One of the calls sets the state to IDLE (protocol.py:584) after it has
> finished disconnecting, meanwhile
> the second call is somehow in the process of disconnecting and assert the
> state to be in DISCONNECTING
> in protocol.py:597, which it is not since it has been set to IDLE by the
> first call.
>
> If I don't guard against the second call, I get the following exception:
>
> ------------------------------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/niteesh/development/qemu/python/.venv/bin/aqmp-tui", line
> 33, in <module>
>     sys.exit(load_entry_point('qemu', 'console_scripts', 'aqmp-tui')())
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
> 695, in main
>     app.run(args.asyncio_debug)
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
> 444, in run
>     raise err
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
> 441, in run
>     main_loop.run()
>   File
> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
> line 287, in run
>     self._run()
>   File
> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
> line 385, in _run
>     self.event_loop.run()
>   File
> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/main_loop.py",
> line 1494, in run
>     reraise(*exc_info)
>   File
> "/home/niteesh/development/qemu/python/.venv/lib/python3.6/site-packages/urwid/compat.py",
> line 58, in reraise
>     raise value
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
> 391, in manage_connection
>     await self.disconnect()
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
> 312, in disconnect
>     raise err
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/aqmp_tui.py", line
> 300, in disconnect
>     await super().disconnect()
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line
> 302, in disconnect
>     await self._wait_disconnect()
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line
> 583, in _wait_disconnect
>     self._cleanup()
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/qmp_client.py",
> line 331, in _cleanup
>     super()._cleanup()
>   File "/home/niteesh/development/qemu/python/qemu/aqmp/protocol.py", line
> 597, in _cleanup
>     assert self.runstate == Runstate.DISCONNECTING
> AssertionError
>
> -------------------------------------------------------------------------------------------
>

Hm, OK. I'm not sure if this is a bug on my part or not yet, I'll
investigate.


>      def add_to_history(self, msg: str, level: Optional[str] = None) ->
>>> None:
>>> @@ -119,7 +132,7 @@ def _cb_inbound(self, msg: Message) -> Message:
>>>              LOGGER.info('Error server disconnected before reply')
>>>              urwid.emit_signal(self, UPDATE_MSG,
>>>                                '{"error": "Server disconnected before
>>> reply"}')
>>> -            self._set_status("Server disconnected")
>>> +            await self.disconnect()
>>>          except Exception as err:
>>>              LOGGER.error('Exception from _send_to_server: %s', str(err))
>>>              raise err
>>> @@ -136,15 +149,29 @@ def kill_app(self) -> None:
>>>          create_task(self._kill_app())
>>
>> Is this required? I would have hoped that after calling disconnect that
>> the state would have again changed to IDLE and you wouldn't need this
>> clause here.
>>
> After you mentioned it I too felt it was redundant. But on removing it the
> whole app freezes when trying to exit.
> I logged the state after the call to disconnect, instead of being in the
> IDLE state, it is still in DISCONNECTING state.
> I suspect this results in the constant infinite looping which doesn't give
> other coroutines a chance to run and blocks
> the event loop thus resulting in the freezing of the app. But I am not
> sure why the state isn't changing to IDLE.
>

Hmm ... That may well be a bug in AQMP then. I will investigate.
John Snow Aug. 20, 2021, 7:31 p.m. UTC | #4
On Wed, Aug 18, 2021 at 3:36 PM John Snow <jsnow@redhat.com> wrote:

> On Tue, Aug 17, 2021 at 3:07 PM Niteesh G. S. <niteesh.gs@gmail.com>
> wrote:
>
>> On Tue, Aug 17, 2021 at 10:21 AM John Snow <jsnow@redhat.com> wrote:
>>
>>> On Fri, Jul 30, 2021 at 4:19 PM G S Niteesh Babu <niteesh.gs@gmail.com>
>>> wrote:
>>>
>>
> Is this required? I would have hoped that after calling disconnect that
>>> the state would have again changed to IDLE and you wouldn't need this
>>> clause here.
>>>
>> After you mentioned it I too felt it was redundant. But on removing it
>> the whole app freezes when trying to exit.
>> I logged the state after the call to disconnect, instead of being in the
>> IDLE state, it is still in DISCONNECTING state.
>> I suspect this results in the constant infinite looping which doesn't
>> give other coroutines a chance to run and blocks
>> the event loop thus resulting in the freezing of the app. But I am not
>> sure why the state isn't changing to IDLE.
>>
>
> Hmm ... That may well be a bug in AQMP then. I will investigate.
>

No, it's not -- It's just tricky. The "problem" is that the
runstate_changed() event only returns once the runstate has changed *after*
you await it. So this code is perfectly correct and I am just bad at
reading.

--js
diff mbox series

Patch

diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
index 0d5ec62cb7..ef91883fa5 100644
--- a/python/qemu/aqmp/aqmp_tui.py
+++ b/python/qemu/aqmp/aqmp_tui.py
@@ -25,8 +25,9 @@ 
 import urwid_readline
 
 from ..qmp import QEMUMonitorProtocol, QMPBadPortError
+from .error import ProtocolError
 from .message import DeserializationError, Message, UnexpectedTypeError
-from .protocol import ConnectError
+from .protocol import ConnectError, Runstate
 from .qmp_client import ExecInterruptedError, QMPClient
 from .util import create_task, pretty_traceback
 
@@ -67,12 +68,24 @@  def format_json(msg: str) -> str:
     return ' '.join(words)
 
 
+def type_name(mtype: Any) -> str:
+    """
+    Returns the type name
+    """
+    return type(mtype).__name__
+
+
 class App(QMPClient):
-    def __init__(self, address: Union[str, Tuple[str, int]]) -> None:
+    def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
+                 retry_delay: Optional[int]) -> None:
         urwid.register_signal(type(self), UPDATE_MSG)
         self.window = Window(self)
         self.address = address
         self.aloop: Optional[Any] = None  # FIXME: Use more concrete type.
+        self.num_retries = num_retries
+        self.retry_delay = retry_delay
+        self.retry: bool = False
+        self.disconnecting: bool = False
         super().__init__()
 
     def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
@@ -119,7 +132,7 @@  def _cb_inbound(self, msg: Message) -> Message:
             LOGGER.info('Error server disconnected before reply')
             urwid.emit_signal(self, UPDATE_MSG,
                               '{"error": "Server disconnected before reply"}')
-            self._set_status("Server disconnected")
+            await self.disconnect()
         except Exception as err:
             LOGGER.error('Exception from _send_to_server: %s', str(err))
             raise err
@@ -136,15 +149,29 @@  def kill_app(self) -> None:
         create_task(self._kill_app())
 
     async def _kill_app(self) -> None:
-        # It is ok to call disconnect even in disconnect state
+        await self.disconnect()
+        LOGGER.debug('Disconnect finished. Exiting app')
+        raise urwid.ExitMainLoop()
+
+    async def disconnect(self) -> None:
+        if self.disconnecting:
+            return
         try:
-            await self.disconnect()
-            LOGGER.debug('Disconnect finished. Exiting app')
+            self.disconnecting = True
+            await super().disconnect()
+            self.retry = True
+        except EOFError as err:
+            LOGGER.info('disconnect: %s', type_name(err))
+            self.retry = True
+        except ProtocolError as err:
+            LOGGER.info('disconnect: %s', type_name(err))
+            self.retry = False
         except Exception as err:
-            LOGGER.info('_kill_app: %s', str(err))
-            # Let the app crash after providing a proper stack trace
+            LOGGER.error('disconnect: Unhandled exception %s', str(err))
+            self.retry = False
             raise err
-        raise urwid.ExitMainLoop()
+        finally:
+            self.disconnecting = False
 
     def handle_event(self, event: Message) -> None:
         # FIXME: Consider all states present in qapi/run-state.json
@@ -161,14 +188,61 @@  def _get_formatted_address(self) -> str:
             addr = f'{host}:{port}'
         return addr
 
-    async def connect_server(self) -> None:
+    async def _retry_connection(self) -> Optional[str]:
+        current_retries = 0
+        err = None
+        # Increase in power sequence of 2 if no delay is provided
+        cur_delay = 1
+        inc_delay = 2
+        if self.retry_delay:
+            inc_delay = 1
+            cur_delay = self.retry_delay
+        # initial try
+        await self.connect_server()
+        while self.retry and current_retries < self.num_retries:
+            LOGGER.info('Connection Failed, retrying in %d', cur_delay)
+            status = f'[Retry #{current_retries} ({cur_delay}s)]'
+            self._set_status(status)
+
+            await asyncio.sleep(cur_delay)
+
+            err = await self.connect_server()
+            cur_delay *= inc_delay
+            # Cap delay to 5mins
+            cur_delay = min(cur_delay, 5 * 60)
+            current_retries += 1
+        # If all retries failed report the last error
+        LOGGER.info('All retries failed: %s', str(err))
+        return type_name(err)
+
+    async def manage_connection(self) -> None:
+        while True:
+            if self.runstate == Runstate.IDLE:
+                LOGGER.info('Trying to reconnect')
+                err = await self._retry_connection()
+                # If retry is still true then, we have exhausted all our tries.
+                if self.retry:
+                    self._set_status(f'Error: {err}')
+                else:
+                    addr = self._get_formatted_address()
+                    self._set_status(f'[Connected {addr}]')
+            elif self.runstate == Runstate.DISCONNECTING:
+                self._set_status('[Disconnected]')
+                await self.disconnect()
+                # check if a retry is needed
+                if self.runstate == Runstate.IDLE:
+                    continue
+            await self.runstate_changed()
+
+    async def connect_server(self) -> Optional[str]:
         try:
             await self.connect(self.address)
-            addr = self._get_formatted_address()
-            self._set_status(f'Connected to {addr}')
+            self.retry = False
         except ConnectError as err:
             LOGGER.info('connect_server: ConnectError %s', str(err))
-            self._set_status('Server shutdown')
+            self.retry = True
+            return type_name(err)
+        return None
 
     def run(self, debug: bool = False) -> None:
         screen = urwid.raw_display.Screen()
@@ -191,7 +265,7 @@  def run(self, debug: bool = False) -> None:
                                    event_loop=event_loop)
 
         create_task(self.wait_for_events(), self.aloop)
-        create_task(self.connect_server(), self.aloop)
+        create_task(self.manage_connection(), self.aloop)
         try:
             main_loop.run()
         except Exception as err:
@@ -333,6 +407,11 @@  def main() -> None:
     parser = argparse.ArgumentParser(description='AQMP TUI')
     parser.add_argument('qmp_server', help='Address of the QMP server'
                         '< UNIX socket path | TCP addr:port >')
+    parser.add_argument('--num-retries', type=int, default=10,
+                        help='Number of times to reconnect before giving up')
+    parser.add_argument('--retry-delay', type=int,
+                        help='Time(s) to wait before next retry.'
+                        'Default action is to increase delay in powers of 2')
     parser.add_argument('--log-file', help='The Log file name')
     parser.add_argument('--log-level', default='WARNING',
                         help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
@@ -348,7 +427,7 @@  def main() -> None:
     except QMPBadPortError as err:
         parser.error(str(err))
 
-    app = App(address)
+    app = App(address, args.num_retries, args.retry_delay)
 
     if args.log_file:
         LOGGER.addHandler(logging.FileHandler(args.log_file))