qemu/machine: add asyncio process-management demo

Message ID 20221202205744.591462-1-jsnow@redhat.com (mailing list archive)
State New, archived

Commit Message

John Snow Dec. 2, 2022, 8:57 p.m. UTC
This is an RFC to show a technique for watching terminal output using
asyncio. It is only a *demo* and elides many things for simplicity;
namely, it creates neither a QMP monitor nor a guest console socket.

What I really wanted feedback on was an API for consuming information
from an async stream. In this demo, that's QEMU's terminal output
(stdout, stderr -- usually pretty minimal information in most
circumstances except in error cases) but the same techniques may apply
to guest console output, too.
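
As a minimal sketch of the line-oriented consumption being discussed
(not the code from this patch; split_complete_lines() and consume() are
hypothetical names, and UTF-8 is assumed instead of the locale
encoding), chunks arriving from a stream can be re-buffered into lines
like this:

```python
from typing import List, Tuple


def split_complete_lines(pending: bytes,
                         chunk: bytes) -> Tuple[List[bytes], bytes]:
    """Return (complete lines, unterminated remainder)."""
    parts = (pending + chunk).split(b"\n")
    # Everything before the last separator is a complete line; the last
    # element is whatever has not been newline-terminated yet.
    return parts[:-1], parts[-1]


def consume(chunks: List[bytes]) -> List[str]:
    """Turn arbitrary chunks into decoded lines, flushing at the end."""
    pending = b""
    out: List[str] = []
    for chunk in chunks:
        lines, pending = split_complete_lines(pending, chunk)
        out.extend(line.decode("utf-8", errors="replace") for line in lines)
    if pending:
        # End of stream without a trailing newline: mark it visibly.
        out.append(pending.decode("utf-8", errors="replace") + "⏎")
    return out
```

Chunk boundaries do not need to line up with line boundaries; a partial
line is simply held until its newline (or the end of the stream) shows
up.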

Please see comments and docstrings inline for RFC questions and other
observations.

To test it, navigate to qemu.git/python, and then invoke, e.g.:

> python3 -m qemu.machine.demo /usr/bin/qemu-system-x86_64 --help

(Note, this demo requires Python 3.7+; not as a fundamental necessity,
but 3.6 requires some ifdef style tricks to run that I didn't bother to
code in for a quick demo. Sorry.)

Or try it with a deliberately bad flag (--frobnozz), with no flag at
all, or with *any* executable of your choice to see how stdout/stderr,
return codes, and missing trailing newlines are handled.
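
For comparison, here is a stripped-down sketch of the same idea without
any of the logging machinery: launch a child, read both pipes to EOF,
and collect the return code. run_and_capture() is a hypothetical helper,
not something in the patch, and the child command in the usage part is
only an illustration.

```python
import asyncio
import sys
from typing import Tuple


async def run_and_capture(*argv: str) -> Tuple[int, bytes, bytes]:
    """Run argv, returning (returncode, stdout bytes, stderr bytes)."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    assert proc.stdout is not None and proc.stderr is not None
    # Read both pipes concurrently so that neither can fill up and
    # stall the child while we are blocked on the other one.
    out, err, code = await asyncio.gather(
        proc.stdout.read(), proc.stderr.read(), proc.wait())
    return code, out, err


if __name__ == "__main__":
    # Illustrative child: writes unterminated output, exits non-zero.
    child = "import sys; sys.stdout.write('no newline?'); sys.exit(3)"
    print(asyncio.run(run_and_capture(sys.executable, "-c", child)))
```

Unlike the demo, this buffers everything until EOF, so nothing is
visible while the child is still running.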

Also try:

> python3 -m qemu.machine.demo /usr/bin/echo -n "no newline?"
> python3 -m qemu.machine.demo /usr/bin/yes "I hate the antichrist"

One huge caveat:

I would *really have liked* to offer an interface that's exactly
equivalent to asyncio.StreamReader() that would allow a user to
asynchronously *consume* data using the same API that Python
offers. However, I could not find a way to provide a kind of
"pass-through" that also provided logging, logging-to-file, in-memory
buffering, etc.

I briefly experimented with using an actual pipe via os.pipe() that
I could read and write asynchronously, but the overhead of the asyncio
machinery here felt a bit high, and the approach seemed simultaneously
overkill and fragile. It's possible I'm missing an easier way to provide
this type of feature, but it seems like the sort of thing we might want
for e.g. asynchronously watching for lines from a guest console.

I'm not sure if it will be possible to add. If there's interest and it
seems worth pursuing, I'll try to push on it. Otherwise, maybe what we
have here is "good enough"?
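
One possible direction, sketched here under assumptions and not
something this patch implements: relay each chunk into a second
asyncio.StreamReader using its feed_data() and feed_eof() methods, so a
consumer can use the regular StreamReader API on the copy. tee_stream()
and demo() are hypothetical names. Note that feed_data() buffers without
any backpressure in this arrangement, so an idle consumer means
unbounded memory growth.

```python
import asyncio
from typing import Callable, List


async def tee_stream(
        source: asyncio.StreamReader,
        passthrough: asyncio.StreamReader,
        on_data: Callable[[bytes], None],
        chunk_size: int = 4096) -> None:
    """Copy source into passthrough, calling on_data for every chunk."""
    while True:
        data = await source.read(chunk_size)
        if not data:
            # Propagate EOF so that readers of the copy terminate too.
            passthrough.feed_eof()
            return
        on_data(data)  # e.g. write to a logfile or in-memory buffer
        passthrough.feed_data(data)


async def demo() -> List[bytes]:
    # Both readers are created while the event loop is running.
    source = asyncio.StreamReader()
    passthrough = asyncio.StreamReader()
    # Stand-in for a child's stdout: feed some bytes by hand.
    source.feed_data(b"hello\nwor")
    source.feed_data(b"ld\nno newline")
    source.feed_eof()

    seen: List[bytes] = []
    task = asyncio.create_task(tee_stream(source, passthrough, seen.append))
    # The consumer uses the ordinary StreamReader line iteration.
    lines = [line async for line in passthrough]
    await task
    return lines
```

Whether relying on feed_data() this way is acceptable for QEMUMachine
is an open question; the sketch only shows that the consumer side can
keep the stock API.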

*shrug*

Anyway, thanks!

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/qemu/machine/demo.py | 276 ++++++++++++++++++++++++++++++++++++
 1 file changed, 276 insertions(+)
 create mode 100644 python/qemu/machine/demo.py

Comments

John Snow Dec. 3, 2022, 12:20 a.m. UTC | #1
On Fri, Dec 2, 2022 at 3:57 PM John Snow <jsnow@redhat.com> wrote:
> Also try:
>
> > python3 -m qemu.machine.demo /usr/bin/echo -n "no newline?"
> > python3 -m qemu.machine.demo /usr/bin/yes "I hate the antichrist"

*cough* this is a bit of an inside joke that I forgot to edit out of my draft.
Context is https://i.kym-cdn.com/photos/images/original/002/368/100/427
(Does that help? No? Oh well.)


Patch

diff --git a/python/qemu/machine/demo.py b/python/qemu/machine/demo.py
new file mode 100644
index 00000000000..329c2fc5532
--- /dev/null
+++ b/python/qemu/machine/demo.py
@@ -0,0 +1,276 @@ 
+"""
+This module is a quick demonstration to show how to asynchronously
+watch terminal output using Python asyncio, and how it might relate to
+improving QEMUMachine.
+
+This demo does NOT include console output nor a QMP monitor, but the
+techniques being applied here might be applied to guest console
+interactions.
+"""
+
+import asyncio
+import io
+import locale
+import logging
+import os
+from pathlib import Path
+import resource
+import sys
+from typing import (
+    Any,
+    BinaryIO,
+    List,
+    Optional,
+    Union,
+)
+
+
+class StreamWatcher:
+    """
+    StreamWatcher is a bit of a quick hack that consumes data (as
+    bytes) from an asyncio.StreamReader and relays it to several
+    destinations.
+
+    Conceptually, it's kind of like "tee": data from one pipe is
+    forwarded to several.
+
+    A single instance of this class watches either stdout or
+    stderr. (A user could combine these streams and then a single
+    instance of this class could watch both.)
+
+    The data stream being watched is forwarded to three destinations:
+
+    (1) BytesIO (In-memory buffer)
+        --------------------------
+
+        The data is buffered directly into a BytesIO object. This isn't
+        used to do anything further, but a caller could get the entire
+        stream (so far) at any time. This mimics the "iolog" property of
+        QEMUMachine, which we have used in the past to show the console
+        output on various error conditions or to assert that certain
+        patterns have occurred in iotests.
+
+        This can grow without bound, which might be a bad idea. It seems
+        convenient to have on by default, but for more serious uses, you
+        may want to actually disable this. Maybe it'd be useful to be
+        able to configure which destinations you want active by default.
+
+    (2) External logfile
+        ----------------
+
+        Data is written byte-for-byte into an external logfile. In this
+        demo, StreamWatcher does not own the file object so that two
+        StreamWatchers can share the same logfile -- so that a
+        stderr-watcher and a stdout-watcher can log to the same file.
+
+        This destination adds the "logfile" and "flush" parameters to
+        the initializer; flush=True can be used for the stderr-watcher
+        if desired to flush the file to disk without waiting for a
+        newline.
+
+        This might be a bit extraneous since we already have an
+        in-memory log, but I added it here purely because if all else
+        fails -- if we don't ever print out the in-memory buffer and we
+        don't enable logging -- we can likely rely on a good
+        old-fashioned solid bohr-model file.
+
+    (3) Python Logging Interface
+        ------------------------
+
+        Data is manually re-buffered and when a newline is encountered,
+        the buffer is flushed into the Python logging subsystem. This
+        may mean that if terminal output is not terminated with a
+        newline, we may hang onto it in the buffer. When EOF is
+        encountered, any remaining information in the buffer is flushed
+        with a newline marker inserted to indicate that a newline was
+        not actually seen. (This is how the fish shell seems to handle
+        it, and I like it.)
+
+        This destination adds two more parameters: logger and level. The
+        logger is the Logger instance to log to, while the level
+        determines which logging level to use for messages in this
+        stream. In this demo, I use "INFO" for stdout and "WARNING" for
+        stderr. If the Python logging subsystem is not configured, the
+        default behavior is to hide "INFO" messages but to show
+        "WARNING" messages. This might be the most useful behavior for
+        helping to surface potential errors, but it's possible it will
+        be a pain for certain kinds of iotesting.
+    """
+    # pylint: disable=too-few-public-methods
+    def __init__(
+            self,
+            pipe: asyncio.StreamReader,
+            logger: logging.Logger,
+            level: int,
+            logfile: BinaryIO,
+            flush: bool = False):
+        self.pipe = pipe
+        self.logfile = logfile
+        self.logger = logger
+        self.level = level
+        self.flush = flush
+
+        self.data = io.BytesIO()
+        self.buffer = bytearray()
+
+        # We need an encoding for whatever we're watching.
+        # For console output, assume it's whatever our locale says.
+        # If this guess is wrong, go ahead and change it, pal.
+        _, encoding = locale.getlocale()
+        self.encoding = encoding or 'UTF-8'
+
+    async def run(self) -> None:
+        """
+        Run forever, waiting for new data.
+
+        When the stream hits EOF, return.
+        """
+        self.logger.debug("StreamWatcher starting")
+        pagesize = resource.getpagesize()
+        while True:
+            data = await self.pipe.read(pagesize)
+            await self._handle_data(data)
+            if not data:
+                break
+        self.logger.debug("StreamWatcher exiting")
+
+    async def _handle_data(self, data: bytes) -> None:
+        # Destination (3): line-buffered Python logging
+        await self._buffer_data(data)
+
+        # Destination (1): in-memory BytesIO buffer
+        self.data.write(data)
+
+        # Destination (2): external logfile
+        self.logfile.write(data)
+        if self.flush:
+            self.logfile.flush()
+
+    async def _buffer_data(self, data: bytes) -> None:
+        self.buffer.extend(data)
+        if not self.buffer:
+            return
+
+        lines = self.buffer.split(b'\n')
+        if lines[-1]:  # trailing line was not (yet) terminated
+            self.buffer = lines[-1]
+        else:
+            self.buffer.clear()
+
+        for line in lines[:-1]:
+            await self._handle_line(
+                line.decode(self.encoding, errors='replace'))
+
+        if data == b'' and self.buffer:
+            # EOF; flush the remainder of the buffer.
+            await self._handle_line(
+                self.buffer.decode(self.encoding, errors='replace') + '⏎')
+
+    async def _handle_line(self, line: str) -> None:
+        self.logger.log(self.level, line)
+
+
+class ExecManager:
+    """
+    Simple demo for executing a child process while gathering its output.
+    """
+    logger = logging.getLogger(__name__)
+
+    def __init__(self) -> None:
+        self.process: Optional[asyncio.subprocess.Process] = None
+        self.log: Optional[BinaryIO] = None
+        self.stdout: Optional[StreamWatcher] = None
+        self.stderr: Optional[StreamWatcher] = None
+        self._tasks: List[asyncio.Task[Any]] = []
+
+    async def launch(self, binary: Union[str, Path], *args: str) -> None:
+        """Launch the executable, but don't wait for it."""
+        self.logger.debug("launching '%s'", binary)
+        self.logger.debug("%s", ' '.join((str(binary),) + args))
+        self.process = await asyncio.create_subprocess_exec(
+            str(binary),
+            *args,
+            stdin=asyncio.subprocess.DEVNULL,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
+        # Type hints for mypy
+        assert self.process.stdout is not None
+        assert self.process.stderr is not None
+
+        self.log = open("qemu.log", "wb")
+        self.stdout = StreamWatcher(
+            self.process.stdout,
+            self.logger.getChild('stdout'),
+            logging.INFO,
+            self.log)
+        self.stderr = StreamWatcher(
+            self.process.stderr,
+            self.logger.getChild('stderr'),
+            logging.WARNING,
+            self.log,
+            flush=True)
+        self._tasks.append(asyncio.create_task(self.stdout.run()))
+        self._tasks.append(asyncio.create_task(self.stderr.run()))
+
+    async def wait(self) -> None:
+        """Wait for the process and all watchers to finish."""
+        if self.process is None:
+            raise RuntimeError("Nothing's running, pal!")
+        self.logger.debug("bundling reader tasks and process waiter ...")
+        task = asyncio.gather(
+            *self._tasks,
+            self.process.wait(),
+            return_exceptions=True,
+        )
+        # return_exceptions=True means that exceptions raised by any
+        # awaitable are returned in the result list instead of being
+        # propagated, so every watcher and the process wait run to
+        # completion before the logfile is closed below.
+        self.logger.debug("waiting on bundled task ...")
+        await task
+        self.logger.debug("bundled task done.")
+        if self.log is not None:
+            self.logger.debug("closing logfile")
+            self.log.close()
+            if self.process.returncode == 0:
+                self.logger.debug("No errors detected; deleting qemu.log")
+                os.unlink("qemu.log")
+            else:
+                self.logger.debug(
+                    "Process returned non-zero returncode, keeping qemu.log")
+            self.log = None
+
+
+async def main(binary: str, *args: str) -> int:
+    """Run a subprocess, print out some stuff, have a good time."""
+    logging.basicConfig(level=logging.DEBUG)
+    proc = ExecManager()
+    await proc.launch(binary, *args)
+    assert proc.stdout is not None
+    assert proc.stderr is not None
+    logging.debug("process launched; waiting on termination")
+    await proc.wait()
+    logging.debug("process terminated.")
+
+    stdout = proc.stdout.data.getvalue()
+    if stdout:
+        print("========== stdout ==========")
+        print(stdout.decode(proc.stdout.encoding), end='')
+        print("=" * 80)
+
+    stderr = proc.stderr.data.getvalue()
+    if stderr:
+        print("========== stderr ==========")
+        print(stderr.decode(proc.stderr.encoding), end='')
+        print("=" * 80)
+
+    assert proc.process is not None
+    assert proc.process.returncode is not None
+    print(f"process returncode was {proc.process.returncode}")
+    print("OK, seeya!")
+    return proc.process.returncode
+
+
+if __name__ == '__main__':
+    sys.exit(asyncio.run(main(*sys.argv[1:])))
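
A note on the asyncio.gather() call in wait(): the small sketch below
(not part of the patch; fails(), succeeds() and demo() are hypothetical
names) shows what return_exceptions=True does in practice. The exception
comes back as an ordinary entry in the result list, and the other
awaitable still runs to completion.

```python
import asyncio
from typing import Any, List


async def fails() -> None:
    await asyncio.sleep(0)
    raise RuntimeError("boom")


async def succeeds(log: List[str]) -> str:
    # Finishes after fails() has already raised.
    await asyncio.sleep(0.01)
    log.append("finished")
    return "ok"


async def demo() -> List[Any]:
    log: List[str] = []
    results = await asyncio.gather(
        fails(), succeeds(log), return_exceptions=True)
    # results[0] is the RuntimeError instance, results[1] is "ok";
    # the log is appended so the caller can see succeeds() completed.
    return list(results) + [log]
```

With return_exceptions=False, the first exception would instead be
raised at the await, while the remaining awaitables keep running in the
background.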