Operation Dataflow

exception dffml.operation.dataflow.InvalidCustomRunDataFlowContext[source]

Thrown when the custom inputs for a dffml.dataflow.run operation do not list, as the first input, an input whose primitive is string.
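
For illustration, a minimal sketch of an input definition that satisfies this requirement (it mirrors the URL definition used in the examples below and is not part of this exception's API): the first custom input must use a Definition whose primitive is string.

>>> from dffml import Definition
>>>
>>> # Valid as the first custom input: the definition's primitive is "string"
>>> URL = Definition(name="URL", primitive="string")
>>> URL.primitive
'string'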

exception dffml.operation.dataflow.InvalidCustomRunDataFlowOutputs[source]

Thrown when the outputs for a custom dffml.dataflow.run operation do not match those of its subflow.
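
As a rough sketch of the constraint, assuming the custom inputs/outputs example below: the outputs declared on the replaced operation reuse the definitions the subflow produces, keyed by definition name.

>>> from dffml import Definition
>>>
>>> last = Definition(name="last_element_in_path", primitive="string")
>>> # Custom outputs are keyed by the definition name the subflow produces
>>> outputs = {last.name: last}
>>> list(outputs)
['last_element_in_path']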

class dffml.operation.dataflow.RunDataFlowConfig(dataflow: dffml.df.types.DataFlow)[source]
no_enforce_immutable()

By default, all properties of a config object are immutable. If you would like to mutate immutable properties, you must explicitly call this method using it as a context manager.

Examples

>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> config = MyConfig(C=2)
>>> with config.no_enforce_immutable():
...     config.C = 1
class dffml.operation.dataflow.run_dataflow(parent: OperationImplementation, ctx: BaseInputSetContext, octx: BaseOrchestratorContext)[source]

Starts the subflow given by self.config.dataflow and adds the given inputs to it.

Parameters:

inputs (dict) – The inputs to add to the subflow. This should be a mapping of context strings to the inputs which should be seeded for each context string.

Returns:

A mapping of each context string in inputs to the outputs produced after running the subflow.

Return type:

dict

Examples

The following shows how to use run_dataflow with its default behavior.

>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> subflow = DataFlow.auto(GetSingle)
>>> subflow.definitions[URL.name] = URL
>>> subflow.seed.append(
...     Input(
...         value=[URL.name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[run_dataflow.op.outputs["results"].name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(dataflow, {
...         "run_subflow": [
...             Input(
...                 value={
...                     "dffml": [
...                         {
...                             "value": "https://github.com/intel/dffml",
...                             "definition": URL.name
...                         }
...                     ]
...                 },
...                 definition=run_dataflow.op.inputs["inputs"]
...             )
...         ]
...     }):
...         print(results)
>>>
>>> asyncio.run(main())
{'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}

The following shows how to use run_dataflow with custom inputs and outputs. This allows you to run a subflow as if it were an operation.

>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> @op(
...     inputs={"url": URL},
...     outputs={"last": Definition("last_element_in_path", primitive="string")},
... )
... def last_path(url):
...     return {"last": url.split("/")[-1]}
>>>
>>> subflow = DataFlow.auto(last_path, GetSingle)
>>> subflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.operations[run_dataflow.op.name] = run_dataflow.op._replace(
...     inputs={"URL": URL},
...     outputs={last_path.op.outputs["last"].name: last_path.op.outputs["last"]},
...     expand=[],
... )
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>> dataflow.update(auto_flow=True)
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(
...         dataflow,
...         {
...             "run_subflow": [
...                 Input(value="https://github.com/intel/dffml", definition=URL)
...             ]
...         },
...     ):
...         print(results)
>>>
>>> asyncio.run(main())
{'last_element_in_path': 'dffml'}
imp

alias of DffmlDataflowRunImplementation

async run(inputs: Dict[str, Any]) → Dict[str, Any][source]

The implementation of the operation goes here. It should take and return a dict whose keys match the input and output parameters of the Operation object associated with this operation implementation context.
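
A minimal sketch of that contract, using the op decorator from the examples above; the greet operation and its definitions are hypothetical. The function's parameters correspond to the keys of the inputs dict, and the returned dict is keyed by the declared output names.

>>> from dffml import op, Definition
>>>
>>> NAME = Definition(name="name_to_greet", primitive="string")
>>> GREETING = Definition(name="greeting_message", primitive="string")
>>>
>>> @op(inputs={"name": NAME}, outputs={"greeting": GREETING})
... def greet(name):
...     # Returned dict keys match the operation's declared outputs
...     return {"greeting": "Hello " + name}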

async run_default(inputs: Dict[str, Any]) → Dict[str, Any][source]

The default implementation of the dataflow.run operation uses uctx mode, in which unique context strings are mapped to lists of inputs to be given to each respective string's context.
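
Illustrative only, mirroring the first example above: the value given for the operation's inputs parameter in this mode is a dict mapping each unique context string to a list of serialized inputs for that context.

>>> # Each key is a context string for the subflow, each value is a list of
>>> # serialized inputs to seed under that context
>>> uctx_inputs = {
...     "my_context": [
...         {
...             "value": "https://github.com/intel/dffml",
...             "definition": "URL",
...         }
...     ],
... }
>>> list(uctx_inputs)
['my_context']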