This section contains reference documentation for the
sloop Python module.
class Break(Exception)
Raised by run_until_complete when simulation was interrupted, e.g.
when the end-user issued the break CLI command.
class Error(Exception)
Raised for incorrect usage of sloop
def get_running_loop()
Like asyncio.get_running_loop, but must be run from a task
created by Sloop's create_task.
def global_event_loop()
The default sloop event loop
def run_until_complete(fut)
Similar to asyncio.EventLoop.run_until_complete, but runs
simulation until a coroutine, future or task is done. If the
argument is a coroutine, then the coroutine is scheduled as a task
within the global_event_loop sloop event loop. May only be called
while simulation is stopped, from Global Context.
def create_task(coro, *, name=None)
Like asyncio.create_task, create a task in the currently
running sloop event loop. Raises RuntimeError if not called
from a sloop corotuine.
def call_soon(fun, *args)
Like asyncio.call_soon, but must be run from a sloop task
def wrap_future(fut)
Similar to asyncio.wrap_future, wrap a future-like object
so it can be awaited within a sloop loop.
Mainly useful to wrap the results of higher-order asyncio coroutines,
e.g. sloop.wrap_future(asyncio.gather(...)) or map(sloop.wrap_future, asyncio.as_completed(...)).
Raises RuntimeError if not called
from a sloop corotuine.
async def wait_call(fun, *args)
Similar to asyncio.to_thread, run a slow function
outside the Sloop event loop and await the result.
Like asyncio, sloop emits a warning if a coroutine takes too
long between waits; wrapping a call in wait_call silences
this. However, unlike asyncio.to_thread, wait_call schedules
the function to run in Global Context rather than in a separate
thread, so it will still block the progress of simulation. Thus, the
motivation for the function is to allow preservation of asyncio's
convention that expensive code should not be run within coroutines.
Raises RuntimeError if not called from a sloop corotuine.
async def wait(s: snoop.Snooper)
Wait for the next time a snooper yields.
async def trace(snooper, until)
Collect all values yielded by snooper until the first time
until yields, and return a list of these values.
class Tracer
Asynchronous context manager that converts a snooper into an
asynchronous iterator of yielded values. The asynchronous iterator is
returned by __enter__, so typical usage looks like this:
async with Tracer(SomeSnooper(...)) as t:
async for value in t:
...
def __init__(self, snooper: snoop.Snooper, until: snoop.Snooper=None)
snooper is the snooper whose values to collect. until
specifies when to stop iteration; the default is to continue
indefinitely.
class CallbackScope(contextlib.AbstractAsyncContextManager)
Reusable asynchronous context manager that subscribes a callback to a snooper while entered.
def __init__(self, snp: snoop.Snooper[T], cb: Callable[[T], None])
The callback cb is subscribed to snp while entered.
def timeout(snooper)
Similar to asyncio.timeout in Python 3.11, returns an
asynchronous context manager that interrupts the block by raising a
TimeoutError exception if it does not finish in time. Unlike
asyncio.timeout, the timeout is specified by a snooper object,
typically an instance of sloop.Seconds.