Source Wrapper
- class dffml.source.wrapper.ContextManagedWrapperSource(config: Optional[Type[BaseConfig]])
- classmethod remove_self_from_args(args)
Remove the class instance from args if the function was called as a method.
Examples
>>> class MyConextManagedSource(ContextManagedWrapperSource):
...     def arg_zero_is_self_with_remove(*args):
...         args = MyConextManagedSource.remove_self_from_args(args)
...         return isinstance(args[0], MyConextManagedSource)
...
...     def arg_zero_is_self(*args):
...         return isinstance(args[0], MyConextManagedSource)
>>>
>>> source = MyConextManagedSource()
>>> print(source.arg_zero_is_self("feedface"))
True
>>> print(source.arg_zero_is_self_with_remove("feedface"))
False
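The behavior shown above can be approximated with a short sketch. This is not taken from the dffml source: the class names `Base` and `Child` and the method body are hypothetical, and the sketch only assumes that the helper drops a leading argument when it is an instance of the class.

```python
class Base:
    @classmethod
    def remove_self_from_args(cls, args):
        # Hypothetical implementation: drop the leading argument when it is
        # an instance of this class (that is, the implicit `self`).
        if args and isinstance(args[0], cls):
            return args[1:]
        return args


class Child(Base):
    def first_arg(*args):
        # Called as a method, args[0] is the instance, so strip it first.
        args = Child.remove_self_from_args(args)
        return args[0]


result = Child().first_arg("feedface")
print(result)
```

Because the check uses `isinstance`, a tuple that does not start with an instance of the class is returned unchanged, so the same function body works whether or not it was called as a method.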
- exception dffml.source.wrapper.FunctionDidNotYieldSource
Raised when the function wrapped by context_managed_wrapper_source() did not yield an instance of dffml.source.source.BaseSource.
- exception dffml.source.wrapper.FunctionMustBeGenerator
Raised when the function to be wrapped by context_managed_wrapper_source() was not an async generator or a generator. The wrapped function must be one of the two in order to be made into a context manager.
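A check of this kind can be sketched with the standard library `inspect` module. The exception `NotAGenerator` and the helper `require_generator` below are hypothetical stand-ins for illustration, not dffml's implementation.

```python
import inspect


class NotAGenerator(Exception):
    """Hypothetical stand-in for FunctionMustBeGenerator."""


def require_generator(func):
    # Accept only generator functions or async generator functions.
    if not (inspect.isgeneratorfunction(func) or inspect.isasyncgenfunction(func)):
        raise NotAGenerator(func.__qualname__)
    return func


def sync_gen():
    yield 1


async def async_gen():
    yield 1


def plain():
    return 1


# Record which functions pass the check.
checks = {}
for func in (sync_gen, async_gen, plain):
    try:
        require_generator(func)
        checks[func.__name__] = True
    except NotAGenerator:
        checks[func.__name__] = False
print(checks)
```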
- class dffml.source.wrapper.WrapperSource(config: Optional[Type[BaseConfig]])
Source used to wrap other sources
Examples
A memory source pre-populated with some records.
>>> from dffml.noasync import load
>>> from dffml import (
...     BaseConfig,
...     WrapperSource,
...     MemorySource,
...     Record,
...     entrypoint,
... )
>>>
>>> @entrypoint("myrecords")
... class MyRecordsSource(WrapperSource):
...     CONFIG = BaseConfig
...
...     async def __aenter__(self) -> "MyRecordsSource":
...         self.source = MemorySource(records=[
...             Record("myrecord0", data={"features": {"a": 1}}),
...             Record("myrecord1", data={"features": {"b": 2}}),
...         ])
...         return await super().__aenter__()
>>>
>>> for record in load(MyRecordsSource()):
...     print(record.export())
{'key': 'myrecord0', 'features': {'a': 1}, 'extra': {}}
{'key': 'myrecord1', 'features': {'b': 2}, 'extra': {}}
- CONTEXT
alias of WrapperSourceContext
- class dffml.source.wrapper.WrapperSourceContext(parent: BaseSource)
- async record(key: str) -> AsyncIterator[Record]
Get a record from the source or add it if it doesn’t exist.
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = await ctx.record("example")
...             # Let's also try calling `record` for a record that doesn't exist.
...             one = await ctx.record("one")
...             await ctx.update(one)
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
{'key': 'one', 'extra': {}}
- async records() -> AsyncIterator[Record]
Asynchronously yields the records retrieved from self.src
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         async with source() as ctx:
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
- async update(record: Record)
Updates a record for a source
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = Record("one", data=dict(features=dict(feed="face")))
...             # ... Update one into our records ...
...             await ctx.update(example)
...             # Let's check out our records after calling `record` and `update`.
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}
- dffml.source.wrapper.context_managed_wrapper_source(entrypoint_name, qualname_suffix: str = 'Source') -> ContextManagedWrapperSource
Given a function that can act as a context manager (any function that uses the yield keyword), make it into a context manager or an async context manager, depending on whether it is defined with async.
Create a subclass of ContextManagedWrapperSource
Examples
A memory source pre-populated with some records.
>>> from dffml.noasync import load
>>> from dffml import (
...     MemorySource,
...     Record,
...     context_managed_wrapper_source,
... )
>>>
>>> @context_managed_wrapper_source("my.records")
... def my_records():
...     yield MemorySource(records=[
...         Record("myrecord0", data={"features": {"a": 1}}),
...         Record("myrecord1", data={"features": {"b": 2}}),
...     ])
>>>
>>> print(my_records.source)
<class 'dffml.base.MyRecordsSource'>
>>> for record in load(my_records.source()):
...     print(record.export())
{'key': 'myrecord0', 'features': {'a': 1}, 'extra': {}}
{'key': 'myrecord1', 'features': {'b': 2}, 'extra': {}}
>>>
>>> @context_managed_wrapper_source("async.my.records")
... async def async_my_records():
...     yield MemorySource(records=[
...         Record("myrecord0", data={"features": {"a": 1}}),
...         Record("myrecord1", data={"features": {"b": 2}}),
...     ])
>>>
>>> print(async_my_records.source)
<class 'dffml.base.AsyncMyRecordsSource'>
>>> for record in load(async_my_records.source()):
...     print(record.export())
{'key': 'myrecord0', 'features': {'a': 1}, 'extra': {}}
{'key': 'myrecord1', 'features': {'b': 2}, 'extra': {}}
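The step of turning a generator function into a sync or async context manager can be sketched with the standard library alone. The helper `as_context_manager` and the two example functions are hypothetical and only illustrate the dispatch on how the function is defined. They do not reproduce how dffml builds the ContextManagedWrapperSource subclass.

```python
import asyncio
import contextlib
import inspect


def as_context_manager(func):
    # Pick the contextlib wrapper that matches how func was defined.
    if inspect.isasyncgenfunction(func):
        return contextlib.asynccontextmanager(func)
    if inspect.isgeneratorfunction(func):
        return contextlib.contextmanager(func)
    raise TypeError(f"{func.__qualname__} must be a generator or async generator")


@as_context_manager
def sync_resource():
    # Code before the yield runs on enter, code after it runs on exit.
    yield "sync value"


@as_context_manager
async def async_resource():
    yield "async value"


async def main():
    seen = []
    with sync_resource() as value:
        seen.append(value)
    async with async_resource() as value:
        seen.append(value)
    return seen


seen = asyncio.run(main())
print(seen)
```

In both cases the value produced by `yield` is what the `with` or `async with` statement binds, which is why the wrapped function in the examples above yields the source object itself.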