Df Base

class dffml.df.base.BaseContextHandle(ctx: BaseInputSetContext)[source]
class dffml.df.base.BaseDataFlowObject(config: Optional[Type[BaseConfig]])[source]

Data Flow Objects create their child contexts by passing only themselves as an argument to the child’s __init__ (the child being of type BaseDataFlowObjectContext).

classmethod args(args, *above) Dict[str, Arg][source]

Return a dict containing arguments required for this class

classmethod config(config, *above) BaseConfig[source]

Create the BaseConfig required to instantiate this class by parsing the config dict.

class dffml.df.base.BaseDataFlowObjectContext(config: BaseConfig, parent: BaseDataFlowObject)[source]

Data Flow Object Contexts are instantiated by being passed their config, and their parent, a BaseDataFlowObject.

class dffml.df.base.BaseDefinitionSetContext(config: BaseConfig, parent: BaseInputNetworkContext, ctx: BaseInputSetContext)[source]
abstract async inputs(Definition: Definition) AsyncIterator[Input][source]

Asynchronous iterator of all inputs within a context, which are of a definition.
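A concrete implementation typically filters the inputs stored for the context by their definition and yields the matches. The sketch below is a standalone illustration using hypothetical in-memory stand-ins for Definition and Input, not the dffml memory implementation:

```python
import asyncio
from typing import AsyncIterator, List, NamedTuple

# Hypothetical stand-ins for dffml's Definition and Input types
class Definition(NamedTuple):
    name: str

class Input(NamedTuple):
    value: object
    definition: Definition

class InMemoryDefinitionSetContext:
    """Illustrative only: filters stored inputs by definition."""

    def __init__(self, stored: List[Input]):
        self.stored = stored

    async def inputs(self, definition: Definition) -> AsyncIterator[Input]:
        # Yield only the inputs whose definition matches
        for item in self.stored:
            if item.definition == definition:
                yield item

async def main():
    url = Definition(name="URL")
    port = Definition(name="port")
    dctx = InMemoryDefinitionSetContext(
        [Input("https://example.com", url), Input(443, port)]
    )
    return [item.value async for item in dctx.inputs(url)]

print(asyncio.run(main()))
```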

class dffml.df.base.BaseInputNetwork(config: Optional[Type[BaseConfig]])[source]

Input networks store all of the input data and output data of operations, which in turn becomes input data to other operations.

class dffml.df.base.BaseInputNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]

Abstract Base Class for context managing input_set

abstract async add(input_set: BaseInputSet)[source]

Add a new input set to the network

abstract async added(ctx: BaseInputSetContext) BaseInputSet[source]

Returns when a new input set has entered the network within a context

abstract async ctx() BaseInputSetContext[source]

Returns when a new input set context has entered the network

abstract async definition(ctx: BaseInputSetContext, definition: str) Definition[source]

Search for a definition within a context, given its name as a string. Return the definition if found; otherwise raise a DefinitionNotInContext error. If the context is not present, raise a ContextNotPresent error.

abstract definitions(ctx: BaseInputSetContext) BaseDefinitionSetContext[source]

Return a DefinitionSet context that can be used to access the inputs within the given context, by definition.

abstract async gather_inputs(rctx: BaseRedundancyCheckerContext, operation: Operation, ctx: Optional[BaseInputSetContext] = None) AsyncIterator[BaseParameterSet][source]

Generate all possible permutations of applicable inputs for an operation that, according to the redundancy checker, haven’t been run yet.
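Conceptually this is a cross product: for each input parameter of the operation, collect the candidate inputs with a matching definition, take every combination, and skip combinations the redundancy checker has already seen. The sketch below uses hypothetical names and plain dicts rather than the dffml data model:

```python
import itertools

def gather_inputs(candidates_by_parameter, already_ran):
    """Illustrative only: yield unseen parameter sets for an operation.

    candidates_by_parameter maps each input parameter name to the list
    of candidate values whose definition matches that parameter.
    already_ran is a set standing in for the redundancy checker.
    """
    names = list(candidates_by_parameter)
    for combo in itertools.product(*candidates_by_parameter.values()):
        parameter_set = dict(zip(names, combo))
        key = tuple(sorted(parameter_set.items()))
        if key in already_ran:
            continue  # Redundancy checker says this permutation was run
        already_ran.add(key)
        yield parameter_set

seen = set()
candidates = {"url": ["https://a", "https://b"], "retries": [3]}
print(list(gather_inputs(candidates, seen)))
```

A second pass over the same candidates yields nothing, since every permutation is now recorded as having run.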

class dffml.df.base.BaseInputSet(config: BaseInputSetConfig)[source]
abstract async add(item: Input) None[source]

Add an input to the input set.

abstract async remove_input(item: Input) None[source]

Remove an input from the input set.

abstract async remove_unvalidated_inputs() BaseInputSet[source]

Remove unvalidated inputs from the internal list and return the updated input set.

class dffml.df.base.BaseInputSetConfig(ctx)[source]
property ctx

Alias for field number 0

class dffml.df.base.BaseInputSetContext[source]
class dffml.df.base.BaseKeyValueStore(config: Optional[Type[BaseConfig]])[source]

Abstract Base Class for key value storage

class dffml.df.base.BaseKeyValueStoreContext(config: BaseConfig, parent: BaseDataFlowObject)[source]

Abstract Base Class for key value storage context

abstract async get(key: str) Optional[bytes][source]

Get a value from the key value store

abstract async set(name: str, value: bytes)[source]

Set a value in the key value store
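A minimal in-memory sketch of this get/set contract (a hypothetical class for illustration, not a dffml implementation): get() returns None for missing keys, and set() stores bytes under a string key.

```python
import asyncio
from typing import Optional

class MemoryKeyValueStoreContext:
    """Illustrative only: dict-backed key value storage context."""

    def __init__(self):
        self.memory = {}

    async def get(self, key: str) -> Optional[bytes]:
        # Missing keys yield None rather than raising
        return self.memory.get(key)

    async def set(self, name: str, value: bytes):
        self.memory[name] = value

async def main():
    kvctx = MemoryKeyValueStoreContext()
    await kvctx.set("answer", b"42")
    return await kvctx.get("answer"), await kvctx.get("missing")

print(asyncio.run(main()))
```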

class dffml.df.base.BaseLockNetwork(config: Optional[Type[BaseConfig]])[source]

Acquires locks on inputs which may not be used simultaneously

class dffml.df.base.BaseLockNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]
abstract async acquire(parameter_set: BaseParameterSet) bool[source]

An async context manager which acquires locks on all inputs within the parameter set.
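One common way to implement this kind of multi-input locking (a sketch with hypothetical names, not the dffml memory lock network) is to hold one asyncio.Lock per input and always acquire them in a deterministic order, so two concurrent operations with overlapping parameter sets cannot deadlock:

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

class LockNetwork:
    """Illustrative only: one asyncio.Lock per input UID."""

    def __init__(self):
        self.locks = {}

    @asynccontextmanager
    async def acquire(self, input_uids):
        async with AsyncExitStack() as stack:
            # Sorted order makes acquisition deterministic across tasks
            for uid in sorted(input_uids):
                lock = self.locks.setdefault(uid, asyncio.Lock())
                await stack.enter_async_context(lock)
            yield  # All locks held; released when the caller exits

async def main():
    network = LockNetwork()
    results = []

    async def worker(name):
        async with network.acquire(["input-a", "input-b"]):
            results.append(name)

    await asyncio.gather(worker("first"), worker("second"))
    return results

print(asyncio.run(main()))
```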

class dffml.df.base.BaseOperationImplementationNetwork(config: Optional[Type[BaseConfig]])[source]

Knows where operations are or if they can be made

class dffml.df.base.BaseOperationImplementationNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]
abstract async contains(operation: Operation) bool[source]

Checks if the network contains / has the ability to run a given operation.

abstract async dispatch(ictx: BaseInputNetworkContext, lctx: BaseLockNetworkContext, operation: Operation, parameter_set: BaseParameterSet)[source]

Schedule the running of an operation

abstract async instantiable(operation: Operation) bool[source]

Check whether this network can instantiate the given operation on the fly. If no operation implementation network already contains an operation, one of them will need to instantiate it before the operation can be run.

abstract async instantiate(operation: Operation, config: BaseConfig) bool[source]

Instantiate a given operation so that it can be run within this network.

abstract async operation_completed()[source]

Returns when an operation finishes

abstract async run(operation: Operation, inputs: Dict[str, Any]) Union[bool, Dict[str, Any]][source]

Find the operation implementation for the given operation and create an operation implementation context, call the run method of the context and return the results.

class dffml.df.base.BaseOperationNetwork(config: Optional[Type[BaseConfig]])[source]

Operation networks hold Operation objects to allow for looking up of their inputs, outputs, and conditions.

class dffml.df.base.BaseOperationNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]

Abstract Base Class for context managing operations

abstract async add(operations: List[Operation])[source]

Add operations to the network

abstract async operations(input_set: Optional[BaseInputSet] = None, stage: Stage = Stage.PROCESSING) AsyncIterator[Operation][source]

Retrieve all operations in the network of a given stage, filtering for operations whose inputs have definitions present in the input set.
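The filtering described here can be sketched with a simplified data model (plain dicts and a minimal Stage enum, both hypothetical): keep operations of the requested stage whose input definitions overlap the definitions present in the input set.

```python
from enum import Enum

class Stage(Enum):
    PROCESSING = "processing"
    CLEANUP = "cleanup"

def matching_operations(operations, stage, input_definitions):
    """Illustrative only: filter operations by stage and definitions."""
    for op in operations:
        if op["stage"] is not stage:
            continue  # Wrong stage
        if input_definitions & set(op["inputs"]):
            yield op["name"]  # At least one input definition matches

ops = [
    {"name": "fetch", "stage": Stage.PROCESSING, "inputs": {"URL"}},
    {"name": "clean", "stage": Stage.CLEANUP, "inputs": {"URL"}},
    {"name": "parse", "stage": Stage.PROCESSING, "inputs": {"HTML"}},
]
print(list(matching_operations(ops, Stage.PROCESSING, {"URL"})))
```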

class dffml.df.base.BaseOrchestrator(config: Optional[Type[BaseConfig]])[source]
class dffml.df.base.BaseOrchestratorConfig(input_network: dffml.df.base.BaseInputNetwork, operation_network: dffml.df.base.BaseOperationNetwork, lock_network: dffml.df.base.BaseLockNetwork, opimp_network: dffml.df.base.BaseOperationImplementationNetwork, rchecker: dffml.df.base.BaseRedundancyChecker)[source]
class dffml.df.base.BaseOrchestratorContext(config: BaseConfig, parent: BaseDataFlowObject)[source]
abstract async operations_parameter_set_pairs(ctx: BaseInputSetContext, *, new_input_set: Optional[BaseInputSet] = None, stage: Stage = Stage.PROCESSING) AsyncIterator[Tuple[Operation, BaseParameterSet]][source]

Use new_input_set to determine which operations in the network might be up for running. Cross check using existing inputs to generate per input set context novel input pairings. Yield novel input pairings along with their operations as they are generated.

abstract async run_operations(strict: bool = True) AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]][source]

Run all the operations then run cleanup and output operations

class dffml.df.base.BaseParameterSet(config: BaseParameterSetConfig)[source]
class dffml.df.base.BaseParameterSetConfig(ctx)[source]
property ctx

Alias for field number 0

class dffml.df.base.BaseRedundancyChecker(config: Optional[Type[BaseConfig]])[source]

Redundancy Checkers ensure that each operation within a context only gets run with a given permutation of inputs once.

class dffml.df.base.BaseRedundancyCheckerConfig(key_value_store)[source]
property key_value_store

Alias for field number 0

class dffml.df.base.BaseRedundancyCheckerContext(config: BaseConfig, parent: BaseDataFlowObject)[source]

Abstract Base Class for redundancy checking context

exception dffml.df.base.FailedToLoadOperationImplementation[source]

Raised when an OperationImplementation wasn’t found to be registered with the dffml.operation entrypoint.

exception dffml.df.base.OpCouldNotDeterminePrimitive[source]

op could not determine the primitive of the parameter

exception dffml.df.base.OperationException[source]

Raised by the orchestrator when an operation throws an exception.

class dffml.df.base.OperationImplementation(config: Optional[BaseConfig])[source]
classmethod load(loading: Optional[str] = None)[source]

Load all installed implementations, or only the one named by loading, and return them as a list. Implementations to be loaded should be registered to ENTRYPOINT via setuptools.

class dffml.df.base.OperationImplementationContext(parent: OperationImplementation, ctx: BaseInputSetContext, octx: BaseOrchestratorContext)[source]
property config

Alias for self.parent.config

abstract async run(inputs: Dict[str, Any]) Union[bool, Dict[str, Any]][source]

Implementation of the operation goes here. Should take and return a dict with keys matching the input and output parameters of the Operation object associated with this operation implementation context.
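The dict-in/dict-out contract can be sketched with a hypothetical addition operation (standalone for illustration, not subclassing the real base class): the keys of the inputs dict match the Operation's declared input parameter names, and the returned dict is keyed by its output parameter names.

```python
import asyncio

class AddOperationImplementationContext:
    """Illustrative only: a run() body for a hypothetical add operation."""

    async def run(self, inputs):
        # Keys must match the Operation object's declared inputs/outputs
        return {"sum": inputs["lhs"] + inputs["rhs"]}

async def main():
    opctx = AddOperationImplementationContext()
    return await opctx.run({"lhs": 40, "rhs": 2})

print(asyncio.run(main()))
```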

subflow(dataflow)[source]

Registers subflow dataflow with parent flow and yields an instance of BaseOrchestratorContext

>>> async def my_operation(arg):
...     async with self.subflow(self.config.dataflow) as octx:
...         return await octx.run({"ctx_str": []})
exception dffml.df.base.OperationImplementationNotInstantiable[source]

OperationImplementation cannot be instantiated and is required to continue.

exception dffml.df.base.OperationImplementationNotInstantiated[source]

OperationImplementation is instantiable, but has not been instantiated within the network and was required to continue.

Attempted to run operation which could be instantiated, but has not yet been.

class dffml.df.base.StringContextHandle(ctx: BaseInputSetContext)[source]
class dffml.df.base.StringInputSetContext(as_string)[source]
dffml.df.base.isoperation(item)[source]

Similar to inspect.isclass and that family of functions. Returns true if item is an instance of Operation.

dffml.df.base.isopimp(item)[source]

Similar to inspect.isclass and that family of functions. Returns true if item is a subclass of OperationImplementation.

dffml.df.base.isopwraped(item)[source]

Similar to inspect.isclass and that family of functions. Returns true if a function has been wrapped with op.

dffml.df.base.mk_base_in(predicate)[source]

Create a function which uses inspect.getmembers with the given predicate to extract operations or implementations from a list of objects.

dffml.df.base.op(*args, imp_enter=None, ctx_enter=None, config_cls=None, valid_return_none=True, **kwargs)[source]

The op decorator creates a subclass of dffml.df.OperationImplementation and assigns that OperationImplementation to the .imp parameter of the function it decorates.

If the decorated object is not already a class which is a subclass of OperationImplementationContext, it creates an dffml.df.OperationImplementationContext and assigns it to the CONTEXT class parameter of the OperationImplementation which was created.

Upon context entry into the OperationImplementation, imp_enter is iterated over and the values in that dict are entered. The value yielded upon entry is assigned to a parameter in the OperationImplementation instance named after the respective key.

Examples

>>> from dffml import Definition, Input, op
>>> from typing import NamedTuple, List, Dict
>>>
>>> class Person(NamedTuple):
...     name: str
...     age: int
...
>>> @op
... def cannotVote(p: List[Person]):
...     return list(filter(lambda person: person.age < 18, p))
...
>>>
>>> Input(
...     value=[
...         {"name": "Bob", "age": 20},
...         {"name": "Mark", "age": 21},
...         {"name": "Alice", "age": 90},
...     ],
...     definition=cannotVote.op.inputs["p"],
... )
Input(value=[Person(name='Bob', age=20), Person(name='Mark', age=21), Person(name='Alice', age=90)], definition=cannotVote.inputs.p)
>>>
>>> @op
... def canVote(p: Dict[str, Person]) -> Dict[str, Person]:
...     return {
...         person.name: person
...         for person in filter(lambda person: person.age >= 18, p.values())
...     }
...
>>>
>>> Input(
...     value={
...         "Bob": {"name": "Bob", "age": 19},
...         "Alice": {"name": "Alice", "age": 21},
...         "Mark": {"name": "Mark", "age": 90},
...     },
...     definition=canVote.op.inputs["p"],
... )
Input(value={'Bob': Person(name='Bob', age=19), 'Alice': Person(name='Alice', age=21), 'Mark': Person(name='Mark', age=90)}, definition=canVote.inputs.p)
>>>
>>> Input(
...     value={
...         "Bob": {"name": "Bob", "age": 19},
...         "Alice": {"name": "Alice", "age": 21},
...         "Mark": {"name": "Mark", "age": 90},
...     },
...     definition=canVote.op.outputs["result"],
... )
Input(value={'Bob': Person(name='Bob', age=19), 'Alice': Person(name='Alice', age=21), 'Mark': Person(name='Mark', age=90)}, definition=canVote.outputs.result)