Util Asynchelper

WARNING: concurrent can be much slower for quick tasks. It is best used for long running concurrent tasks.

class dffml.util.asynchelper.AsyncContextManagerList(*args)[source]
CONTEXT

alias of AsyncContextManagerListContext

class dffml.util.asynchelper.AsyncContextManagerListContext(parent: AsyncContextManagerList)[source]
async dffml.util.asynchelper.aenter_stack(obj: Any, context_managers: Dict[str, AbstractAsyncContextManager], call: bool = True) AsyncExitStack[source]

Create a contextlib.AsyncExitStack then go through each key, value pair in the dict of async context managers. Enter the context of each async context manager and call setattr on obj to set the attribute by the name of key to the value yielded by the async context manager.

If call is true then the context entered will be the context returned by calling each context manager respectively.

dffml.util.asynchelper.context_stacker(inherit: Type, context_managers: Dict[str, AbstractAsyncContextManager]) Type[source]

Using aenter_stack()