diff mbox series

[v3,09/25] python/aqmp: add AsyncProtocol.accept() method

Message ID 20210803182941.504537-10-jsnow@redhat.com (mailing list archive)
State New, archived
Headers show
Series python: introduce Asynchronous QMP package | expand

Commit Message

John Snow Aug. 3, 2021, 6:29 p.m. UTC
It's a little messier than connect, because it wasn't designed to accept
*precisely one* connection. Such is life.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/aqmp/protocol.py | 89 ++++++++++++++++++++++++++++++++++--
 1 file changed, 85 insertions(+), 4 deletions(-)

Comments

Eric Blake Aug. 17, 2021, 7:29 p.m. UTC | #1
On Tue, Aug 03, 2021 at 02:29:25PM -0400, John Snow wrote:
> It's a little messier than connect, because it wasn't designed to accept
> *precisely one* connection. Such is life.
> 
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  python/qemu/aqmp/protocol.py | 89 ++++++++++++++++++++++++++++++++++--
>  1 file changed, 85 insertions(+), 4 deletions(-)
> 
> diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
> index 77b330627b3..7eca65aa265 100644
> --- a/python/qemu/aqmp/protocol.py
> +++ b/python/qemu/aqmp/protocol.py
> @@ -243,6 +243,24 @@ async def runstate_changed(self) -> Runstate:
>          await self._runstate_event.wait()
>          return self.runstate
>  
> +    @upper_half
> +    @require(Runstate.IDLE)
> +    async def accept(self, address: Union[str, Tuple[str, int]],
> +                     ssl: Optional[SSLContext] = None) -> None:
> +        """
> +        Accept a connection and begin processing message queues.
> +
> +        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
> +
> +        :param address:
> +            Address to listen to; UNIX socket path or TCP address/port.

Can't TCP use a well-known port name instead of an int?  But limiting
clients to just int port for now isn't fatal to the patch.

> +        :param ssl: SSL context to use, if any.
> +
> +        :raise StateError: When the `Runstate` is not `IDLE`.
> +        :raise ConnectError: If a connection could not be accepted.
> +        """
> +        await self._new_session(address, ssl, accept=True)
> +

Reviewed-by: Eric Blake <eblake@redhat.com>
John Snow Aug. 18, 2021, 2:24 p.m. UTC | #2
On Tue, Aug 17, 2021 at 3:30 PM Eric Blake <eblake@redhat.com> wrote:

