diff mbox series

[v4,7/7] python/aqmp-tui: Add QMP connection manager

Message ID 20210819173831.23515-8-niteesh.gs@gmail.com (mailing list archive)
State New, archived
Headers show
Series AQMP TUI Draft | expand

Commit Message

Niteesh G. S. Aug. 19, 2021, 5:38 p.m. UTC
The connection manager will take care of connecting/disconnecting
to the server. This will also try to reconnect to the server in
certain situations where the client has been disconnected due to
some error condition.

Signed-off-by: G S Niteesh Babu <niteesh.gs@gmail.com>
---
 python/qemu/aqmp/aqmp_tui.py | 127 +++++++++++++++++++++++++++++------
 1 file changed, 105 insertions(+), 22 deletions(-)
diff mbox series

Patch

diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
index 03d4808acd..c47abe0a25 100644
--- a/python/qemu/aqmp/aqmp_tui.py
+++ b/python/qemu/aqmp/aqmp_tui.py
@@ -35,8 +35,9 @@ 
 import urwid_readline
 
 from ..qmp import QEMUMonitorProtocol, QMPBadPortError
+from .error import ProtocolError
 from .message import DeserializationError, Message, UnexpectedTypeError
-from .protocol import ConnectError
+from .protocol import ConnectError, Runstate
 from .qmp_client import ExecInterruptedError, QMPClient
 from .util import create_task, pretty_traceback
 
@@ -128,17 +129,26 @@  class App(QMPClient):
 
     Initializes the widgets and starts the urwid event loop.
     """
-    def __init__(self, address: Union[str, Tuple[str, int]]) -> None:
+    def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
+                 retry_delay: Optional[int]) -> None:
         """
         Initializes the TUI.
 
         :param address:
             Address of the server to connect to.
+        :param num_retries:
+            The number of times to retry before stopping to reconnect.
+        :param retry_delay:
+            The delay(sec) before each retry
         """
         urwid.register_signal(type(self), UPDATE_MSG)
         self.window = Window(self)
         self.address = address
         self.aloop: Optional[asyncio.AbstractEventLoop] = None
+        self.num_retries = num_retries
+        self.retry_delay = retry_delay if retry_delay else 2
+        self.retry: bool = False
+        self.disconnecting: bool = False
         super().__init__()
 
     def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
@@ -212,10 +222,10 @@  def handle_event(self, event: Message) -> None:
         """
         try:
             await self._raw(msg, assign_id='id' not in msg)
-        except ExecInterruptedError:
-            logging.info('Error server disconnected before reply')
+        except ExecInterruptedError as err:
+            logging.info('Error server disconnected before reply %s', str(err))
             self.add_to_history('Server disconnected before reply', 'ERROR')
-            self._set_status("[Server Disconnected]")
+            await self.disconnect()
         except Exception as err:
             logging.error('Exception from _send_to_server: %s', str(err))
             raise err
@@ -237,10 +247,10 @@  def cb_send_to_server(self, raw_msg: str) -> None:
             create_task(self._send_to_server(msg))
         except (ValueError, TypeError) as err:
             logging.info('Invalid message: %s', str(err))
-            self.add_to_history(f'{raw_msg}: {err}')
+            self.add_to_history(f'{raw_msg}: {err}', 'ERROR')
         except (DeserializationError, UnexpectedTypeError) as err:
             logging.info('Invalid message: %s', err.error_message)
-            self.add_to_history(f'{raw_msg}: {err.error_message}')
+            self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')
 
     def unhandled_input(self, key: str) -> None:
         """
@@ -266,18 +276,32 @@  def kill_app(self) -> None:
 
         :raise Exception: When an unhandled exception is caught.
         """
-        # It is ok to call disconnect even in disconnect state
+        await self.disconnect()
+        logging.debug('Disconnect finished. Exiting app')
+        raise urwid.ExitMainLoop()
+
+    async def disconnect(self) -> None:
+        """
+        Overrides the disconnect method to handle the errors locally.
+        """
+        if self.disconnecting:
+            return
         try:
-            await self.disconnect()
-            logging.debug('Disconnect finished. Exiting app')
-        except EOFError:
-            # We receive an EOF during disconnect, ignore that
-            pass
+            self.disconnecting = True
+            await super().disconnect()
+            self.retry = False
+        except EOFError as err:
+            logging.info('disconnect: %s', str(err))
+            self.retry = True
+        except ProtocolError as err:
+            logging.info('disconnect: %s', str(err))
+            self.retry = False
         except Exception as err:
