[RFC,0/7] RFC: Asynchronous QMP Draft

Message ID 20210413155553.2660523-1-jsnow@redhat.com (mailing list archive)

Message

John Snow April 13, 2021, 3:55 p.m. UTC
(Does not apply to the QEMU tree; this is against a blank repository.)

Hi! This is a Draft RFC for an asyncio-based Python library that
implements a QMP client. The intent is to eventually publish this
library directly to PyPI, so the design focus of this library is to be
"useful" instead of providing as low-level an interface as possible.

I am sending this to solicit general, high-level design feedback on the
overall layout and approach. Many minor details are still left to be
implemented, and a lot of the docstrings and documentation need to be
audited to make sure they still apply as I've shuffled things around a
lot in the course of development.

There are some pretty notable things missing still; in particular I need
to develop an Event API (there is a tiny stub added as a hack, but it's
very simplistic), and I also need to develop a sync bridge so that this
library could be used in existing iotests if we eventually expect to
replace the old QMP library with this one.

Scattered throughout these files are "RFC" comments and other "FIXME"
and "TODO" items where I've tried to stub out some of the things I am
still unsure of.

Thanks!

John Snow (7):
  util: asyncio-related helpers
  error: Error classes and so on.
  protocol: generic async message-based protocol loop
  message: add QMP Message type
  models: Add well-known QMP objects
  qmp_protocol: add QMP client implementation
  linter config

 .flake8         |   2 +
 error.py        | 163 +++++++++++
 message.py      | 196 ++++++++++++++
 models.py       | 177 ++++++++++++
 protocol.py     | 704 ++++++++++++++++++++++++++++++++++++++++++++++++
 pylintrc        |  53 ++++
 qmp_protocol.py | 420 +++++++++++++++++++++++++++++
 util.py         |  87 ++++++
 8 files changed, 1802 insertions(+)
 create mode 100644 .flake8
 create mode 100644 error.py
 create mode 100644 message.py
 create mode 100644 models.py
 create mode 100644 protocol.py
 create mode 100644 pylintrc
 create mode 100644 qmp_protocol.py
 create mode 100644 util.py

Comments

Stefan Hajnoczi April 14, 2021, 6:38 a.m. UTC | #1
Below are the API docs that I found helpful for understanding the big
picture.

The QMP.execute() API is nice.

Regarding QMP events, I can think of two approaches:
1. Callbacks
2. An async get_event(name=Optional[str]) -> object API
   (plus get_event_nowait(name=Optional[str]) -> object)

(There's probably a third approach using async iterators but it's
similar to get_event().)

Both approaches are useful. The first is good in larger asynchronous
applications that perform many tasks concurrently. The second is good
when there is just one specific thing to do, like waiting for a block
job to complete.
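
As a rough sketch of those two shapes (all names here are invented, not
taken from this series):

   import asyncio
   from typing import Callable, Dict, List, Optional

   class EventAPISketch:
       """Toy illustration of the two event-API shapes above."""

       def __init__(self) -> None:
           self._queue: asyncio.Queue = asyncio.Queue()
           self._callbacks: Dict[str, List[Callable]] = {}

       # Approach 1: register a callback for a named event.
       def on_event(self, name: str) -> Callable:
           def register(func: Callable) -> Callable:
               self._callbacks.setdefault(name, []).append(func)
               return func
           return register

       # Approach 2: await the next event, optionally filtered by name.
       async def get_event(self, name: Optional[str] = None) -> object:
           while True:
               event = await self._queue.get()
               if name is None or event.get('event') == name:
                   return event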

My general impression is that the public API is nice and usable but the
implementation is complex and risks discouraging other people from
hacking on the code. There are too many abstractions and while it's
highly structured, there is a cost to having all this infrastructure. I
think simplifying it would make it easier for others to understand and
contribute to the code.

Ideas: open code or inline simple things instead of defining
abstractions that only have 1 user, drop the pydantic models, drop
classes that just wrap things like Message and the exception hierarchy,
combine protocol and qmp_protocol.

Things that might be worth adding:
1. File descriptor passing support.
2. Introspection support to easily check if a command/feature is
   available. Users can do this manually by sending QMP commands and
   interpreting the response, but this may be common enough to warrant a
   friendly API.

Help on module qmp.qmp_protocol in qmp:

NAME
    qmp.qmp_protocol - QMP Client Implementation

DESCRIPTION
    This module provides the QMP class, which can be used to connect and
    send commands to a QMP server such as QEMU. The QMP class can be used to
    either connect to a listening server, or used to listen and accept an
    incoming connection from the server.

CLASSES
    qmp.error.AQMPError(builtins.Exception)
        ExecuteError
    qmp.protocol.AsyncProtocol(typing.Generic)
        QMP
    
    class ExecuteError(qmp.error.AQMPError)
     |  ExecuteError(sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
     |  
     |  Execution statement returned failure.
     |  
     |  Method resolution order:
     |      ExecuteError
     |      qmp.error.AQMPError
     |      builtins.Exception
     |      builtins.BaseException
     |      builtins.object
     |  
     |  Methods defined here:
     |  
     |  __init__(self, sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
     |      Initialize self.  See help(type(self)) for accurate signature.
     |  
     |  __str__(self) -> str
     |      Return str(self).
     |  
     |  ----------------------------------------------------------------------
     |  Data descriptors inherited from qmp.error.AQMPError:
     |  
     |  __weakref__
     |      list of weak references to the object (if defined)
     |  
     |  ----------------------------------------------------------------------
     |  Static methods inherited from builtins.Exception:
     |  
     |  __new__(*args, **kwargs) from builtins.type
     |      Create and return a new object.  See help(type) for accurate signature.
     |  
     |  ----------------------------------------------------------------------
     |  Methods inherited from builtins.BaseException:
     |  
     |  __delattr__(self, name, /)
     |      Implement delattr(self, name).
     |  
     |  __getattribute__(self, name, /)
     |      Return getattr(self, name).
     |  
     |  __reduce__(...)
     |      Helper for pickle.
     |  
     |  __repr__(self, /)
     |      Return repr(self).
     |  
     |  __setattr__(self, name, value, /)
     |      Implement setattr(self, name, value).
     |  
     |  __setstate__(...)
     |  
     |  with_traceback(...)
     |      Exception.with_traceback(tb) --
     |      set self.__traceback__ to tb and return self.
     |  
     |  ----------------------------------------------------------------------
     |  Data descriptors inherited from builtins.BaseException:
     |  
     |  __cause__
     |      exception cause
     |  
     |  __context__
     |      exception context
     |  
     |  __dict__
     |  
     |  __suppress_context__
     |  
     |  __traceback__
     |  
     |  args
    
    class QMP(qmp.protocol.AsyncProtocol)
     |  QMP(name: Optional[str] = None) -> None
     |  
     |  Implements a QMP connection to/from the server.
     |  
     |  Basic usage looks like this::
     |  
     |    qmp = QMP('my_virtual_machine_name')
     |    await qmp.connect(('127.0.0.1', 1234))
     |    ...
     |    res = await qmp.execute('block-query')
     |    ...
     |    await qmp.disconnect()
     |  
     |  :param name: Optional nickname for the connection, used for logging.
     |  
     |  Method resolution order:
     |      QMP
     |      qmp.protocol.AsyncProtocol
     |      typing.Generic
     |      builtins.object
     |  
     |  Methods defined here:
     |  
     |  __init__(self, name: Optional[str] = None) -> None
     |      Initialize self.  See help(type(self)) for accurate signature.
     |  
     |  async execute(self, cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> object
     |      Execute a QMP command and return the response.
     |      
     |      :param cmd: QMP command name.
     |      :param arguments: Arguments (if any). Must be JSON-serializable.
     |      :param oob: If true, execute "out of band".
     |      
     |      :raise: ExecuteError if the server returns an error response.
     |      :raise: DisconnectedError if the connection was terminated early.
     |      
     |      :return: Execution response from the server. The type of object depends
     |               on the command that was issued, though most return a dict.
     |  
     |  async execute_msg(self, msg: qmp.message.Message) -> object
     |      Execute a QMP message and return the response.
     |      
     |      :param msg: The QMP `Message` to execute.
     |      :raises: ValueError if the QMP `Message` does not have either the
     |               'execute' or 'exec-oob' fields set.
     |      :raises: ExecuteError if the server returns an error response.
     |      :raises: DisconnectedError if the connection was terminated early.
     |      
     |      :return: Execution response from the server. The type of object depends
     |               on the command that was issued, though most return a dict.
     |  
     |  on_event(self, func: Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]) -> Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]
     |      FIXME: Quick hack: decorator to register event handlers.
     |      
     |      Use it like this::
     |      
     |        @qmp.on_event
     |        async def my_event_handler(qmp, event: Message) -> None:
     |          print(f"Received event: {event['event']}")
     |      
     |      RFC: What kind of event handler would be the most useful in
     |      practical terms? In tests, we are usually waiting for an
     |      event with some criteria to occur; maybe it would be useful
     |      to allow "coroutine" style functions where we can block
     |      until a certain event shows up?
     |  
     |  ----------------------------------------------------------------------
     |  Class methods defined here:
     |  
     |  make_execute_msg(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> qmp.message.Message from builtins.type
     |      Create an executable message to be sent by `execute_msg` later.
     |      
     |      :param cmd: QMP command name.
     |      :param arguments: Arguments (if any). Must be JSON-serializable.
     |      :param oob: If true, execute "out of band".
     |      
     |      :return: An executable QMP message.
     |  
     |  ----------------------------------------------------------------------
     |  Data and other attributes defined here:
     |  
     |  __orig_bases__ = (qmp.protocol.AsyncProtocol[qmp.message.Message],)
     |  
     |  __parameters__ = ()
     |  
     |  logger = <Logger qmp.qmp_protocol (WARNING)>
     |  
     |  ----------------------------------------------------------------------
     |  Methods inherited from qmp.protocol.AsyncProtocol:
     |  
     |  async accept(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
     |      Accept a connection and begin processing message queues.
     |      
     |      :param address: Address to connect to;
     |                      UNIX socket path or TCP address/port.
     |      :param ssl:     SSL context to use, if any.
     |      
     |      :raise: `StateError`   (loop is running or disconnecting.)
     |      :raise: `ConnectError` (Connection was not successful.)
     |  
     |  async connect(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
     |      Connect to the server and begin processing message queues.
     |      
     |      :param address: Address to connect to;
     |                      UNIX socket path or TCP address/port.
     |      :param ssl:     SSL context to use, if any.
     |      
     |      :raise: `StateError`   (loop is running or disconnecting.)
     |      :raise: `ConnectError` (Connection was not successful.)
     |  
     |  async disconnect(self) -> None
     |      Disconnect and wait for all tasks to fully stop.
     |      
     |      If there were exceptions that caused the bottom half to terminate
     |      prematurely, they will be raised here.
     |      
     |      :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
     |                               the bottom half.
     |      :raise: `MultiException` Iterable Exception used to multiplex multiple
     |                               exceptions when multiple tasks failed.
     |  
     |  ----------------------------------------------------------------------
     |  Readonly properties inherited from qmp.protocol.AsyncProtocol:
     |  
     |  disconnecting
     |      Return True when the loop is disconnecting, or disconnected.
     |  
     |  running
     |      Return True when the loop is currently connected and running.
     |  
     |  unconnected
     |      Return True when the loop is fully idle and quiesced.
     |      
     |      Returns True specifically when the loop is neither `running`
     |      nor `disconnecting`. A call to `disconnect()` is required
     |      to transition from `disconnecting` to `unconnected`.
     |  
     |  ----------------------------------------------------------------------
     |  Data descriptors inherited from qmp.protocol.AsyncProtocol:
     |  
     |  __dict__
     |      dictionary for instance variables (if defined)
     |  
     |  __weakref__
     |      list of weak references to the object (if defined)
     |  
     |  ----------------------------------------------------------------------
     |  Class methods inherited from typing.Generic:
     |  
     |  __class_getitem__(params) from builtins.type
     |  
     |  __init_subclass__(*args, **kwargs) from builtins.type
     |      This method is called when a class is subclassed.
     |      
     |      The default implementation does nothing. It may be
     |      overridden to extend subclasses.

DATA
    Awaitable = typing.Awaitable
        A generic version of collections.abc.Awaitable.
    
    Callable = typing.Callable
        Callable type; Callable[[int], str] is a function of (int) -> str.
        
        The subscription syntax must always be used with exactly two
        values: the argument list and the return type.  The argument list
        must be a list of types or ellipsis; the return type must be a single type.
        
        There is no syntax to indicate optional or keyword arguments,
        such function types are rarely used as callback types.
    
    Dict = typing.Dict
        A generic version of dict.
    
    List = typing.List
        A generic version of list.
    
    Mapping = typing.Mapping
        A generic version of collections.abc.Mapping.
    
    Optional = typing.Optional
        Optional type.
        
        Optional[X] is equivalent to Union[X, None].
    
    Tuple = typing.Tuple
        Tuple type; Tuple[X, Y] is the cross-product type of X and Y.
        
        Example: Tuple[T1, T2] is a tuple of two elements corresponding
        to type variables T1 and T2.  Tuple[int, float, str] is a tuple
        of an int, a float and a string.
        
        To specify a variable-length tuple of homogeneous type, use Tuple[T, ...].

FILE
    /tmp/foo/qmp/qmp_protocol.py
John Snow April 14, 2021, 7:17 p.m. UTC | #2
First and foremost, thank you for reviewing this! It is very helpful to 
me to see what others think of this pet project I've been growing in the 
privacy of my own mind.

On 4/14/21 2:38 AM, Stefan Hajnoczi wrote:
> Below are the API docs that I found helpful for understanding the big
> picture.
> 
> The QMP.execute() API is nice.
> 

Yes. It mimics (sync) qmp.command(), which I believe Eduardo Habkost 
wrote. I think it's the correct idea for a generic (QAPI-schema 
ignorant) QMP client library meant to be "used".

I think raising RPC in-band execution errors as exceptions is a nice 
"pythonic" way to do it.

(And, if desired, it is possible to use the QAPI generator to generate 
wrappers around this interface using type-safe arguments in a low-level 
SDK layer. I think that would be pretty swell. We are not there yet, 
though, and I'll focus on this layer first.)
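
(Purely for illustration, a hand-written guess at what one such generated
wrapper could look like -- nothing like this exists yet:)

   async def blockdev_snapshot_sync(qmp: QMP, *, device: str,
                                    snapshot_file: str) -> None:
       """Hypothetical generated wrapper for 'blockdev-snapshot-sync'."""
       await qmp.execute('blockdev-snapshot-sync', {
           'device': device,
           'snapshot-file': snapshot_file,
       })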

> Regarding QMP events, I can think of two approaches:
> 1. Callbacks
> 2. An async get_event(name=Optional[str]) -> object API
>     (plus get_event_nowait(name=Optional[str]) -> object)
> 
> (There's probably a third approach using async iterators but it's
> similar to get_event().)
> 
> Both approaches are useful. The first is good in larger asynchronous
> applications that perform many tasks concurrently. The second is good
> when there is just one specific thing to do, like waiting for a block
> job to complete.
> 
(1) On callbacks:

Callbacks are what I meagerly mocked up; discord.py has a "cute" little 
hack that works like this:

bot = commands.bot(...)

@bot.event
async def on_ready():
     print("Logged in as")
     print(bot.user.name)
     ...

(See 
https://github.com/Rapptz/discord.py/blob/master/examples/basic_bot.py )

I find this to be extremely cute: the framework uses the name of the 
callback to determine which event you are registering, and uses the 
decorator to merely register the callback.

This makes a nice, uncomplicated way to plug coroutines into the state 
machine of the client loop in the most basic cases.

I thought it might be nice to try and mimic that design, by perhaps 
using the names of QMP events as their own 'queues', and then 
dispatching user callbacks as desired. (Possibly with one mega-queue 
that exists for ALL callbacks.)

For instance, something like this:

@qmp.event
async def job_status_block_job_ready(qmp, event):
     ...

or more generally,

@qmp.event_handler
async def my_own_event_handler(qmp, event):
     ...

I didn't spend much time on the actual queue or dispatch mechanism in my 
draft, though, but it could be "bolstered" into a more full-fledged API 
if desired.
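
To make the idea concrete, here is a toy sketch of what the dispatch side
could look like (invented names, not the draft's code):

   import asyncio
   from collections import defaultdict

   class EventDispatcher:
       """Toy dispatcher: fan each event out to every registered handler."""

       def __init__(self) -> None:
           # QMP event name -> list of registered coroutine callbacks
           self._handlers = defaultdict(list)

       def event_handler(self, func):
           """Decorator: derive the event name from the callback's own
           name (the discord.py trick) and register the coroutine."""
           self._handlers[func.__name__.upper()].append(func)
           return func

       async def dispatch(self, qmp, event) -> None:
           """Called from the reader loop; no handler 'consumes' the event."""
           callbacks = self._handlers.get(event['event'], ())
           await asyncio.gather(*(cb(qmp, event) for cb in callbacks))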

One nice thing about this design is that events aren't "consumed" by a 
caller, they are just dispatched to anyone waiting on an event of that type.

As I recall, events getting "eaten" at the wrong time was a major burden 
when writing iotests that exercised multiple jobs, transactions, etc.

(A side note: a higher-level VM class that uses QMP may wish to capture 
certain events to record state changes, such that the state can be 
queried at an arbitrary point by any number of callers without needing 
to have witnessed the state change event personally. That's not really 
important here in the protocol library, though, which will pretend not 
to know which events exist -- but it's a consideration for making sure 
the design that IS chosen can be extended to support that kind of thing.)


(2) On get_event or async iterators:

This is likely a good ad-hoc feature. Should it only work for events 
that are delivered from that moment in time, or should there be a 
"backlog" of events to deliver?

Should waiting on events in this manner "consume" the event from the 
backlog, if we have one?

My concern::

   await qmp.execute('blockdev-backup', {...etc...})
   async for event in qmp.get_events():
       ...


It's possible that an event we'd like to see has already occurred by the 
time we get around to invoking the async iterator. You'd really want to 
start checking for events *before* you issue the job request, but that 
involves tasks, and the code doesn't "flow" well anymore.

I don't have ideas, at-present, for how to make things like iotests 
"flow" well in a linear co-routine sense...

...although, maybe it's worth creating something like an Event Listener 
object that, from its creation, stashes events from that point forward. 
How about::

   async with qmp.event_listener() as events:
       await qmp.execute('blockdev-backup', {...})
       async for event in events:
           ...

Actually, that seems pretty cool. What do you think? I think it's fairly 
elegant for ad-hoc use. We could even extend the constructor to accept 
filtering criteria if we wanted to, later.

Possibly we could also augment the Event Listener object to support a 
few methods to facilitate blocking until a certain event occurs, like::

   async with qmp.event_listener() as events:
       await qmp.execute('blockdev-backup', {...})
       await events.event('JOB_STATUS_CHANGE', status="pending")
       await qmp.execute('job-finalize', {...})
       ...


I think that's pretty workable, actually! And it could co-exist 
perfectly well alongside event callback handlers.
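
(A minimal sketch of what such a listener might look like under the hood,
assuming hypothetical register/remove hooks on the client -- none of this
is the draft's code:)

   import asyncio
   from contextlib import asynccontextmanager

   class EventListenerSketch:
       """Queue of events captured while the listener is registered."""

       def __init__(self) -> None:
           self._queue: asyncio.Queue = asyncio.Queue()

       def put(self, event) -> None:
           # Called by the client's reader loop for each incoming event.
           self._queue.put_nowait(event)

       def __aiter__(self):
           return self

       async def __anext__(self):
           return await self._queue.get()

       async def event(self, name: str, **fields):
           # Block until an event with this name and matching 'data' fields.
           async for event in self:
               if event['event'] == name and all(
                   event.get('data', {}).get(k) == v
                   for k, v in fields.items()
               ):
                   return event

   @asynccontextmanager
   async def event_listener(qmp):
       listener = EventListenerSketch()
       qmp.register_listener(listener)      # hypothetical hook
       try:
           yield listener
       finally:
           qmp.remove_listener(listener)    # hypothetical hook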


> My general impression is that the public API is nice and usable but the
> implementation is complex and risks discouraging other people from
> hacking on the code. There are too many abstractions and while it's
> highly structured, there is a cost to having all this infrastructure. I
> think simplifying it would make it easier for others to understand and
> contribute to the code.
> 

It's a fair point. I am hoping that the protocol.py layer won't need to 
be touched quite so much (famous last words), and that most of the 
interesting work can happen at the qmp_protocol.py level and above.

> Ideas: open code or inline simple things instead of defining
> abstractions that only have 1 user, drop the pydantic models, drop
> classes that just wrap things like Message and the exception hierarchy,
> combine protocol and qmp_protocol.
> 

(1) On models:

Pydantic models are definitely optional at this stage, but I am floating 
them here to prepare people for the idea that I might try to get more 
mileage out of them in the future to offer a type-safe, QAPI-aware SDK 
layer.

They're definitely only a mild benefit here, for now, as the strict 
typing they help provide is not floated upwards or exposed to the user.


(2) On the Message class:

I'll try a draft where I drop or simplify the Message class. It seems 
like a good candidate, but I think I'm subjectively afraid I won't like 
the inlining. We'll see!


(3) On the Exception hierarchy:

Let's talk about error handling design a little bit more. I want to make 
sure that the errors that can happen -- and the circumstances in which 
they can happen -- are obvious and have good names, but there might be 
better approaches to managing that complexity.


(4) On combining protocol and qmp_protocol:

Maybe. Do you want to look at the qtest implementation? It's somewhat 
ancillary to this project, but I felt it would make a nice companion 
library. It doesn't benefit as strongly as QMP (as it does not offer 
anything like OOB), but it does have async messages it can send, so it 
can re-use the same infrastructure.

(Fully admit that the first draft, of course, did feature a combined 
protocol/qmp_protocol class. It was split out later.)

> Things that might be worth adding:
> 1. File descriptor passing support.

Do you have an example workflow that I can use to test this? This is a 
weak spot in my knowledge.

> 2. Introspection support to easily check if a command/feature is
>     available. Users can do this manually by sending QMP commands and
>     interpreting the response, but this may be common enough to warrant a
>     friendly API.
> 

I think this treads into QAPI-specific domain knowledge, and I might 
leave such features to a higher-level object.

The QMP spec itself does not define a mechanism by which the protocol 
reveals the set of valid commands, and so it might be up to a 
machine.py-based extension/encapsulation of qmp_protocol to provide that.

(Though, I do agree; I want this feature somewhere. We do have such a 
thing coded into the existing qmp-shell tool, using the query-commands 
command. Maybe I can offer a subclass that offers some of these 
convenience features using a best-effort guess-and-check style 
introspection. Please forgive me if I focus on shoring up the design of 
the core implementation first.)
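
(For reference, that guess-and-check approach could be as small as
something like this -- 'supports_command' is an invented name, built on
the real query-commands command:)

   async def supports_command(qmp, name: str) -> bool:
       """Best-effort check for command availability via query-commands."""
       commands = await qmp.execute('query-commands')
       return any(entry['name'] == name for entry in commands)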

> Help on module qmp.qmp_protocol in qmp:
> 
> NAME
>      qmp.qmp_protocol - QMP Client Implementation
> 
> DESCRIPTION
>      This module provides the QMP class, which can be used to connect and
>      send commands to a QMP server such as QEMU. The QMP class can be used to
>      either connect to a listening server, or used to listen and accept an
>      incoming connection from the server.
> 
> CLASSES
>      qmp.error.AQMPError(builtins.Exception)
>          ExecuteError
>      qmp.protocol.AsyncProtocol(typing.Generic)
>          QMP
>      
>      class ExecuteError(qmp.error.AQMPError)
>       |  ExecuteError(sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
>       |
>       |  Execution statement returned failure.
>       |
>       |  Method resolution order:
>       |      ExecuteError
>       |      qmp.error.AQMPError
>       |      builtins.Exception
>       |      builtins.BaseException
>       |      builtins.object
>       |
>       |  Methods defined here:
>       |
>       |  __init__(self, sent: qmp.message.Message, received: qmp.message.Message, error: qmp.models.ErrorInfo)
>       |      Initialize self.  See help(type(self)) for accurate signature.
>       |
>       |  __str__(self) -> str
>       |      Return str(self).
>       |
>       |  ----------------------------------------------------------------------
>       |  Data descriptors inherited from qmp.error.AQMPError:
>       |
>       |  __weakref__
>       |      list of weak references to the object (if defined)
>       |
>       |  ----------------------------------------------------------------------
>       |  Static methods inherited from builtins.Exception:
>       |
>       |  __new__(*args, **kwargs) from builtins.type
>       |      Create and return a new object.  See help(type) for accurate signature.
>       |
>       |  ----------------------------------------------------------------------
>       |  Methods inherited from builtins.BaseException:
>       |
>       |  __delattr__(self, name, /)
>       |      Implement delattr(self, name).
>       |
>       |  __getattribute__(self, name, /)
>       |      Return getattr(self, name).
>       |
>       |  __reduce__(...)
>       |      Helper for pickle.
>       |
>       |  __repr__(self, /)
>       |      Return repr(self).
>       |
>       |  __setattr__(self, name, value, /)
>       |      Implement setattr(self, name, value).
>       |
>       |  __setstate__(...)
>       |
>       |  with_traceback(...)
>       |      Exception.with_traceback(tb) --
>       |      set self.__traceback__ to tb and return self.
>       |
>       |  ----------------------------------------------------------------------
>       |  Data descriptors inherited from builtins.BaseException:
>       |
>       |  __cause__
>       |      exception cause
>       |
>       |  __context__
>       |      exception context
>       |
>       |  __dict__
>       |
>       |  __suppress_context__
>       |
>       |  __traceback__
>       |
>       |  args
>      
>      class QMP(qmp.protocol.AsyncProtocol)
>       |  QMP(name: Optional[str] = None) -> None
>       |
>       |  Implements a QMP connection to/from the server.
>       |
>       |  Basic usage looks like this::
>       |
>       |    qmp = QMP('my_virtual_machine_name')
>       |    await qmp.connect(('127.0.0.1', 1234))
>       |    ...
>       |    res = await qmp.execute('block-query')
>       |    ...
>       |    await qmp.disconnect()
>       |

This reminds me.

I was briefly considering the idea that the QMP object could be 
converted into something like a QMP session factory instead, and you 
could use a context manager.

e.g.

qmp = QMP(session_factory_settings)
async with qmp.connect(...) as session:
     ...
     ...

with disconnect() being implicitly called upon the destruction of the 
session object when it goes out of scope.

I could also mock this up without creating a factory/session split, 
just by having the context manager simply return 'self':

proto = QMP(various_settings)
async with proto.connect(...) as qmp:
     ...
     ...

Though it's a little hacky, as with-expressions are expected to return 
*something*, and we actually already have a handle to that object.
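
A sketch of that variant, using an invented session() helper rather than
overloading connect() itself::

   from contextlib import asynccontextmanager

   class SessionQMP(QMP):
       """Sketch only; 'session' is not part of the draft."""

       @asynccontextmanager
       async def session(self, address, ssl=None):
           await self.connect(address, ssl)
           try:
               yield self                # hand back the connected client
           finally:
               await self.disconnect()   # implicit disconnect on scope exit

   # usage:
   #   async with SessionQMP('vm0').session(('127.0.0.1', 1234)) as qmp:
   #       res = await qmp.execute('query-status')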

>       |  :param name: Optional nickname for the connection, used for logging.
>       |
>       |  Method resolution order:
>       |      QMP
>       |      qmp.protocol.AsyncProtocol
>       |      typing.Generic
>       |      builtins.object
>       |
>       |  Methods defined here:
>       |
>       |  __init__(self, name: Optional[str] = None) -> None
>       |      Initialize self.  See help(type(self)) for accurate signature.
>       |
>       |  async execute(self, cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> object
>       |      Execute a QMP command and return the response.
>       |
>       |      :param cmd: QMP command name.
>       |      :param arguments: Arguments (if any). Must be JSON-serializable.
>       |      :param oob: If true, execute "out of band".
>       |
>       |      :raise: ExecuteError if the server returns an error response.
>       |      :raise: DisconnectedError if the connection was terminated early.
>       |
>       |      :return: Execution response from the server. The type of object depends
>       |               on the command that was issued, though most return a dict.
>       |
>       |  async execute_msg(self, msg: qmp.message.Message) -> object
>       |      Execute a QMP message and return the response.
>       |
>       |      :param msg: The QMP `Message` to execute.
>       |      :raises: ValueError if the QMP `Message` does not have either the
>       |               'execute' or 'exec-oob' fields set.
>       |      :raises: ExecuteError if the server returns an error response.
>       |      :raises: DisconnectedError if the connection was terminated early.
>       |
>       |      :return: Execution response from the server. The type of object depends
>       |               on the command that was issued, though most return a dict.
>       |
>       |  on_event(self, func: Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]) -> Callable[[ForwardRef('QMP'), qmp.message.Message], Awaitable[NoneType]]
>       |      FIXME: Quick hack: decorator to register event handlers.
>       |
>       |      Use it like this::
>       |
>       |        @qmp.on_event
>       |        async def my_event_handler(qmp, event: Message) -> None:
>       |          print(f"Received event: {event['event']}")
>       |
>       |      RFC: What kind of event handler would be the most useful in
>       |      practical terms? In tests, we are usually waiting for an
>       |      event with some criteria to occur; maybe it would be useful
>       |      to allow "coroutine" style functions where we can block
>       |      until a certain event shows up?
>       |
>       |  ----------------------------------------------------------------------
>       |  Class methods defined here:
>       |
>       |  make_execute_msg(cmd: str, arguments: Optional[Mapping[str, object]] = None, oob: bool = False) -> qmp.message.Message from builtins.type
>       |      Create an executable message to be sent by `execute_msg` later.
>       |
>       |      :param cmd: QMP command name.
>       |      :param arguments: Arguments (if any). Must be JSON-serializable.
>       |      :param oob: If true, execute "out of band".
>       |
>       |      :return: An executable QMP message.
>       |
>       |  ----------------------------------------------------------------------
>       |  Data and other attributes defined here:
>       |
>       |  __orig_bases__ = (qmp.protocol.AsyncProtocol[qmp.message.Message],)
>       |
>       |  __parameters__ = ()
>       |
>       |  logger = <Logger qmp.qmp_protocol (WARNING)>
>       |
>       |  ----------------------------------------------------------------------
>       |  Methods inherited from qmp.protocol.AsyncProtocol:
>       |
>       |  async accept(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
>       |      Accept a connection and begin processing message queues.
>       |
>       |      :param address: Address to connect to;
>       |                      UNIX socket path or TCP address/port.
>       |      :param ssl:     SSL context to use, if any.
>       |
>       |      :raise: `StateError`   (loop is running or disconnecting.)
>       |      :raise: `ConnectError` (Connection was not successful.)
>       |
>       |  async connect(self, address: Union[str, Tuple[str, int]], ssl: Optional[ssl.SSLContext] = None) -> None
>       |      Connect to the server and begin processing message queues.
>       |
>       |      :param address: Address to connect to;
>       |                      UNIX socket path or TCP address/port.
>       |      :param ssl:     SSL context to use, if any.
>       |
>       |      :raise: `StateError`   (loop is running or disconnecting.)
>       |      :raise: `ConnectError` (Connection was not successful.)
>       |
>       |  async disconnect(self) -> None
>       |      Disconnect and wait for all tasks to fully stop.
>       |
>       |      If there were exceptions that caused the bottom half to terminate
>       |      prematurely, they will be raised here.
>       |
>       |      :raise: `Exception`      Arbitrary exceptions re-raised on behalf of
>       |                               the bottom half.
>       |      :raise: `MultiException` Iterable Exception used to multiplex multiple
>       |                               exceptions when multiple tasks failed.
>       |
>       |  ----------------------------------------------------------------------
>       |  Readonly properties inherited from qmp.protocol.AsyncProtocol:
>       |
>       |  disconnecting
>       |      Return True when the loop is disconnecting, or disconnected.
>       |
>       |  running
>       |      Return True when the loop is currently connected and running.
>       |
>       |  unconnected
>       |      Return True when the loop is fully idle and quiesced.
>       |
>       |      Returns True specifically when the loop is neither `running`
>       |      nor `disconnecting`. A call to `disconnect()` is required
>       |      to transition from `disconnecting` to `unconnected`.

Any thoughts on these? I think I accidentally created some landmines 
here, actually.

disconnecting: This one is, I think, consistent with other primitives in 
Python like "is_closing", which also covers the "fully closed" state. It 
hopefully adheres to the principle of least surprise.


running: This one is maybe bad, though. I mean this to be "fully 
running", i.e. the loop is fully open and nothing is wrong. I believe it 
is not true until sometime "in the middle" of the accept() or connect() 
calls; i.e. after the Reader/Writer tasks are started. I worry that this 
is a confusing point *within* this library.

Maybe it ought to report "true" as soon as we start trying to build a 
session; and I may need other internal helpers for testing more specific 
conditions inside the loop.


unconnected: I think this name is just genuinely bad, but I needed some 
kind of state to represent "fully quiesced, not connected, and idle" -- 
i.e., that we are prepared and able to issue another accept() or 
connect() call.

"quiesced"? (Kind of jargon-y.)

"disconnected"? (Viable, but introduces ambiguity between 
fully-disconnected but waiting for the user to collect loop 
status/exceptions.)

"idle"? (Might be confused with a loop that's open.)


... Maybe it doesn't need to be externally exposed at all, anyway. Its 
primary purpose is to safeguard calls to connect() and accept(), such 
that if the loop is engaged or not yet cleaned up, these calls will bark 
an error back at the user.

Perhaps an internal property would suffice in that case; a _has_session 
True/False flag might cover this concept.

Thoughts?

>       |
>       |  ----------------------------------------------------------------------
>       |  Data descriptors inherited from qmp.protocol.AsyncProtocol:
>       |
>       |  __dict__
>       |      dictionary for instance variables (if defined)
>       |
>       |  __weakref__
>       |      list of weak references to the object (if defined)
>       |
>       |  ----------------------------------------------------------------------
>       |  Class methods inherited from typing.Generic:
>       |
>       |  __class_getitem__(params) from builtins.type
>       |
>       |  __init_subclass__(*args, **kwargs) from builtins.type
>       |      This method is called when a class is subclassed.
>       |
>       |      The default implementation does nothing. It may be
>       |      overridden to extend subclasses.
> 

 From here down, ...

> DATA
>      Awaitable = typing.Awaitable
>          A generic version of collections.abc.Awaitable.
>      
>      Callable = typing.Callable
>          Callable type; Callable[[int], str] is a function of (int) -> str.
>          
>          The subscription syntax must always be used with exactly two
>          values: the argument list and the return type.  The argument list
>          must be a list of types or ellipsis; the return type must be a single type.
>          
>          There is no syntax to indicate optional or keyword arguments,
>          such function types are rarely used as callback types.
>      
>      Dict = typing.Dict
>          A generic version of dict.
>      
>      List = typing.List
>          A generic version of list.
>      
>      Mapping = typing.Mapping
>          A generic version of collections.abc.Mapping.
>      
>      Optional = typing.Optional
>          Optional type.
>          
>          Optional[X] is equivalent to Union[X, None].
>      
>      Tuple = typing.Tuple
>          Tuple type; Tuple[X, Y] is the cross-product type of X and Y.
>          
>          Example: Tuple[T1, T2] is a tuple of two elements corresponding
>          to type variables T1 and T2.  Tuple[int, float, str] is a tuple
>          of an int, a float and a string.
>          
>          To specify a variable-length tuple of homogeneous type, use Tuple[T, ...].
> 

... Oops, I need to "hide" some more of those things from help. I think 
that by specifying __all__ in that module, I can restrict some of these 
things that aren't interesting to see in the help output.

> FILE
>      /tmp/foo/qmp/qmp_protocol.py
> 
> 

In general, do you feel this design is roughly serviceable and worth 
pursuing cleanups for? I realize it's a bit "much" but as the audience 
extends beyond our castle walls, I wanted to be quite thorough. It's a 
design that's likely overkill for iotests, but hopefully just about 
correct for external users to prototype toy management scripts with.

At some point, I might try to get it checked in to the QEMU tree as an 
"alpha" library so that iterations on the design can be debated on their 
own merit instead of trying to update a giant new-code-blob. I am not 
sure if it's ready for that just yet, but I think it's close to that 
point where it needs to not live primarily in a separate repo anymore.

Thanks again,
-- John
Stefan Hajnoczi April 15, 2021, 9:52 a.m. UTC | #3
On Wed, Apr 14, 2021 at 03:17:48PM -0400, John Snow wrote:
> First and foremost, thank you for reviewing this! It is very helpful to me
> to see what others think of this pet project I've been growing in the
> privacy of my own mind.
> 
> On 4/14/21 2:38 AM, Stefan Hajnoczi wrote:
> > Below are the API docs that I found helpful for understanding the big
> > picture.
> > 
> > The QMP.execute() API is nice.
> > 
> 
> Yes. It mimics (sync) qmp.command(), which I believe Eduardo Habkost wrote.
> I think it's the correct idea for a generic (QAPI-schema ignorant) QMP
> client library meant to be "used".
> 
> I think raising RPC in-band execution errors as exceptions is a nice
> "pythonic" way to do it.
> 
> (And, if desired, it is possible to use the QAPI generator to generate
> wrappers around this interface using type-safe arguments in a low-level SDK
> layer. I think that would be pretty swell. We are not there yet, though, and
> I'll focus on this layer first.)
> 
> > Regarding QMP events, I can think of two approaches:
> > 1. Callbacks
> > 2. An async get_event(name=Optional[str]) -> object API
> >     (plus get_event_nowait(name=Optional[str]) -> object)
> > 
> > (There's probably a third approach using async iterators but it's
> > similar to get_event().)
> > 
> > Both approaches are useful. The first is good in larger asynchronous
> > applications that perform many tasks concurrently. The second is good
> > when there is just one specific thing to do, like waiting for a block
> > job to complete.
> > 
> (1) On callbacks:
> 
> Callbacks are what I meagerly mocked up; discord.py has a "cute" little hack
> that works like this:
> 
> bot = commands.bot(...)
> 
> @bot.event
> async def on_ready():
>     print("Logged in as")
>     print(bot.user.name)
>     ...
> 
> (See https://github.com/Rapptz/discord.py/blob/master/examples/basic_bot.py
> )
> 
> I find this to be extremely cute: the framework uses the name of the
> callback to determine which event you are registering, and uses the
> decorator to merely register the callback.
> 
> This makes a nice, uncomplicated way to plug coroutines into the state
> machine of the client loop in the most basic cases.
> 
> I thought it might be nice to try and mimic that design, by perhaps using
> the names of QMP events as their own 'queues', and then dispatching user
> callbacks as desired. (Possibly with one mega-queue that exists for ALL
> callbacks.)
> 
> For instance, something like this:
> 
> @qmp.event
> async def job_status_block_job_ready(qmp, event):
>     ...
> 
> or more generally,
> 
> @qmp.event_handler
> async def my_own_event_handler(qmp, event):
>     ...
> 
> I didn't spend much time on the actual queue or dispatch mechanism in my
> draft, though, but it could be "bolstered" into a more full-fledged API if
> desired.
> 
> One nice thing about this design is that events aren't "consumed" by a
> caller, they are just dispatched to anyone waiting on an event of that type.
> 
> As I recall, events getting "eaten" at the wrong time was a major burden
> when writing iotests that exercised multiple jobs, transactions, etc.
> 
> (A side note: a higher-level VM class that uses QMP may wish to capture
> certain events to record state changes, such that the state can be queried
> at an arbitrary point by any number of callers without needing to have
> witnessed the state change event personally. That's not really important
> here in the protocol library, though, which will pretend not to know which
> events exist -- but it's a consideration for making sure the design that IS
> chosen can be extensible to support that kind of thing.)
> 
> 
> (2) On get_event or async iterators:
> 
> This is likely a good ad-hoc feature. Should it only work for events that
> are delivered from that moment in time, or should there be a "backlog" of
> events to deliver?
> 
> Should waiting on events in this manner "consume" the event from the
> backlog, if we have one?
> 
> My concern::
> 
>   await qmp.execute('blockdev-backup', {...etc...})
>   async for event in qmp.get_events():
>       ...
> 
> 
> It's possible that an event we'd like to see has already occurred by the
> time we get around to invoking the async iterator. You'd really want to
> start checking for events *before* you issue the job request, but that
> involves tasks, and the code doesn't "flow" well anymore.
> 
> I don't have ideas, at-present, for how to make things like iotests "flow"
> well in a linear co-routine sense...
> 
> ...although, maybe it's worth creating something like an Event Listener
> object that, from its creation, stashes events from that point forward. How
> about::
> 
>   async with qmp.event_listener() as events:
>       await qmp.execute('blockdev-backup', {...})
>       async for event in events:
>           ...
> 
> Actually, that seems pretty cool. What do you think? I think it's fairly
> elegant for ad-hoc use. We could even extend the constructor to accept
> filtering criteria if we wanted to, later.

Yeah, it seems very nice for allowing multiple event listeners that
don't steal each other's events. I like it.

qmp.event_listener() could take a sequence of QMP event names to trigger
on. If the sequence is empty then all QMP events will be reported.

> 
> Possibly we could also augment the Event Listener object to support a few
> methods to facilitate blocking until a certain event occurs, like::
> 
>   async with qmp.event_listener() as events:
>       await qmp.execute('blockdev-backup', {...})
>       await events.event('JOB_STATUS_CHANGE', status="pending")
>       await qmp.execute('job-finalize', {...})
>       ...
> 
> 
> I think that's pretty workable, actually! And it could co-exist perfectly
> well alongside event callback handlers.

Callbacks and async iterators are equivalent since a callback is
basically a Task with an event_listener() loop that invokes the callback
function. If the boilerplate for setting that up is minimal then there
might be no need to provide both interfaces.
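
For example, something along these lines (assuming the event_listener()
interface sketched above) would turn a callback into just such a Task:

   import asyncio

   def add_callback(qmp, callback):
       """Pump events from a private listener into 'callback' forever."""
       async def pump():
           async with qmp.event_listener() as events:
               async for event in events:
                   await callback(qmp, event)
       return asyncio.create_task(pump())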

> Pydantic models are definitely optional at this stage, but I am floating
> them here to prepare people for the idea that I might try to get more
> mileage out of them in the future to offer a type-safe, QAPI-aware SDK
> layer.
> 
> They're definitely only a mild benefit here, for now, as the strict typing
> they help provide is not floated upwards or exposed to the user.

Yes, I can see the benefits for programs that need a lot of data
validation and have complex schemas.

Since this library is oblivious to the QMP schema it's probably not
needed.

An example of why I suggested dropping pydantic: I was trying to figure
out what happens if QMP is extended with new response fields. Will
pydantic raise an exception when it encounters an unexpected field? It's
not obvious from the code so I needed to go study pydantic to find the
answer.
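
(For what it's worth, with pydantic v1 the answer depends on the model's
config: unknown fields are silently ignored by default. A toy model, not
the series' code:)

   from pydantic import BaseModel, Extra

   class Greeting(BaseModel):
       class Config:
           extra = Extra.ignore    # the default: unknown fields are dropped
           # extra = Extra.forbid  # would raise ValidationError instead

       QMP: dict

   Greeting.parse_obj({'QMP': {'version': {}, 'capabilities': []},
                       'new-field': 42})   # accepted; 'new-field' discarded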

> (4) On combining protocol and qmp_protocol:
> 
> Maybe. Do you want to look at the qtest implementation? It's somewhat
> ancillary to this project, but felt it would make a nice companion library.
> It doesn't benefit as strongly as QMP (As it does not offer anything like
> OOB), but it does have async messages it can send, so it can re-use the same
> infrastructure.
> 
> (Fully admit that the first draft, of course, did feature a combined
> protocol/qmp_protocol class. It was split out later.)

Sure, it would be interesting to see the qtest code.

> > Things that might be worth adding:
> > 1. File descriptor passing support.
> 
> Do you have an example workflow that I can use to test this? This is a weak
> spot in my knowledge.

The add-fd QMP command. I guess this patch series cannot execute that
command successfully since it doesn't support fd passing.

It should be easy to do:

  qmp.execute('add-fd', pass_fds=[fobj])

where pass_fds is an optional sequence of file descriptors. The file
descriptors can either be int or file-like objects that support the
fileno() method.

I'm not sure if QMP commands also send file descriptors back to the
client in responses.
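
(Not something this series needs to solve today, but for the record: on a
UNIX socket this boils down to SCM_RIGHTS ancillary data, which CPython
3.9+ wraps as socket.send_fds(). A sketch of the writer side:)

   import socket

   def send_with_fds(sock: socket.socket, data: bytes, pass_fds) -> None:
       """Send 'data' plus file descriptors as SCM_RIGHTS ancillary data."""
       fd_list = [fd if isinstance(fd, int) else fd.fileno()
                  for fd in pass_fds]
       socket.send_fds(sock, [data], fd_list)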

> > 2. Introspection support to easily check if a command/feature is
> >     available. Users can do this manually by sending QMP commands and
> >     interpreting the response, but this may be common enough to warrant a
> >     friendly API.
> > 
> 
> I think this treads into QAPI-specific domain knowledge, and I might leave
> such features to a higher-level object.
> 
> The QMP spec itself does not define a mechanism by which the QMP protocol
> itself will reveal the valid commands, and so it might be up to a
> machine.py-based extension/capsulation of qmp_protocol to provide that.
> 
> (Though, I do agree; I want this feature somewhere. We do have such a thing
> coded into the existing qmp-shell tool, using the query-commands command.
> Maybe I can offer a subclass that offers some of these convenience features
> using a best-effort guess-and-check style introspection. Please forgive me
> if I focus on shoring up the design of the core implementation first.)

Okay.

> In general, do you feel this design is roughly serviceable and worth
> pursuing cleanups for? I realize it's a bit "much" but as the audience
> extends beyond our castle walls, I wanted to be quite thorough. It's a

I see the complexity mostly as accidental complexity, not essential
complexity. IMO it's not that the current approach is overkill now but
could be necessary later. I think it will always be unnecessarily
complex because there are simpler ways to do it :D.

Stefan
John Snow April 20, 2021, 2:26 a.m. UTC | #4
On 4/15/21 5:52 AM, Stefan Hajnoczi wrote:
> Yeah, it seems very nice for allowing multiple event listeners that
> don't steal each other's events. I like it.
> 
> qmp.event_listener() could take a sequence of QMP event names to trigger
> on. If the sequence is empty then all QMP events will be reported.

I made something like this:


# Example 1
with qmp.listener('STOP') as listener:
     await qmp.execute('stop')
     await listener.get()


# Example 2
with qmp.listener('JOB_STATUS_CHANGE') as listener:
     await qmp.execute('blockdev-create', ...)
     async for event in listener:
         if event['data']['status'] == 'concluded':
             break
     await qmp.execute('job-dismiss', ...)


# Example 3 - all events
with qmp.listener() as events:
     async for event in events:
         print(f"got '{event['event']}' event!")


# Example 4 - several events on one listener
job_events = (
     'JOB_STATUS_CHANGE', 'BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
     'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', 'BLOCK_JOB_PENDING'
)
with qmp.listener(job_events) as events:
     ...


There is a *post-filtering* syntax available to EventListener.get(); it 
filters events using a very simplistic keyword-argument match.


# Example 5 -- a short-hand form of Example 2.
with qmp.listener('JOB_STATUS_CHANGE') as job_events:
     await qmp.execute('blockdev-create', ...)
     await job_events.get(status='concluded')
     await qmp.execute('job-dismiss', ...)



A shortcoming of this interface is that it's easy to create a listener 
that hears multiple events, but it's not easy to create *several 
listeners*. I am not sure what syntax would be nicest for this, but I 
tried by allowing the manual creation of listeners:


# Example 6
listener1 = EventListener('JOB_STATUS_CHANGE')
listener2 = EventListener(job_events)

# Note the use of listen() instead of listener()
with qmp.listen(listener1, listener2) as (ev1, ev2):
     # listeners are now active.
     ...
# listeners are now inactive.
# The context manager clears any stale events in the listener(s).


I thought this might be nicer than trying to extend the listener syntax:

with qmp.listeners(
     'JOB_STATUS_CHANGE',
     (job_events)
) as (
     listener1,
     listener2,
):
     ...

especially because it might get confusing when trying to separate "one 
listener with multiple events" vs. "several listeners with one event 
each", and it makes things a little ambiguous:

with qmp.listeners('STOP') as (stop_events,):
     ...

And this isn't any prettier, and also likely to confuse:

with qmp.listeners('STOP', 'RESUME') as (stops, resumes):
     ...

because it's only so very subtly different from this:

with qmp.listeners(('STOP', 'RESUME')) as (runstate_events,):
     ...

This also doesn't begin to address one of the worst headaches of writing 
iotests where transactions are involved: accidentally eating events 
meant for other jobs.

I prototyped something where it's possible to create an EventListener 
with an optional pre-filter, but it's a little bulky:


# Example 7
listener = EventListener('JOB_STATUS_CHANGE',
                          lambda e: e['data']['id'] == 'job0')

with qmp.listen(listener):
     await qmp.execute('blockdev-create', arguments={'job-id': 'job0'})
     await listener.get(status='created')
     ...


Some thoughts on this:
- Pre-filters are powerful, but involve a lot of boilerplate.
- Accepting two kinds of parameters (event name(s) and a filter) makes it 
even trickier to write concise context blocks, especially with multiple 
jobs.


Here's a final example of something you may very well want to do in 
iotest code:


# Example 8

def job_filter(job_id: str) -> EventFilter:
     def filter(event: Message) -> bool:
         return event.get('data', {}).get('id') == job_id
     return filter

listener1 = EventListener('JOB_STATUS_CHANGE', job_filter('job0'))
listener2 = EventListener('JOB_STATUS_CHANGE', job_filter('job1'))

with qmp.listen(listener1, listener2) as (job0, job1):
     await asyncio.gather(
         qmp.execute('blockdev-create', arguments={'job-id': 'job0'}),
         qmp.execute('blockdev-create', arguments={'job-id': 'job1'}),
         job0.get(status='concluded'),
         job1.get(status='concluded')
     )

(Note: gather isn't required here. You could write the execute and get 
statements individually and in whichever order you wanted, as long as 
the execute statement for a given job appears prior to the corresponding 
wait!)

The difficulty I have here is extending that backwards to the "create 
listener on the fly" syntax, for the reasons stated above: it becomes 
ambiguous whether we're creating one or two listeners, etc. Trying to 
minimize boilerplate while leaving the interfaces generic and powerful 
is tough.

I'm still playing around with different options and solutions, but your 
feedback/input is welcome.

--js
John Snow April 20, 2021, 2:47 a.m. UTC | #5
On 4/19/21 10:26 PM, John Snow wrote:
> [snip]


Oh, of course: the moment I sent this, I realized there is actually a 
somewhat nicer way to do this in non-test code that doesn't care about 
ordering. It still wouldn't work for QMP transactions, but it's nice to 
look at:

# Example 9 -- Multiple jobs without a transaction:

async def blockdev_create(qmp, job_id: str, options: Dict[str, Any]):
     with qmp.listener('JOB_STATUS_CHANGE') as listener:
         await qmp.execute('blockdev-create', arguments={
             'job-id': job_id,
             'options': options,
         })
         await listener.get(id=job_id, status='concluded')
         await qmp.execute('job-dismiss', arguments={'id': job_id})
         await listener.get(id=job_id, status='null')

await asyncio.gather(
     blockdev_create(qmp, 'job2', {...}),
     blockdev_create(qmp, 'job3', {...}),
)

It won't work for transactions because we spawn multiple job IDs with a 
single command in a single context. You could remedy that by creating 
multiple listeners and being very careful to always use exactly one per 
job, but that's likely prone to failure and hard to catch in review, etc.
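
For completeness, here's roughly what that careful multi-listener version
could look like for a transaction, reusing the job_filter() helper from
Example 8 (none of this is final API):

# Example 10 -- one pre-filtered listener per job around a transaction:

job_ids = ('job2', 'job3')
listeners = [
    EventListener('JOB_STATUS_CHANGE', job_filter(job_id))
    for job_id in job_ids
]

with qmp.listen(*listeners):
    await qmp.execute('transaction', arguments={
        'actions': [
            # ... one action per job id ...
        ],
    })
    await asyncio.gather(*(
        listener.get(status='concluded') for listener in listeners
    ))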

--js