Df Base¶
- class dffml.df.base.BaseContextHandle(ctx: BaseInputSetContext)[source]¶
- class dffml.df.base.BaseDataFlowObject(config: Optional[Type[BaseConfig]])[source]¶
Data Flow Objects create their child contexts by passing only themselves as an argument to the child’s __init__ (of type BaseDataFlowObjectContext).
- classmethod args(args, *above) Dict[str, Arg] [source]¶
Return a dict containing arguments required for this class
- classmethod config(config, *above) BaseConfig [source]¶
Create the BaseConfig required to instantiate this class by parsing the config dict.
- class dffml.df.base.BaseDataFlowObjectContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
Data Flow Object Contexts are instantiated by being passed their config, and their parent, a BaseDataFlowObject.
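The parent and context relationship described above can be sketched in plain Python. This is a minimal illustration, not DFFML code: SketchObject, SketchContext, and make_context are hypothetical names, and the sketch follows the (config, parent) constructor signature shown for BaseDataFlowObjectContext.

```python
class SketchContext:
    """Hypothetical stand-in for a BaseDataFlowObjectContext subclass."""

    def __init__(self, config, parent):
        # The context keeps the config it was given and a reference
        # back to the object that created it
        self.config = config
        self.parent = parent


class SketchObject:
    """Hypothetical stand-in for a BaseDataFlowObject subclass."""

    # The context class this object instantiates (assumed attribute name)
    CONTEXT = SketchContext

    def __init__(self, config):
        self.config = config

    def make_context(self):
        # The parent hands its own config and itself to the child context
        return self.CONTEXT(self.config, self)


parent = SketchObject({"name": "demo"})
ctx = parent.make_context()
```

Because the context holds a reference to its parent, anything shared across contexts can live on the parent while per-use state lives on the context.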
- class dffml.df.base.BaseDefinitionSetContext(config: BaseConfig, parent: BaseInputNetworkContext, ctx: BaseInputSetContext)[source]¶
- abstract async inputs(Definition: Definition) AsyncIterator[Input] [source]¶
Asynchronous iterator over all inputs within a context that are of the given definition.
- class dffml.df.base.BaseInputNetwork(config: Optional[Type[BaseConfig]])[source]¶
Input networks store all of the input data and output data of operations, which in turn becomes input data to other operations.
- class dffml.df.base.BaseInputNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
Abstract Base Class for context managing input_set
- abstract async add(input_set: BaseInputSet)[source]¶
Adds new input set to the network
- abstract async added(ctx: BaseInputSetContext) BaseInputSet [source]¶
Returns when a new input set has entered the network within a context
- abstract async ctx() BaseInputSetContext [source]¶
Returns when a new input set context has entered the network
- abstract async definition(ctx: BaseInputSetContext, definition: str) Definition [source]¶
Search for the definition within a context given its name as a string. Return the definition. Otherwise raise a DefinitionNotInContext error. If the context is not present, raise a ContextNotPresent error.
- abstract definitions(ctx: BaseInputSetContext) BaseDefinitionSetContext [source]¶
Return a DefinitionSet context that can be used to access the inputs within the given context, by definition.
- abstract async gather_inputs(rctx: BaseRedundancyCheckerContext, operation: Operation, ctx: Optional[BaseInputSetContext] = None) AsyncIterator[BaseParameterSet] [source]¶
Generate all possible permutations of applicable inputs for an operation that, according to the redundancy checker, haven’t been run yet.
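One way to picture what gather_inputs produces is the sketch below. It is an assumption-laden simplification, not the DFFML implementation: novel_parameter_sets is a hypothetical helper, inputs are plain strings, "permutations" is interpreted as one candidate value per parameter (a Cartesian product), and a Python set stands in for the redundancy checker.

```python
import itertools
from typing import Dict, Iterator, List, Set, Tuple


def novel_parameter_sets(
    operation_name: str,
    candidates: Dict[str, List[str]],
    already_run: Set[Tuple[str, Tuple[Tuple[str, str], ...]]],
) -> Iterator[Dict[str, str]]:
    """Yield each pairing of candidate values that has not been seen yet."""
    # Fix the parameter order so the same pairing always maps to the same key
    names = sorted(candidates)
    for values in itertools.product(*(candidates[name] for name in names)):
        pairing = tuple(zip(names, values))
        key = (operation_name, pairing)
        if key in already_run:
            # The stand-in redundancy checker says this was already generated
            continue
        already_run.add(key)
        yield dict(pairing)


seen: Set[Tuple[str, Tuple[Tuple[str, str], ...]]] = set()
inputs = {"url": ["a", "b"], "depth": ["1"]}
first = list(novel_parameter_sets("fetch", inputs, seen))

# A new input arrives; only pairings that involve it are yielded
inputs["depth"].append("2")
second = list(novel_parameter_sets("fetch", inputs, seen))
```

The second call skips the pairings produced by the first, which is the behaviour the redundancy checker is meant to guarantee.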
- class dffml.df.base.BaseInputSet(config: BaseInputSetConfig)[source]¶
- abstract async remove_unvalidated_inputs() BaseInputSet [source]¶
Removes unvalidated inputs from internal list and returns the same.
- class dffml.df.base.BaseKeyValueStore(config: Optional[Type[BaseConfig]])[source]¶
Abstract Base Class for key value storage
- class dffml.df.base.BaseKeyValueStoreContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
Abstract Base Class for key value storage context
- class dffml.df.base.BaseLockNetwork(config: Optional[Type[BaseConfig]])[source]¶
Acquires locks on inputs which may not be used simultaneously
- class dffml.df.base.BaseLockNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
- abstract async acquire(parameter_set: BaseParameterSet) bool [source]¶
An async context manager which will acquire locks of all inputs within the parameter set.
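The idea of an async context manager that holds every lock for a parameter set can be sketched with the standard library. SketchLockNetwork is a hypothetical class, inputs are identified by plain strings, and sorting the identifiers before locking is a design choice of this sketch rather than something the source states.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Dict, Iterable


class SketchLockNetwork:
    """Hypothetical lock network keyed by input identifier."""

    def __init__(self):
        self.locks: Dict[str, asyncio.Lock] = {}

    @asynccontextmanager
    async def acquire(self, input_ids: Iterable[str]):
        async with AsyncExitStack() as stack:
            # Take locks in a consistent order so two callers that share
            # inputs cannot deadlock each other
            for input_id in sorted(set(input_ids)):
                lock = self.locks.setdefault(input_id, asyncio.Lock())
                await stack.enter_async_context(lock)
            # All locks are held while the caller's block runs; the exit
            # stack releases them afterwards, even if the block raises
            yield


async def main():
    network = SketchLockNetwork()
    events = []

    async def worker(name, input_ids):
        async with network.acquire(input_ids):
            events.append(f"{name} start")
            await asyncio.sleep(0.01)
            events.append(f"{name} end")

    # Both workers need input "b", so they cannot overlap
    await asyncio.gather(
        worker("first", ["a", "b"]), worker("second", ["b", "c"])
    )
    return events


events = asyncio.run(main())
```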
- class dffml.df.base.BaseOperationImplementationNetwork(config: Optional[Type[BaseConfig]])[source]¶
Knows where operations are or if they can be made
- class dffml.df.base.BaseOperationImplementationNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
- abstract async contains(operation: Operation) bool [source]¶
Checks if the network contains / has the ability to run a given operation.
- abstract async dispatch(ictx: BaseInputNetworkContext, lctx: BaseLockNetworkContext, operation: Operation, parameter_set: BaseParameterSet)[source]¶
Schedule the running of an operation
- abstract async instantiable(operation: Operation) bool [source]¶
Checks whether this network can instantiate the given operation on the fly. This matters when no operation implementation network already contains the operation, since one of them will then need to instantiate it.
- abstract async instantiate(operation: Operation, config: BaseConfig) bool [source]¶
Instantiate a given operation so that it can be run within this network.
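How contains, instantiable, and instantiate might fit together is sketched below with an in-memory registry. Everything here is hypothetical: SketchOpImpNetwork, SketchNotInstantiable, the ensure helper, and the use of operation names (strings) in place of Operation objects are assumptions made for illustration only.

```python
import asyncio
from typing import Any, Callable, Dict


class SketchNotInstantiable(Exception):
    """Hypothetical error: the operation cannot be created in this network."""


class SketchOpImpNetwork:
    def __init__(self, factories: Dict[str, Callable[[Dict[str, Any]], Callable]]):
        # name -> factory that builds an implementation from a config
        self.factories = factories
        # name -> implementation that is ready to run
        self.instantiated: Dict[str, Callable] = {}

    async def contains(self, name: str) -> bool:
        return name in self.instantiated

    async def instantiable(self, name: str) -> bool:
        return name in self.factories

    async def instantiate(self, name: str, config: Dict[str, Any]) -> bool:
        self.instantiated[name] = self.factories[name](config)
        return True

    async def ensure(self, name: str, config: Dict[str, Any]) -> Callable:
        # Reuse an existing implementation, otherwise build it on the fly
        if not await self.contains(name):
            if not await self.instantiable(name):
                raise SketchNotInstantiable(name)
            await self.instantiate(name, config)
        return self.instantiated[name]


async def main():
    network = SketchOpImpNetwork(
        {"add_offset": lambda config: (lambda value: value + config["offset"])}
    )
    before = await network.contains("add_offset")
    implementation = await network.ensure("add_offset", {"offset": 10})
    after = await network.contains("add_offset")
    try:
        await network.ensure("missing", {})
        missing_failed = False
    except SketchNotInstantiable:
        missing_failed = True
    return before, implementation(5), after, missing_failed


outcome = asyncio.run(main())
```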
- class dffml.df.base.BaseOperationNetwork(config: Optional[Type[BaseConfig]])[source]¶
Operation networks hold Operation objects to allow for looking up of their inputs, outputs, and conditions.
- class dffml.df.base.BaseOperationNetworkContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
Abstract Base Class for context managing operations
- abstract async operations(input_set: Optional[BaseInputSet] = None, stage: Stage = Stage.PROCESSING) AsyncIterator[Operation] [source]¶
Retrieve all operations in the network of a given stage, filtering by operations that have inputs with definitions in the input set.
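The stage and definition filtering described for operations can be illustrated roughly as follows. SketchStage, SketchOperation, and filter_operations are hypothetical stand-ins; the stage names other than PROCESSING are assumed, definitions are reduced to their names, and a synchronous generator is used where the real method is an async iterator.

```python
import enum
from dataclasses import dataclass, field
from typing import Dict, Iterable, Iterator, Optional, Set


class SketchStage(enum.Enum):
    PROCESSING = "processing"
    CLEANUP = "cleanup"
    OUTPUT = "output"


@dataclass
class SketchOperation:
    name: str
    # parameter name -> definition name
    inputs: Dict[str, str] = field(default_factory=dict)
    stage: SketchStage = SketchStage.PROCESSING


def filter_operations(
    network: Iterable[SketchOperation],
    input_definitions: Optional[Set[str]] = None,
    stage: SketchStage = SketchStage.PROCESSING,
) -> Iterator[SketchOperation]:
    for operation in network:
        if operation.stage is not stage:
            continue
        # With no input set given, every operation of the stage qualifies
        if input_definitions is not None and not input_definitions.intersection(
            operation.inputs.values()
        ):
            continue
        yield operation


network = [
    SketchOperation("fetch", {"target": "URL"}),
    SketchOperation("parse", {"document": "HTML"}),
    SketchOperation("report", {"document": "HTML"}, SketchStage.OUTPUT),
]
all_processing = [op.name for op in filter_operations(network)]
html_processing = [op.name for op in filter_operations(network, {"HTML"})]
html_output = [
    op.name for op in filter_operations(network, {"HTML"}, SketchStage.OUTPUT)
]
```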
- class dffml.df.base.BaseOrchestrator(config: Optional[Type[BaseConfig]])[source]¶
- class dffml.df.base.BaseOrchestratorConfig(input_network: dffml.df.base.BaseInputNetwork, operation_network: dffml.df.base.BaseOperationNetwork, lock_network: dffml.df.base.BaseLockNetwork, opimp_network: dffml.df.base.BaseOperationImplementationNetwork, rchecker: dffml.df.base.BaseRedundancyChecker)[source]¶
- class dffml.df.base.BaseOrchestratorContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
- abstract async operations_parameter_set_pairs(ctx: BaseInputSetContext, *, new_input_set: Optional[BaseInputSet] = None, stage: Stage = Stage.PROCESSING) AsyncIterator[Tuple[Operation, BaseParameterSet]] [source]¶
Use new_input_set to determine which operations in the network might be up for running. Cross-check against existing inputs to generate novel input pairings for each input set context. Yield the novel input pairings along with their operations as they are generated.
- abstract async run_operations(strict: bool = True) AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]] [source]¶
Run all the operations, then run cleanup and output operations.
- class dffml.df.base.BaseParameterSet(config: BaseParameterSetConfig)[source]¶
- class dffml.df.base.BaseRedundancyChecker(config: Optional[Type[BaseConfig]])[source]¶
Redundancy Checkers ensure that each operation within a context is run only once with a given permutation of inputs.
- class dffml.df.base.BaseRedundancyCheckerConfig(key_value_store)[source]¶
- property key_value_store¶
Alias for field number 0
- class dffml.df.base.BaseRedundancyCheckerContext(config: BaseConfig, parent: BaseDataFlowObject)[source]¶
Abstract Base Class for redundancy checking context
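A redundancy checker backed by a key value store could look something like the sketch below. This is an illustration under assumptions, not DFFML's implementation: the class names, the get/set store interface, the first_time method, and the SHA-256 fingerprint over a JSON payload are all hypothetical choices.

```python
import asyncio
import hashlib
import json
from typing import Dict, Optional


class SketchKeyValueStore:
    """Hypothetical in-memory key value store."""

    def __init__(self):
        self.data: Dict[str, bytes] = {}

    async def get(self, key: str) -> Optional[bytes]:
        return self.data.get(key)

    async def set(self, key: str, value: bytes) -> None:
        self.data[key] = value


class SketchRedundancyChecker:
    def __init__(self, key_value_store: SketchKeyValueStore):
        self.key_value_store = key_value_store

    @staticmethod
    def fingerprint(ctx: str, operation: str, parameters: Dict[str, str]) -> str:
        # Sort the parameters so the same inputs always hash the same way
        payload = json.dumps([ctx, operation, sorted(parameters.items())])
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    async def first_time(
        self, ctx: str, operation: str, parameters: Dict[str, str]
    ) -> bool:
        """Return True only the first time this combination is seen."""
        key = self.fingerprint(ctx, operation, parameters)
        if (await self.key_value_store.get(key)) is not None:
            return False
        await self.key_value_store.set(key, b"\x01")
        return True


async def main():
    checker = SketchRedundancyChecker(SketchKeyValueStore())
    return [
        await checker.first_time("ctx-1", "fetch", {"url": "a"}),
        await checker.first_time("ctx-1", "fetch", {"url": "a"}),
        await checker.first_time("ctx-2", "fetch", {"url": "a"}),
        await checker.first_time("ctx-1", "fetch", {"url": "b"}),
    ]


checks = asyncio.run(main())
```

Including the context in the fingerprint is what lets the same inputs run once per context rather than once overall.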
- exception dffml.df.base.FailedToLoadOperationImplementation[source]¶
Raised when an OperationImplementation wasn’t found to be registered with the dffml.operation entrypoint.
- exception dffml.df.base.OpCouldNotDeterminePrimitive[source]¶
op could not determine the primitive of the parameter
- exception dffml.df.base.OperationException[source]¶
Raised by the orchestrator when an operation throws an exception.
- class dffml.df.base.OperationImplementation(config: Optional[BaseConfig])[source]¶
- class dffml.df.base.OperationImplementationContext(parent: OperationImplementation, ctx: BaseInputSetContext, octx: BaseOrchestratorContext)[source]¶
- property config¶
Alias for self.parent.config
- exception dffml.df.base.OperationImplementationNotInstantiable[source]¶
OperationImplementation cannot be instantiated and is required to continue.
- exception dffml.df.base.OperationImplementationNotInstantiated[source]¶
OperationImplementation is instantiable, but has not been instantiated within the network and was required to continue.
Attempted to run operation which could be instantiated, but has not yet been.
- class dffml.df.base.StringContextHandle(ctx: BaseInputSetContext)[source]¶
- dffml.df.base.isoperation(item)[source]¶
Similar to inspect.isclass and that family of functions. Returns true if item is an instance of Operation.
- dffml.df.base.isopimp(item)[source]¶
Similar to inspect.isclass and that family of functions. Returns true if item is a subclass of OperationImplementation.
- dffml.df.base.isopwraped(item)[source]¶
Similar to inspect.isclass and that family of functions. Returns true if a function has been wrapped with op.
- dffml.df.base.mk_base_in(predicate)[source]¶
Creates the functions which use inspect.getmembers to extract operations or implementations from some list.
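The predicate-factory pattern that mk_base_in describes can be approximated as follows. make_extractor, Marker, and is_marked are hypothetical names, and the exact behaviour of the real function is not shown in this reference, so treat this as one plausible shape rather than the actual logic.

```python
import inspect
import types


def make_extractor(predicate):
    """Build a function that collects everything matching predicate."""

    def extractor(*args):
        found = []
        for item in args:
            if predicate(item):
                # The item itself matches, keep it directly
                found.append(item)
            else:
                # Otherwise search the item's members, such as the
                # attributes of a module or namespace
                found.extend(
                    member for _name, member in inspect.getmembers(item, predicate)
                )
        return found

    return extractor


class Marker:
    """Hypothetical base class playing the role of OperationImplementation."""


class First(Marker):
    pass


class Second(Marker):
    pass


class Third(Marker):
    pass


def is_marked(item):
    return inspect.isclass(item) and issubclass(item, Marker) and item is not Marker


marked_in = make_extractor(is_marked)
namespace = types.SimpleNamespace(a=First, b=3, c=Second)
extracted = marked_in(namespace, Third)
```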
- dffml.df.base.op(*args, imp_enter=None, ctx_enter=None, config_cls=None, valid_return_none=True, **kwargs)[source]¶
The op decorator creates a subclass of dffml.df.OperationImplementation and assigns that OperationImplementation to the .imp parameter of the function it decorates.
If the decorated object is not already a class which is a subclass of OperationImplementationContext, it creates a dffml.df.OperationImplementationContext and assigns it to the CONTEXT class parameter of the OperationImplementation which was created.
Upon context entry into the OperationImplementation, imp_enter is iterated over and the values in that dict are entered. The value yielded upon entry is assigned to a parameter in the OperationImplementation instance named after the respective key.
Examples
>>> from dffml import Definition, Input, op
>>> from typing import NamedTuple, List, Dict
>>>
>>> class Person(NamedTuple):
...     name: str
...     age: int
...
>>> @op
... def cannotVote(p: List[Person]):
...     return list(filter(lambda person: person.age < 18, p))
...
>>>
>>> Input(
...     value=[
...         {"name": "Bob", "age": 20},
...         {"name": "Mark", "age": 21},
...         {"name": "Alice", "age": 90},
...     ],
...     definition=cannotVote.op.inputs["p"],
... )
Input(value=[Person(name='Bob', age=20), Person(name='Mark', age=21), Person(name='Alice', age=90)], definition=cannotVote.inputs.p)
>>>
>>> @op
... def canVote(p: Dict[str, Person]) -> Dict[str, Person]:
...     return {
...         person.name: person
...         for person in filter(lambda person: person.age >= 18, p.values())
...     }
...
>>>
>>> Input(
...     value={
...         "Bob": {"name": "Bob", "age": 19},
...         "Alice": {"name": "Alice", "age": 21},
...         "Mark": {"name": "Mark", "age": 90},
...     },
...     definition=canVote.op.inputs["p"],
... )
Input(value={'Bob': Person(name='Bob', age=19), 'Alice': Person(name='Alice', age=21), 'Mark': Person(name='Mark', age=90)}, definition=canVote.inputs.p)
>>>
>>> Input(
...     value={
...         "Bob": {"name": "Bob", "age": 19},
...         "Alice": {"name": "Alice", "age": 21},
...         "Mark": {"name": "Mark", "age": 90},
...     },
...     definition=canVote.op.outputs["result"],
... )
Input(value={'Bob': Person(name='Bob', age=19), 'Alice': Person(name='Alice', age=21), 'Mark': Person(name='Mark', age=90)}, definition=canVote.outputs.result)
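To show the general mechanism of a decorator that builds a class and attaches it to the decorated function as .imp, here is a deliberately small stand-in that does not use DFFML. SketchImplementation, sketch_op, and the run method are hypothetical; the real op decorator does considerably more (contexts, definitions, imp_enter handling).

```python
class SketchImplementation:
    """Hypothetical stand-in for an OperationImplementation base class."""

    func = None

    def run(self, **inputs):
        # Delegate to the wrapped function stored on the subclass
        return type(self).func(**inputs)


def sketch_op(func):
    # Create a new subclass named after the function and store the
    # function on it as a static method
    imp = type(
        func.__name__ + "Implementation",
        (SketchImplementation,),
        {"func": staticmethod(func)},
    )
    # Attach the generated class to the function, which is returned
    # unchanged so it can still be called directly
    func.imp = imp
    return func


@sketch_op
def add(a: int, b: int) -> int:
    return a + b


direct = add(1, 2)
via_imp = add.imp().run(a=1, b=2)
```

Returning the original function, rather than a wrapper, keeps it callable as ordinary Python while the generated class carries the framework-facing behaviour.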