-            logging.info('_kill_app: %s', str(err))
-            # Let the app crash after providing a proper stack trace
+            logging.error('disconnect: Unhandled exception %s', str(err))
+            self.retry = False
             raise err
-        raise urwid.ExitMainLoop()
+        finally:
+            self.disconnecting = False
 
     def _set_status(self, msg: str) -> None:
         """
@@ -301,18 +325,72 @@  def _get_formatted_address(self) -> str:
             addr = f'{self.address}'
         return addr
 
-    async def connect_server(self) -> None:
+    async def _initiate_connection(self) -> Optional[ConnectError]:
+        """
+        Tries connecting to a server a number of times with a delay between
+        each try. If all retries failed then return the error faced during
+        the last retry.
+
+        :return: Error faced during last retry.
+        """
+        current_retries = 0
+        err = None
+
+        # initial try
+        await self.connect_server()
+        while self.retry and current_retries < self.num_retries:
+            logging.info('Connection Failed, retrying in %d', self.retry_delay)
+            status = f'[Retry #{current_retries} ({self.retry_delay}s)]'
+            self._set_status(status)
+
+            await asyncio.sleep(self.retry_delay)
+
+            err = await self.connect_server()
+            current_retries += 1
+        # If all retries failed report the last error
+        if err:
+            logging.info('All retries failed: %s', err)
+            return err
+        return None
+
+    async def manage_connection(self) -> None:
+        """
+        Manage the connection based on the current run state.
+
+        A reconnect is issued when the current state is IDLE and the number
+        of retries is not exhausted.
+        A disconnect is issued when the current state is DISCONNECTING.
+        """
+        while True:
+            if self.runstate == Runstate.IDLE:
+                err = await self._initiate_connection()
+                # If retry is still true then, we have exhausted all our tries.
+                if err:
+                    self._set_status(f'[Error: {err.error_message}]')
+                else:
+                    addr = self._get_formatted_address()
+                    self._set_status(f'[Connected {addr}]')
+            elif self.runstate == Runstate.DISCONNECTING:
+                self._set_status('[Disconnected]')
+                await self.disconnect()
+                # check if a retry is needed
+                if self.runstate == Runstate.IDLE:
+                    continue
+            await self.runstate_changed()
+
+    async def connect_server(self) -> Optional[ConnectError]:
         """
         Initiates a connection to the server at address `self.address`
         and in case of a failure, sets the status to the respective error.
         """
         try:
             await self.connect(self.address)
-            addr = self._get_formatted_address()
-            self._set_status(f'Connected to {addr}')
+            self.retry = False
         except ConnectError as err:
             logging.info('connect_server: ConnectError %s', str(err))
-            self._set_status(f'[ConnectError: {err.error_message}]')
+            self.retry = True
+            return err
+        return None
 
     def run(self, debug: bool = False) -> None:
         """
@@ -341,7 +419,7 @@  def run(self, debug: bool = False) -> None:
                                    event_loop=event_loop)
 
         create_task(self.wait_for_events(), self.aloop)
-        create_task(self.connect_server(), self.aloop)
+        create_task(self.manage_connection(), self.aloop)
         try:
             main_loop.run()
         except Exception as err:
@@ -566,6 +644,11 @@  def main() -> None:
     parser = argparse.ArgumentParser(description='AQMP TUI')
     parser.add_argument('qmp_server', help='Address of the QMP server. '
                         'Format <UNIX socket path | TCP addr:port>')
+    parser.add_argument('--num-retries', type=int, default=10,
+                        help='Number of times to reconnect before giving up.')
+    parser.add_argument('--retry-delay', type=int,
+                        help='Time(s) to wait before next retry. '
+                        'Default action is to wait 2s between each retry.')
     parser.add_argument('--log-file', help='The Log file name')
     parser.add_argument('--log-level', default='WARNING',
                         help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
@@ -581,7 +664,7 @@  def main() -> None:
     except QMPBadPortError as err:
         parser.error(str(err))
 
-    app = App(address)
+    app = App(address, args.num_retries, args.retry_delay)
 
     root_logger = logging.getLogger()
     root_logger.setLevel(logging.getLevelName(args.log_level))