> On Tue, Aug 03, 2021 at 02:29:25PM -0400, John Snow wrote:
> > It's a little messier than connect, because it wasn't designed to accept
> > *precisely one* connection. Such is life.
> >
> > Signed-off-by: John Snow <jsnow@redhat.com>
> > ---
> >  python/qemu/aqmp/protocol.py | 89 ++++++++++++++++++++++++++++++++++--
> >  1 file changed, 85 insertions(+), 4 deletions(-)
> >
> > diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
> > index 77b330627b3..7eca65aa265 100644
> > --- a/python/qemu/aqmp/protocol.py
> > +++ b/python/qemu/aqmp/protocol.py
> > @@ -243,6 +243,24 @@ async def runstate_changed(self) -> Runstate:
> >          await self._runstate_event.wait()
> >          return self.runstate
> >
> > +    @upper_half
> > +    @require(Runstate.IDLE)
> > +    async def accept(self, address: Union[str, Tuple[str, int]],
> > +                     ssl: Optional[SSLContext] = None) -> None:
> > +        """
> > +        Accept a connection and begin processing message queues.
> > +
> > +        If this call fails, `runstate` is guaranteed to be set back to
> `IDLE`.
> > +
> > +        :param address:
> > +            Address to listen to; UNIX socket path or TCP address/port.
>
> Can't TCP use a well-known port name instead of an int?  But limiting
> clients to just int port for now isn't fatal to the patch.
>
>
The old QMP library didn't support this, and I used the old library as my
template here. I'm willing to change the address format and types to be
more comprehensive, but I was thinking that it should probably try to match
or adhere to some standard; de-facto or otherwise. I wasn't sure which to
pick, and we use a few different ones in QEMU itself. Any recommendations
for me?


> > +        :param ssl: SSL context to use, if any.
> > +
> > +        :raise StateError: When the `Runstate` is not `IDLE`.
> > +        :raise ConnectError: If a connection could not be accepted.
> > +        """
> > +        await self._new_session(address, ssl, accept=True)
> > +
>
> Reviewed-by: Eric Blake <eblake@redhat.com>
>
> --
> Eric Blake, Principal Software Engineer
> Red Hat, Inc.           +1-919-301-3266
> Virtualization:  qemu.org | libvirt.org
>
>
Eric Blake Aug. 19, 2021, 2:50 p.m. UTC | #3
On Wed, Aug 18, 2021 at 10:24:52AM -0400, John Snow wrote:
> > >
> > > +    @upper_half
> > > +    @require(Runstate.IDLE)
> > > +    async def accept(self, address: Union[str, Tuple[str, int]],
> > > +                     ssl: Optional[SSLContext] = None) -> None:
> > > +        """
> > > +        Accept a connection and begin processing message queues.
> > > +
> > > +        If this call fails, `runstate` is guaranteed to be set back to
> > `IDLE`.
> > > +
> > > +        :param address:
> > > +            Address to listen to; UNIX socket path or TCP address/port.
> >
> > Can't TCP use a well-known port name instead of an int?  But limiting
> > clients to just int port for now isn't fatal to the patch.
> >
> >
> The old QMP library didn't support this, and I used the old library as my
> template here. I'm willing to change the address format and types to be
> more comprehensive, but I was thinking that it should probably try to match
> or adhere to some standard; de-facto or otherwise. I wasn't sure which to
> pick, and we use a few different ones in QEMU itself. Any recommendations
> for me?

I asked because I know QAPI specifies TCP as string/string (the
hostname as a string makes absolute sense, but the port number as a
string is because of the less-used feature of a well-known port name).
I'm fine if the initial patch uses an int for the port number here; we
can always add support for more formats down the road when someone
actually has a use for them.
John Snow Aug. 19, 2021, 3:48 p.m. UTC | #4
On Thu, Aug 19, 2021 at 10:50 AM Eric Blake <eblake@redhat.com> wrote:

> On Wed, Aug 18, 2021 at 10:24:52AM -0400, John Snow wrote:
> > > >
> > > > +    @upper_half
> > > > +    @require(Runstate.IDLE)
> > > > +    async def accept(self, address: Union[str, Tuple[str, int]],
> > > > +                     ssl: Optional[SSLContext] = None) -> None:
> > > > +        """
> > > > +        Accept a connection and begin processing message queues.
> > > > +
> > > > +        If this call fails, `runstate` is guaranteed to be set back
> to
> > > `IDLE`.
> > > > +
> > > > +        :param address:
> > > > +            Address to listen to; UNIX socket path or TCP
> address/port.
> > >
> > > Can't TCP use a well-known port name instead of an int?  But limiting
> > > clients to just int port for now isn't fatal to the patch.
> > >
> > >
> > The old QMP library didn't support this, and I used the old library as my
> > template here. I'm willing to change the address format and types to be
> > more comprehensive, but I was thinking that it should probably try to
> match
> > or adhere to some standard; de-facto or otherwise. I wasn't sure which to
> > pick, and we use a few different ones in QEMU itself. Any recommendations
> > for me?
>
> I asked because I know QAPI specifies TCP as string/string (the
> hostname as a string makes absolute sense, but the port number as a
> string is because of the less-used feature of a well-known port name).
> I'm fine if the initial patch uses an int for the port number here; we
> can always add support for more formats down the road when someone
> actually has a use for them.
>
>
https://docs.python.org/3/library/socket.html#socket-families

"A pair (host, port) is used for the AF_INET address family, where host is
a string representing either a hostname in Internet domain notation like '
daring.cwi.nl' or an IPv4 address like '100.50.200.5', and port is an
integer."

