Source Df

class dffml.source.df.DataFlowSource(config: Optional[Type[BaseConfig]])[source]
CONFIG

alias of DataFlowSourceConfig

CONTEXT

alias of DataFlowSourceContext

class dffml.source.df.DataFlowSourceConfig(source: dffml.source.source.BaseSource, dataflow: dffml.df.types.DataFlow, features: dffml.feature.feature.Features = [], inputs: List[str] = <factory>, record_def: str = None, length: str = None, all_for_single: bool = False, no_strict: bool = False, orchestrator: dffml.df.base.BaseOrchestrator = MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None)))[source]
no_enforce_immutable()

By default, all properties of a config object are immutable. If you would like to mutate immutable properties, you must explicitly call this method using it as a context manager.

Examples

>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> config = MyConfig(C=2)
>>> with config.no_enforce_immutable():
...     config.C = 1
class dffml.source.df.DataFlowSourceContext(parent: BaseSource)[source]
async record(key: str) AsyncIterator[Record][source]

Get a record from the source or add it if it doesn’t exist.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = await ctx.record("example")
...             # Let's also try calling `record` for a record that doesnt exist.
...             one = await ctx.record("one")
...             await ctx.update(one)
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
{'key': 'one', 'extra': {}}
async records() AsyncIterator[Record][source]

Returns a list of records retrieved from self.src

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         async with source() as ctx:
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
async update()[source]

Updates a record for a source

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = Record("one", data=dict(features=dict(feed="face")))
...             # ... Update one into our records ...
...             await ctx.update(example)
...             # Let's check out our records after calling `record` and `update`.
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}