Source Dfpreprocess

class dffml.source.dfpreprocess.DataFlowPreprocessSource(config: Optional[Type[BaseConfig]])[source]
>>> import asyncio
>>> from dffml import *
>>>
>>> records = [
...     Record(
...         "0",
...         data={
...             "features": {
...                 "Years": 1,
...                 "Expertise": 3,
...                 "Trust": 0.2,
...                 "Salary": 20,
...             }
...         },
...     ),
... ]
>>>
>>> features = Features(
...     Feature("Years", int, 1),
...     Feature("Expertise", int, 1),
...     Feature("Trust", float, 1),
...     Feature("Salary", int, 1),
... )
>>>
>>> dataflow = DataFlow(multiply, AssociateDefinition)
>>> dataflow.flow["multiply"].inputs["multiplicand"] = [
...     {"seed": ["Years", "Expertise", "Trust", "Salary"]}
... ]
>>> dataflow.seed = [
...     Input(
...         value={
...             feature.name: multiply.op.outputs["product"].name
...             for feature in features
...         },
...         definition=AssociateDefinition.op.inputs["spec"],
...     ),
...     Input(value=10, definition=multiply.op.inputs["multiplier"],),
... ]
>>>
>>>
>>> memory_source = Sources(MemorySource(MemorySourceConfig(records=records)))
>>>
>>> source = DataFlowPreprocessSource(
...     DataFlowPreprocessSourceConfig(
...         source=memory_source, dataflow=dataflow, features=features,
...     )
... )
>>>
>>>
>>> async def main():
...     async with source as src:
...         async with src() as sctx:
...             async for record in sctx.records():
...                 print(record.features())
...
>>>
>>> asyncio.run(main())
{'Years': 10, 'Expertise': 30, 'Trust': 2.0, 'Salary': 200}
CONFIG

alias of DataFlowPreprocessSourceConfig

CONTEXT

alias of DataFlowPreprocessSourceContext

class dffml.source.dfpreprocess.DataFlowPreprocessSourceConfig(source: dffml.source.source.BaseSource, dataflow: dffml.df.types.DataFlow, features: dffml.feature.feature.Features = [], inputs: List[str] = <factory>, record_def: str = None, length: str = None, all_for_single: bool = False, no_strict: bool = False, orchestrator: dffml.df.base.BaseOrchestrator = MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None)))[source]
no_enforce_immutable()

By default, all properties of a config object are immutable. If you would like to mutate immutable properties, you must explicitly call this method using it as a context manager.

Examples

>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> my_config = MyConfig(C=2)
>>> with my_config.no_enforce_immutable():
...     my_config.C = 1
class dffml.source.dfpreprocess.DataFlowPreprocessSourceContext(parent: BaseSource)[source]
async record(key: str) AsyncIterator[Record][source]

Get a record from the source or add it if it doesn’t exist.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = await ctx.record("example")
...             # Let's also try calling `record` for a record that doesn't exist.
...             one = await ctx.record("one")
...             await ctx.update(one)
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
{'key': 'one', 'extra': {}}
async records() AsyncIterator[Record][source]

Asynchronously yields the records retrieved from self.src

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         async with source() as ctx:
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
async update(record: Record)[source]

Updates a record for a source

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = Record("one", data=dict(features=dict(feed="face")))
...             # ... Update one into our records ...
...             await ctx.update(example)
...             # Let's check out our records after calling `update`.
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}
class dffml.source.dfpreprocess.RecordContextHandle(ctx: BaseInputSetContext)[source]
class dffml.source.dfpreprocess.RecordInputSetContext(record: Record)[source]