The docs seem to suggest that I am actually limited only to integers here.
Do you have an example of using a string for a port number? I have to admit
I am not well acquainted with it.
Eduardo Habkost Aug. 19, 2021, 4:43 p.m. UTC | #5
On Thu, Aug 19, 2021 at 11:48:16AM -0400, John Snow wrote:
> On Thu, Aug 19, 2021 at 10:50 AM Eric Blake <eblake@redhat.com> wrote:
> 
> > On Wed, Aug 18, 2021 at 10:24:52AM -0400, John Snow wrote:
> > > > >
> > > > > +    @upper_half
> > > > > +    @require(Runstate.IDLE)
> > > > > +    async def accept(self, address: Union[str, Tuple[str, int]],
> > > > > +                     ssl: Optional[SSLContext] = None) -> None:
> > > > > +        """
> > > > > +        Accept a connection and begin processing message queues.
> > > > > +
> > > > > +        If this call fails, `runstate` is guaranteed to be set back
> > to
> > > > `IDLE`.
> > > > > +
> > > > > +        :param address:
> > > > > +            Address to listen to; UNIX socket path or TCP
> > address/port.
> > > >
> > > > Can't TCP use a well-known port name instead of an int?  But limiting
> > > > clients to just int port for now isn't fatal to the patch.
> > > >
> > > >
> > > The old QMP library didn't support this, and I used the old library as my
> > > template here. I'm willing to change the address format and types to be
> > > more comprehensive, but I was thinking that it should probably try to
> > match
> > > or adhere to some standard; de-facto or otherwise. I wasn't sure which to
> > > pick, and we use a few different ones in QEMU itself. Any recommendations
> > > for me?
> >
> > I asked because I know QAPI specifies TCP as string/string (the
> > hostname as a string makes absolute sense, but the port number as a
> > string is because of the less-used feature of a well-known port name).
> > I'm fine if the initial patch uses an int for the port number here; we
> > can always add support for more formats down the road when someone
> > actually has a use for them.
> >
> >
> https://docs.python.org/3/library/socket.html#socket-families
> 
> "A pair (host, port) is used for the AF_INET address family, where host is
> a string representing either a hostname in Internet domain notation like '
> daring.cwi.nl' or an IPv4 address like '100.50.200.5', and port is an
> integer."
> 
> The docs seem to suggest that I am actually limited only to integers here.
> Do you have an example of using a string for a port number? I have to admit
> I am not well acquainted with it.

QEMU uses getaddrinfo() at inet_parse_connect_saddr() to translate the
string/string pair to a socket address.

Python equivalent:

>> socket.getaddrinfo('localhost', 'ssh')
[(<AddressFamily.AF_INET6: 10>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('::1', 22, 0, 0)), (<AddressFamily.AF_INET6: 10>, <SocketKind.SOCK_DGRAM: 2>, 17, '', ('::1', 22, 0, 0)), (<AddressFamily.AF_INET6: 10>, <SocketKind.SOCK_STREAM: 1>, 132, '', ('::1', 22, 0, 0)), (<AddressF
amily.AF_INET6: 10>, <SocketKind.SOCK_SEQPACKET: 5>, 132, '', ('::1', 22, 0, 0)), (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 22)), (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_DGRAM: 2>, 17, '', ('127.0.0.1', 22)), (<AddressFamily.AF_INE
T: 2>, <SocketKind.SOCK_STREAM: 1>, 132, '', ('127.0.0.1', 22)), (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_SEQPACKET: 5>, 132, '', ('127.0.0.1', 22))]

Translating this to the correct arguments to socket.socket() and
socket.socket.connect() seems overly complicated, though.
diff mbox series

Patch

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index 77b330627b3..7eca65aa265 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -243,6 +243,24 @@  async def runstate_changed(self) -> Runstate:
         await self._runstate_event.wait()
         return self.runstate
 
+    @upper_half
+    @require(Runstate.IDLE)
+    async def accept(self, address: Union[str, Tuple[str, int]],
+                     ssl: Optional[SSLContext] = None) -> None:
+        """
+        Accept a connection and begin processing message queues.
+
+        If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+        :param address:
+            Address to listen to; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise StateError: When the `Runstate` is not `IDLE`.
+        :raise ConnectError: If a connection could not be accepted.
+        """
+        await self._new_session(address, ssl, accept=True)
+
     @upper_half
     @require(Runstate.IDLE)
     async def connect(self, address: Union[str, Tuple[str, int]],
@@ -308,7 +326,8 @@  def _set_state(self, state: Runstate) -> None:
     @upper_half
     async def _new_session(self,
                            address: Union[str, Tuple[str, int]],
-                           ssl: Optional[SSLContext] = None) -> None:
+                           ssl: Optional[SSLContext] = None,
+                           accept: bool = False) -> None:
         """
         Establish a new connection and initialize the session.
 
