import enum
import asyncio
from .asynchelper import concurrently
class Subprocess(enum.Enum):
    """
    Events yielded by :py:func:`exec_subprocess` while a command runs.

    The ``*_READLINE`` values are the event names ``exec_subprocess`` builds
    for each piped stream (``"<stream>.readline"``); it maps them back to
    members by name, so member names must stay the upper-cased, underscore
    separated form of those values.
    """

    # The process was spawned; the event's result is the process object
    CREATED = "created"
    # The process exited; the event's result is its return code
    COMPLETED = "completed"
    # A readline() on the process's stdout pipe finished
    STDOUT_READLINE = "stdout.readline"
    # A readline() on the process's stderr pipe finished
    STDERR_READLINE = "stderr.readline"
async def exec_subprocess(cmd, **kwargs):
    """
    Run ``cmd`` as a subprocess and yield ``(event, result)`` pairs as it runs.

    ``stdout`` and ``stderr`` default to :py:data:`asyncio.subprocess.PIPE`.
    Any ``kwargs`` given override those defaults and are passed on to
    :py:func:`asyncio.create_subprocess_exec`.

    Yields
    ------
    ``(Subprocess.CREATED, proc)``
        Once, right after the process has been spawned.
    ``(Subprocess.STDOUT_READLINE, line)`` / ``(Subprocess.STDERR_READLINE, line)``
        For every completed ``readline()`` on a piped stream. ``line`` is
        ``bytes``. An empty ``b""`` marks end-of-file on that stream and is
        yielded at most once per stream.
    ``(Subprocess.COMPLETED, returncode)``
        Once, after ``proc.wait()`` has returned.
    """
    kwargs = {
        "stdout": asyncio.subprocess.PIPE,
        "stderr": asyncio.subprocess.PIPE,
        **kwargs,
    }
    proc = await asyncio.create_subprocess_exec(*cmd, **kwargs)
    yield Subprocess.CREATED, proc
    # In-flight tasks mapped to the name of the event they produce
    work = {
        asyncio.create_task(proc.wait()): "wait",
    }
    for output in ("stdout", "stderr"):
        # Only streams that are piped back to us can be read line by line
        if kwargs.get(output) is asyncio.subprocess.PIPE:
            task = asyncio.create_task(getattr(proc, output).readline())
            work[task] = f"{output}.readline"
    # NOTE(review): this relies on concurrently() noticing tasks added to
    # ``work`` during iteration - confirm against .asynchelper
    async for event, result in concurrently(work):
        if not event.endswith("readline"):
            # When wait() returns process has exited.
            # NOTE(review): lines still buffered in the pipes at this point
            # are not yielded - confirm that is intended
            break
        # Yield line to caller
        yield Subprocess[event.replace(".", "_").upper()], result
        if not result:
            # readline() returns b"" at EOF and would keep doing so
            # immediately; re-arming it would spin and flood the caller with
            # empty lines until the process exits
            continue
        # Read another line from the stream this event came from
        stream = getattr(proc, event.split(".")[0])
        work[asyncio.create_task(stream.readline())] = event
    # Yield when process exits
    yield Subprocess.COMPLETED, proc.returncode
async def run_command(cmd, logger=None, **kwargs):
    r"""
    Run a command using :py:func:`asyncio.create_subprocess_exec`.

    If ``logger`` is supplied, write stdout and stderr to logger debug.

    ``kwargs`` are passed to :py:func:`asyncio.create_subprocess_exec`, except
    for stdout and stderr which are pipes used for logging.

    Output is decoded as UTF-8 for logging and error reporting; bytes that are
    not valid UTF-8 are replaced rather than raising.

    Raises
    ------
    RuntimeError
        If the command exits with a non-zero return code. The message is the
        ``repr`` of ``cmd`` followed by the captured stdout and stderr.

    Examples
    --------

    >>> import sys
    >>> import asyncio
    >>> import logging
    >>>
    >>> import dffml
    >>>
    >>> logging.basicConfig(level=logging.DEBUG)
    >>> logger = logging.getLogger("mylogger")
    >>>
    >>> asyncio.run(dffml.run_command([
    ...     sys.executable, "-c", "print('Hello World')"
    ... ], logger=logger))

    You should see "Hello World" in the logging output

    .. code-block::

        DEBUG:asyncio:Using selector: EpollSelector
        DEBUG:mylogger:Running ['/usr/bin/python3.7', '-c', "print('Hello World')"], {'stdout': -1, 'stderr': -1}
        DEBUG:mylogger:['/usr/bin/python3.7', '-c', "print('Hello World')"]: stdout.readline: Hello World
        DEBUG:mylogger:['/usr/bin/python3.7', '-c', "print('Hello World')"]: stderr.readline:
    """
    # Combination of stdout and stderr
    output = []
    if logger is not None:
        logger.debug(f"Running {cmd}, {kwargs}")
    readline_events = (Subprocess.STDOUT_READLINE, Subprocess.STDERR_READLINE)
    async for event, result in exec_subprocess(cmd, **kwargs):
        if event in readline_events:
            # Log line read
            if logger is not None:
                # Commands may emit arbitrary bytes; never let logging crash
                text = result.decode(errors="replace").rstrip()
                logger.debug(f"{cmd}: {event}: {text}")
            # Append to output in case of error
            output.append(result)
        # Raise if anything goes wrong
        elif event == Subprocess.COMPLETED and result != 0:
            # Replace undecodable bytes so the real failure is reported
            # instead of a UnicodeDecodeError
            raise RuntimeError(
                repr(cmd) + ": " + b"\n".join(output).decode(errors="replace")
            )