Source Wrapper

class dffml.source.wrapper.ContextManagedWrapperSource(config: Optional[Type[BaseConfig]])[source]
classmethod remove_self_from_args(args)[source]

Remove class from args if called as method

Examples

>>> class MyConextManagedSource(ContextManagedWrapperSource):
...     def arg_zero_is_self_with_remove(*args):
...         args = MyConextManagedSource.remove_self_from_args(args)
...         return isinstance(args[0], MyConextManagedSource)
...
...     def arg_zero_is_self(*args):
...         return isinstance(args[0], MyConextManagedSource)
>>>
>>> source = MyConextManagedSource()
>>> print(source.arg_zero_is_self("feedface"))
True
>>> print(source.arg_zero_is_self_with_remove("feedface"))
False
exception dffml.source.wrapper.FunctionDidNotYieldSource[source]

Raised when the function wrapped by context_managed_wrapper_source() did not yield an object of instance dffml.source.source.BaseSource.

exception dffml.source.wrapper.FunctionMustBeGenerator[source]

Raised when the function to be wrapped by context_managed_wrapper_source() was not an async generator or a generator. The wrapped function must be one of the two in order to be made into a context manager.

class dffml.source.wrapper.WrapperSource(config: Optional[Type[BaseConfig]])[source]

Source used to wrap other sources

Examples

A memory source pre-populated with a some records.

>>> from dffml.noasync import load
>>> from dffml import (
...     BaseConfig,
...     WrapperSource,
...     MemorySource,
...     Record,
...     entrypoint,
... )
>>>
>>> @entrypoint("myrecords")
... class MyRecordsSource(WrapperSource):
...     CONFIG = BaseConfig
...
...     async def __aenter__(self) -> "MyRecordsSource":
...         self.source = MemorySource(records=[
...             Record("myrecord0", data={"features": {"a": 1}}),
...             Record("myrecord1", data={"features": {"b": 2}}),
...         ])
...         return await super().__aenter__()
>>>
>>> for record in load(MyRecordsSource()):
...     print(record.export())
{'key': 'myrecord0', 'features': {'a': 1}, 'extra': {}}
{'key': 'myrecord1', 'features': {'b': 2}, 'extra': {}}
CONTEXT

alias of WrapperSourceContext

class dffml.source.wrapper.WrapperSourceContext(parent: BaseSource)[source]
async record(key: str) AsyncIterator[Record][source]

Get a record from the source or add it if it doesn’t exist.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = await ctx.record("example")
...             # Let's also try calling `record` for a record that doesnt exist.
...             one = await ctx.record("one")
...             await ctx.update(one)
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
{'key': 'one', 'extra': {}}
async records() AsyncIterator[Record][source]

Returns a list of records retrieved from self.src

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         async with source() as ctx:
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
async update(record: Record)[source]

Updates a record for a source

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = Record("one", data=dict(features=dict(feed="face")))
...             # ... Update one into our records ...
...             await ctx.update(example)
...             # Let's check out our records after calling `record` and `update`.
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}
dffml.source.wrapper.context_managed_wrapper_source(entrypoint_name, qualname_suffix: str = 'Source') ContextManagedWrapperSource[source]

Given a function which can function as a context manager (any function which uses the yield keyword). Make the function into a context manger or an async context manager depending on if it’s defined async or not.

Create a subclass of ContextManagedWrapperSource

Examples

A memory source pre-populated with a some records.

>>> from dffml.noasync import load
>>> from dffml import (
...     MemorySource,
...     Record,
...     context_managed_wrapper_source,
... )
>>>
>>> @context_managed_wrapper_source("my.records")
... def my_records():
...     yield MemorySource(records=[
...         Record("myrecord0", data={"features": {"a": 1}}),
...         Record("myrecord1", data={"features": {"b": 2}}),
...     ])
>>>
>>> print(my_records.source)
<class 'dffml.base.MyRecordsSource'>
>>> for record in load(my_records.source()):
...     print(record.export())
{'key': 'myrecord0', 'features': {'a': 1}, 'extra': {}}
{'key': 'myrecord1', 'features': {'b': 2}, 'extra': {}}
>>>
>>> @context_managed_wrapper_source("async.my.records")
... async def async_my_records():
...     yield MemorySource(records=[
...         Record("myrecord0", data={"features": {"a": 1}}),
...         Record("myrecord1", data={"features": {"b": 2}}),
...     ])
>>>
>>> print(async_my_records.source)
<class 'dffml.base.AsyncMyRecordsSource'>
>>> for record in load(async_my_records.source()):
...     print(record.export())
{'key': 'myrecord0', 'features': {'a': 1}, 'extra': {}}
{'key': 'myrecord1', 'features': {'b': 2}, 'extra': {}}