@@ -317,9 +336,10 @@  async def _new_session(self,
         to be set back to `IDLE`.
 
         :param address:
-            Address to connect to;
+            Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
 
         :raise ConnectError:
             When a connection or session cannot be established.
@@ -333,7 +353,7 @@  async def _new_session(self,
 
         try:
             phase = "connection"
-            await self._establish_connection(address, ssl)
+            await self._establish_connection(address, ssl, accept)
 
             phase = "session"
             await self._establish_session()
@@ -367,6 +387,7 @@  async def _establish_connection(
             self,
             address: Union[str, Tuple[str, int]],
             ssl: Optional[SSLContext] = None,
+            accept: bool = False
     ) -> None:
         """
         Establish a new connection.
@@ -375,6 +396,7 @@  async def _establish_connection(
             Address to connect to/listen on;
             UNIX socket path or TCP address/port.
         :param ssl: SSL context to use, if any.
+        :param accept: Accept a connection instead of connecting when `True`.
         """
         assert self.runstate == Runstate.IDLE
         self._set_state(Runstate.CONNECTING)
@@ -384,7 +406,66 @@  async def _establish_connection(
         # otherwise yield.
         await asyncio.sleep(0)
 
-        await self._do_connect(address, ssl)
+        if accept:
+            await self._do_accept(address, ssl)
+        else:
+            await self._do_connect(address, ssl)
+
+    @upper_half
+    async def _do_accept(self, address: Union[str, Tuple[str, int]],
+                         ssl: Optional[SSLContext] = None) -> None:
+        """
+        Acting as the transport server, accept a single connection.
+
+        :param address:
+            Address to listen on; UNIX socket path or TCP address/port.
+        :param ssl: SSL context to use, if any.
+
+        :raise OSError: For stream-related errors.
+        """
+        self.logger.debug("Awaiting connection on %s ...", address)
+        connected = asyncio.Event()
+        server: Optional[asyncio.AbstractServer] = None
+
+        async def _client_connected_cb(reader: asyncio.StreamReader,
+                                       writer: asyncio.StreamWriter) -> None:
+            """Used to accept a single incoming connection, see below."""
+            nonlocal server
+            nonlocal connected
+
+            # A connection has been accepted; stop listening for new ones.
+            assert server is not None
+            server.close()
+            await server.wait_closed()
+            server = None
+
+            # Register this client as being connected
+            self._reader, self._writer = (reader, writer)
+
+            # Signal back: We've accepted a client!
+            connected.set()
+
+        if isinstance(address, tuple):
+            coro = asyncio.start_server(
+                _client_connected_cb,
+                host=address[0],
+                port=address[1],
+                ssl=ssl,
+                backlog=1,
+            )
+        else:
+            coro = asyncio.start_unix_server(
+                _client_connected_cb,
+                path=address,
+                ssl=ssl,
+                backlog=1,
+            )
+
+        server = await coro     # Starts listening
+        await connected.wait()  # Waits for the callback to fire (and finish)
+        assert server is None
+
+        self.logger.debug("Connection accepted.")
 
     @upper_half
     async def _do_connect(self, address: Union[str, Tuple[str, int]],