Source Df¶
- class dffml.source.df.DataFlowSource(config: Optional[Type[BaseConfig]])[source]¶
- CONFIG¶
alias of
DataFlowSourceConfig
- CONTEXT¶
alias of
DataFlowSourceContext
- class dffml.source.df.DataFlowSourceConfig(source: dffml.source.source.BaseSource, dataflow: dffml.df.types.DataFlow, features: dffml.feature.feature.Features = [], inputs: List[str] = <factory>, record_def: str = None, length: str = None, all_for_single: bool = False, no_strict: bool = False, orchestrator: dffml.df.base.BaseOrchestrator = MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None)))[source]¶
- no_enforce_immutable()¶
By default, all properties of a config object are immutable. If you would like to mutate immutable properties, you must explicitly call this method using it as a context manager.
Examples
>>> from dffml import config >>> >>> @config ... class MyConfig: ... C: int >>> >>> config = MyConfig(C=2) >>> with config.no_enforce_immutable(): ... config.C = 1
- class dffml.source.df.DataFlowSourceContext(parent: BaseSource)[source]¶
- async record(key: str) AsyncIterator[Record] [source]¶
Get a record from the source or add it if it doesn’t exist.
Examples
>>> import asyncio >>> from dffml import * >>> >>> async def main(): ... async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source: ... # Open, update, and close ... async with source() as ctx: ... example = await ctx.record("example") ... # Let's also try calling `record` for a record that doesnt exist. ... one = await ctx.record("one") ... await ctx.update(one) ... async for record in ctx.records(): ... print(record.export()) >>> >>> asyncio.run(main()) {'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}} {'key': 'one', 'extra': {}}
- async records() AsyncIterator[Record] [source]¶
Returns a list of records retrieved from self.src
Examples
>>> import asyncio >>> from dffml import * >>> >>> async def main(): ... async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source: ... async with source() as ctx: ... async for record in ctx.records(): ... print(record.export()) >>> >>> asyncio.run(main()) {'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
- async update()[source]¶
Updates a record for a source
Examples
>>> import asyncio >>> from dffml import * >>> >>> async def main(): ... async with MemorySource(records=[]) as source: ... # Open, update, and close ... async with source() as ctx: ... example = Record("one", data=dict(features=dict(feed="face"))) ... # ... Update one into our records ... ... await ctx.update(example) ... # Let's check out our records after calling `record` and `update`. ... async for record in ctx.records(): ... print(record.export()) >>> >>> asyncio.run(main()) {'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}