Operation Dataflow
- exception dffml.operation.dataflow.InvalidCustomRunDataFlowContext
Thrown when the custom inputs for dffml.dataflow.run do not list an input whose primitive is string as the first input.
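A minimal sketch of this condition, assuming the custom-inputs form shown in the second run_dataflow example below. The exception name suggests the first input's string value serves as the subflow's context string, so its definition must have primitive "string" (the names here are illustrative, not part of the API):

>>> from dffml import Definition
>>> # Valid as the first custom input: its primitive is "string"
>>> URL = Definition(name="URL", primitive="string")
>>> # Listing a non-string-primitive definition first instead is assumed
>>> # to raise InvalidCustomRunDataFlowContext
>>> COUNT = Definition(name="count", primitive="integer")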
- exception dffml.operation.dataflow.InvalidCustomRunDataFlowOutputs
Thrown when the outputs for a custom dffml.dataflow.run do not match those of its subflow.
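A sketch of the matching requirement, reusing names (last_path, URL) from the second run_dataflow example below: the outputs declared when replacing run_dataflow's operation should be outputs the subflow actually produces.

>>> # "last" is produced by last_path inside the subflow, so this matches
>>> custom = run_dataflow.op._replace(
...     inputs={"URL": URL},
...     outputs={last_path.op.outputs["last"].name: last_path.op.outputs["last"]},
... )
>>> # Declaring an output that no operation in the subflow produces is
>>> # assumed to raise InvalidCustomRunDataFlowOutputs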
- class dffml.operation.dataflow.RunDataFlowConfig(dataflow: dffml.df.types.DataFlow)
- no_enforce_immutable()
By default, all properties of a config object are immutable. If you would like to mutate immutable properties, you must explicitly call this method and use it as a context manager.
Examples
>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> config = MyConfig(C=2)
>>> with config.no_enforce_immutable():
...     config.C = 1
- class dffml.operation.dataflow.run_dataflow(parent: OperationImplementation, ctx: BaseInputSetContext, octx: BaseOrchestratorContext)
Starts a subflow, self.config.dataflow, and adds inputs to it.
- Parameters:
inputs (dict) – The inputs to add to the subflow. This should be a mapping from each context string to the list of inputs to seed for that context string (see the sketch after this list).
- Returns:
Maps each context string in inputs to its output after running through the dataflow.
- Return type:
dict
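As referenced above, a minimal sketch of the shape inputs takes in the default mode; the context string "my_context" is an illustrative placeholder, and the definition borrows URL from the examples below:

>>> # Hypothetical shape only: each key is a context string, each value
>>> # the list of Inputs to seed into the subflow for that context
>>> inputs = {
...     "my_context": [
...         Input(value="some value", definition=URL),
...     ],
... }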
Examples
The following shows how to use run_dataflow with its default behavior.
>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> subflow = DataFlow.auto(GetSingle)
>>> subflow.definitions[URL.name] = URL
>>> subflow.seed.append(
...     Input(
...         value=[URL.name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[run_dataflow.op.outputs["results"].name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(dataflow, {
...         "run_subflow": [
...             Input(
...                 value={
...                     "dffml": [
...                         {
...                             "value": "https://github.com/intel/dffml",
...                             "definition": URL.name
...                         }
...                     ]
...                 },
...                 definition=run_dataflow.op.inputs["inputs"]
...             )
...         ]
...     }):
...         print(results)
>>>
>>> asyncio.run(main())
{'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}
The following shows how to use run_dataflow with custom inputs and outputs, which lets you run a subflow as if it were an operation.
>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> @op(
...     inputs={"url": URL},
...     outputs={"last": Definition("last_element_in_path", primitive="string")},
... )
... def last_path(url):
...     return {"last": url.split("/")[-1]}
>>>
>>> subflow = DataFlow.auto(last_path, GetSingle)
>>> subflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.operations[run_dataflow.op.name] = run_dataflow.op._replace(
...     inputs={"URL": URL},
...     outputs={last_path.op.outputs["last"].name: last_path.op.outputs["last"]},
...     expand=[],
... )
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>> dataflow.update(auto_flow=True)
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(
...         dataflow,
...         {
...             "run_subflow": [
...                 Input(value="https://github.com/intel/dffml", definition=URL)
...             ]
...         },
...     ):
...         print(results)
>>>
>>> asyncio.run(main())
{'last_element_in_path': 'dffml'}
- imp
alias of DffmlDataflowRunImplementation