Source Dfpreprocess¶
- class dffml.source.dfpreprocess.DataFlowPreprocessSource(config: Optional[Type[BaseConfig]])[source]¶
Example: preprocess each record by running its features through a dataflow that multiplies every feature value by 10.

>>> import asyncio
>>> from dffml import *
>>>
>>> records = [
...     Record(
...         "0",
...         data={
...             "features": {
...                 "Years": 1,
...                 "Expertise": 3,
...                 "Trust": 0.2,
...                 "Salary": 20,
...             }
...         },
...     ),
... ]
>>>
>>> features = Features(
...     Feature("Years", int, 1),
...     Feature("Expertise", int, 1),
...     Feature("Trust", float, 1),
...     Feature("Salary", int, 1),
... )
>>>
>>> dataflow = DataFlow(multiply, AssociateDefinition)
>>> dataflow.flow["multiply"].inputs["multiplicand"] = [
...     {"seed": ["Years", "Expertise", "Trust", "Salary"]}
... ]
>>> dataflow.seed = [
...     Input(
...         value={
...             feature.name: multiply.op.outputs["product"].name
...             for feature in features
...         },
...         definition=AssociateDefinition.op.inputs["spec"],
...     ),
...     Input(value=10, definition=multiply.op.inputs["multiplier"],),
... ]
>>>
>>>
>>> memory_source = Sources(MemorySource(MemorySourceConfig(records=records)))
>>>
>>> source = DataFlowPreprocessSource(
...     DataFlowPreprocessSourceConfig(
...         source=memory_source, dataflow=dataflow, features=features,
...     )
... )
>>>
>>>
>>> async def main():
...     async with source as src:
...         async with src() as sctx:
...             async for record in sctx.records():
...                 print(record.features())
...
>>>
>>> asyncio.run(main())
{'Years': 10, 'Expertise': 30, 'Trust': 2.0, 'Salary': 200}
- CONFIG¶
alias of
DataFlowPreprocessSourceConfig
- CONTEXT¶
alias of
DataFlowPreprocessSourceContext
- class dffml.source.dfpreprocess.DataFlowPreprocessSourceConfig(source: dffml.source.source.BaseSource, dataflow: dffml.df.types.DataFlow, features: dffml.feature.feature.Features = [], inputs: List[str] = <factory>, record_def: str = None, length: str = None, all_for_single: bool = False, no_strict: bool = False, orchestrator: dffml.df.base.BaseOrchestrator = MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None)))[source]¶
- no_enforce_immutable()¶
By default, all properties of a config object are immutable. To mutate them, you must explicitly call this method and use it as a context manager, as shown below.
Examples
>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> config = MyConfig(C=2)
>>> with config.no_enforce_immutable():
...     config.C = 1
- class dffml.source.dfpreprocess.DataFlowPreprocessSourceContext(parent: BaseSource)[source]¶
- async record(key: str) AsyncIterator[Record] [source]¶
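Get a record from the source by key. If no record with that key exists, a new, empty record with that key is returned (as in the example below); call `update` to store it in the source.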
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = await ctx.record("example")
...             # Let's also try calling `record` for a record that doesn't exist.
...             one = await ctx.record("one")
...             await ctx.update(one)
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
{'key': 'one', 'extra': {}}
- async records() AsyncIterator[Record] [source]¶
Asynchronously iterate over the records retrieved from `self.src`.
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         async with source() as ctx:
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
- async update(record: Record)[source]¶
Update a record in the source, adding it if the source does not already contain it.
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = Record("one", data=dict(features=dict(feed="face")))
...             # ... Update one into our records ...
...             await ctx.update(example)
...             # Let's check out our records after calling `update`.
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}
- class dffml.source.dfpreprocess.RecordContextHandle(ctx: BaseInputSetContext)[source]¶