Source code for dffml.operation.output

import copy
import collections
from typing import Dict, Any, NamedTuple, List

from ..df.types import Definition, Stage, DataFlow
from ..df.base import (
    op,
    OperationImplementationContext,
    BaseInputSetContext,
    BaseInputNetworkContext,
)
from ..df.exceptions import DefinitionNotInContext
from ..util.data import traverse_get


class GroupBySpec(NamedTuple):
    group: Definition
    by: Definition

    @classmethod
    async def resolve(
        cls,
        ctx: BaseInputSetContext,
        ictx: BaseInputNetworkContext,
        exported: Dict[str, Any],
    ):
        # TODO Address the need to copy operation implementation inputs dict
        # In case the input is used elsewhere in the network
        exported = copy.deepcopy(exported)
        # Look up the definition for the group and by fields
        for convert in ["group", "by"]:
            exported[convert] = await ictx.definition(ctx, exported[convert])
        return cls(**exported)


group_by_spec = Definition(name="group_by_spec", primitive="Dict[str, Any]")
group_by_output = Definition(
    name="group_by_output", primitive="Dict[str, List[Any]]"
)


@op(
    name="group_by",
    inputs={"spec": group_by_spec},
    outputs={"output": group_by_output},
    stage=Stage.OUTPUT,
)
class GroupBy(OperationImplementationContext):
    async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Convert group_by_spec into a dict with values being of the NamedTuple
        # type GroupBySpec
        outputs = {
            key: await GroupBySpec.resolve(self.ctx, self.octx.ictx, value)
            for key, value in inputs["spec"].items()
        }
        self.logger.debug("output spec: %s", outputs)
        # Acquire all definitions within the context
        async with self.octx.ictx.definitions(self.ctx) as od:
            # Output dict
            want = {}
            # Group each requested output
            for output_name, output in outputs.items():
                # Create an array for this output data
                want[output_name] = []
                # Create an ordered dict which will be keyed off of and ordered
                # by the values of the output.by definition as seen in the
                # input network context
                group_by = {}
                async for item in od.inputs(output.by):
                    group_by[item.value] = (item, {})
                group_by = collections.OrderedDict(sorted(group_by.items()))
                # Find all inputs within the input network for the group
                # definition
                async for item in od.inputs(output.group):
                    # Get all the parents of the input
                    parents = list(item.get_parents())
                    for group, related in group_by.values():
                        # Ensure that the input we need to group by is in
                        # the parents
                        if group not in parents:
                            continue
                        if output.by.name not in related:
                            related[output.by.name] = []
                        related[output.by.name].append(item.value)
                for index, (_group, qdata) in group_by.items():
                    for def_name, results in qdata.items():
                        for value in results:
                            want[output_name].insert(index, value)
                # # If only one and single is set then convert list to single
                # # item for output dict
                # if len(want[output_name]) == 1 and output.single:
                #     want[output_name] = want[output_name][0]
                # # If the output needs to be a dict then make it one. This
                # # will convert an array of arrays to a dict.
                # elif output.ismap:
                #     want[output_name] = dict(want[output_name])
        return want
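
GroupBy does not ship with a doctest, so the following is a minimal usage
sketch. The index and word definitions, the parent links, and the expected
output are illustrative assumptions inferred from run() above, not an executed
doctest: each spec entry maps an output name to a {"group": ..., "by": ...}
pair of definition names, and grouped values are ordered by the values of
their "by" parents.

>>> import asyncio
>>> from dffml import *
>>>
>>> # Hypothetical definitions for illustration
>>> index = Definition(name="index", primitive="int")
>>> word = Definition(name="word", primitive="string")
>>>
>>> idx0 = Input(value=0, definition=index)
>>> idx1 = Input(value=1, definition=index)
>>>
>>> dataflow = DataFlow.auto(GroupBy)
>>> dataflow.seed.append(
...     Input(
...         value={"words": {"group": word.name, "by": index.name}},
...         definition=GroupBy.op.inputs["spec"]
...     )
... )
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(dataflow, [
...         idx0,
...         idx1,
...         Input(value="feed", definition=word, parents=[idx0]),
...         Input(value="face", definition=word, parents=[idx1]),
...     ]):
...         print(results)
...
>>> asyncio.run(main())
{'words': ['feed', 'face']}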


get_multi_spec = Definition(name="get_multi_spec", primitive="array")
get_multi_output = Definition(name="get_multi_output", primitive="map")


@op(
    name="get_multi",
    inputs={"spec": get_multi_spec},
    outputs={"output": get_multi_output},
    stage=Stage.OUTPUT,
)
class GetMulti(OperationImplementationContext):
    """
    Output operation to get all Inputs matching given definitions.

    Parameters
    ----------
    spec : list
        List of definition names. Any Inputs with matching definition will be
        returned.

    Returns
    -------
    dict
        Maps definition names to all the Inputs of that definition

    Examples
    --------

    The following shows how to grab all Inputs with the URL definition. Had we
    run an operation which output a URL, that URL would also have been
    returned to us.

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> URL = Definition(name="URL", primitive="string")
    >>>
    >>> dataflow = DataFlow.auto(GetMulti)
    >>> dataflow.seed.append(
    ...     Input(
    ...         value=[URL.name],
    ...         definition=GetMulti.op.inputs["spec"]
    ...     )
    ... )
    >>>
    >>> async def main():
    ...     async for ctx, results in MemoryOrchestrator.run(dataflow, [
    ...         Input(
    ...             value="https://github.com/intel/dffml",
    ...             definition=URL
    ...         ),
    ...         Input(
    ...             value="https://github.com/intel/cve-bin-tool",
    ...             definition=URL
    ...         )
    ...     ]):
    ...         print(results)
    ...
    >>> asyncio.run(main())
    {'URL': ['https://github.com/intel/dffml', 'https://github.com/intel/cve-bin-tool']}
    """

    async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # TODO Address the need to copy operation implementation inputs dict
        # In case the input is used elsewhere in the network
        exported = copy.deepcopy(inputs["spec"])
        # Build a mapping of renames for any spec entries given as dicts of
        # {new name: definition name}
        name_map = {}
        for i, input_value in enumerate(exported):
            if isinstance(input_value, dict):
                name, value = list(input_value.items())[0]
                name_map[value] = name
                exported[i] = value
        # Look up the definition for each
        for convert in range(0, len(exported)):
            exported[convert] = await self.octx.ictx.definition(
                self.ctx, exported[convert]
            )
        self.logger.debug("output spec: %s", exported)
        # Acquire all definitions within the context
        async with self.octx.ictx.definitions(self.ctx) as od:
            # Output dict
            want = {}
            # Gather the values of all Inputs of each requested definition
            for definition in exported:
                async for item in od.inputs(definition):
                    want.setdefault(definition.name, [])
                    want[definition.name].append(item.value)
            # Rename outputs if present in name_map
            for key, value in want.copy().items():
                if name_map.get(key, None):
                    want[name_map[key]] = value
                    want.pop(key)
        return want


get_single_spec = Definition(name="get_single_spec", primitive="array")
get_single_output = Definition(name="get_single_output", primitive="map")


@op(
    name="get_single",
    inputs={"spec": get_single_spec},
    outputs={"output": get_single_output},
    stage=Stage.OUTPUT,
)
class GetSingle(GetMulti):
    """
    Output operation to get a single Input for each definition given.

    Parameters
    ----------
    spec : list
        List of definition names. An Input with matching definition will be
        returned.

    Returns
    -------
    dict
        Maps definition names to an Input of that definition

    Examples
    --------

    The following shows how to grab an Input with the URL definition. Had we
    run an operation which output a URL, that URL could also have been
    returned to us. Note that a spec entry may be a dict, in which case the
    Input is returned under the dict's key rather than the definition name,
    as with {"Repo Link": URL.name} below.

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> URL = Definition(name="URL", primitive="string")
    >>> ORG = Definition(name="ORG", primitive="string")
    >>>
    >>> dataflow = DataFlow.auto(GetSingle)
    >>> dataflow.seed.append(
    ...     Input(
    ...         value=[{"Repo Link": URL.name}, ORG.name],
    ...         definition=GetSingle.op.inputs["spec"]
    ...     )
    ... )
    >>>
    >>> async def main():
    ...     async for ctx, results in MemoryOrchestrator.run(dataflow, [
    ...         Input(
    ...             value="https://github.com/intel/dffml",
    ...             definition=URL
    ...         ),
    ...         Input(
    ...             value="Intel",
    ...             definition=ORG
    ...         )
    ...     ]):
    ...         print(results)
    ...
    >>> asyncio.run(main())
    {'ORG': 'Intel', 'Repo Link': 'https://github.com/intel/dffml'}
    """

    async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Run GetMulti, then take a single value from each list of values
        want = await super().run(inputs)
        for key, value in want.items():
            want[key] = value.pop()
        return want


associate_spec = Definition(name="associate_spec", primitive="List[str]")
associate_output = Definition(
    name="associate_output", primitive="Dict[str, Any]"
)


@op(
    name="associate",
    inputs={"spec": associate_spec},
    outputs={"output": associate_output},
    stage=Stage.OUTPUT,
)
class Associate(OperationImplementationContext):
    async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # TODO Address the need to copy operation implementation inputs dict
        # In case the input is used elsewhere in the network
        exported = copy.deepcopy(inputs["spec"])
        # Look up the definition for each
        try:
            for convert in range(0, len(exported)):
                exported[convert] = await self.octx.ictx.definition(
                    self.ctx, exported[convert]
                )
        except DefinitionNotInContext:
            return {exported[1]: {}}
        # Unpack exported into the key and value definitions, which is how
        # they will appear in the output
        key, value = exported
        # Acquire all definitions within the context
        async with self.octx.ictx.definitions(self.ctx) as od:
            # Output dict
            want = {}
            async for item in od.inputs(value):
                parents = item.get_parents()
                for parent in parents:
                    if key == parent.definition:
                        want[parent.value] = item.value
                        break
        return {value.name: want}
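
Associate also lacks an inline example, so here is a minimal sketch under
assumed definitions (city, population) and parent links, with the expected
output inferred from run() above rather than executed: the spec is a
two-element list of [key definition name, value definition name], and each
value Input is keyed by the value of its key-definition parent.

>>> import asyncio
>>> from dffml import *
>>>
>>> # Hypothetical definitions for illustration
>>> city = Definition(name="city", primitive="string")
>>> population = Definition(name="population", primitive="int")
>>>
>>> city_input = Input(value="Portland", definition=city)
>>>
>>> dataflow = DataFlow.auto(Associate)
>>> dataflow.seed.append(
...     Input(
...         value=[city.name, population.name],
...         definition=Associate.op.inputs["spec"]
...     )
... )
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(dataflow, [
...         city_input,
...         Input(value=632309, definition=population, parents=[city_input]),
...     ]):
...         print(results)
...
>>> asyncio.run(main())
{'population': {'Portland': 632309}}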


@op(
    name="associate_definition",
    inputs={"spec": associate_spec},
    outputs={"output": associate_output},
    stage=Stage.OUTPUT,
)
class AssociateDefinition(OperationImplementationContext):
    """
    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> feed_def = Definition(name="feed", primitive="string")
    >>> dead_def = Definition(name="dead", primitive="string")
    >>> output = Definition(name="output", primitive="string")
    >>>
    >>> feed_input = Input(value="my favorite value", definition=feed_def)
    >>> face_input = Input(
    ...     value="face", definition=output, parents=[feed_input]
    ... )
    >>>
    >>> dead_input = Input(
    ...     value="my second favorite value", definition=dead_def
    ... )
    >>> beef_input = Input(
    ...     value="beef", definition=output, parents=[dead_input]
    ... )
    >>>
    >>> async def main():
    ...     for value in ["feed", "dead"]:
    ...         async for ctx, results in MemoryOrchestrator.run(
    ...             DataFlow.auto(AssociateDefinition),
    ...             [
    ...                 feed_input,
    ...                 face_input,
    ...                 dead_input,
    ...                 beef_input,
    ...                 Input(
    ...                     value={value: "output"},
    ...                     definition=AssociateDefinition.op.inputs["spec"],
    ...                 ),
    ...             ],
    ...         ):
    ...             print(results)
    >>>
    >>> asyncio.run(main())
    {'feed': 'face'}
    {'dead': 'beef'}
    """

    async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        # Look up the definition for each definition name given
        try:
            spec = {
                await self.octx.ictx.definition(
                    self.ctx, key
                ): await self.octx.ictx.definition(self.ctx, value)
                for key, value in inputs["spec"].items()
            }
        except DefinitionNotInContext:
            return {key: {} for key in inputs["spec"]}
        # Output dict
        want = {}
        # Acquire all definitions within the context
        async with self.octx.ictx.definitions(self.ctx) as od:
            # For each key definition, find a value Input whose parents
            # include an Input of that key definition
            for key, value in spec.items():
                async for item in od.inputs(value):
                    parents = item.get_parents()
                    for parent in parents:
                        if key.name == parent.definition.name:
                            want[key.name] = item.value
                            break
        return want


class RemapConfig(NamedTuple):
    dataflow: DataFlow

    @classmethod
    def _fromdict(cls, **kwargs):
        kwargs["dataflow"] = DataFlow._fromdict(**kwargs["dataflow"])
        return cls(**kwargs)
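
A sketch of constructing this config, either directly or from a serialized
dataflow dict, assuming DataFlow.export() round-trips with
DataFlow._fromdict() (an assumption; the round-trip is not shown in this
module):

>>> # Direct construction
>>> config = RemapConfig(dataflow=DataFlow.auto(GetSingle))
>>> # Or from a serialized dataflow dict
>>> config = RemapConfig._fromdict(
...     dataflow=DataFlow.auto(GetSingle).export()
... )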


class RemapFailure(Exception):
    """
    Raised when the results of a dataflow could not be remapped.
    """


# TODO Make it so that only one output operation gets run, the result of that
# operation is the result of the dataflow
@op(
    inputs={"spec": Definition(name="remap_spec", primitive="map")},
    outputs={"response": Definition(name="remap_output", primitive="map")},
    stage=Stage.OUTPUT,
    config_cls=RemapConfig,
)
async def remap(
    self: OperationImplementationContext, spec: Dict[str, List[str]]
):
    # Create a new orchestrator context. Specify that it should use the
    # existing input set context, so the output operations we run have access
    # to the data from this dataflow rather than from a new subflow.
    async with self.octx.parent(
        self.config.dataflow, reuse={"ictx": self.octx.ictx}
    ) as octx:
        _ctx, result = [result async for result in octx.run(ctx=self.ctx)][0]
    # Remap the output operation results to their feature names (logic copied
    # from the CLI)
    remap = {}
    for feature_name, traverse in spec.items():
        try:
            remap[feature_name] = traverse_get(result, *traverse)
        except KeyError as error:
            raise RemapFailure(
                "failed to remap %r. Results do not contain %r: %s"
                % (feature_name, ".".join(traverse), result)
            ) from error
    # Results have been remapped
    return remap
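
The spec maps each output name to a path into the sub-dataflow's results,
resolved with traverse_get. A sketch of the path semantics, assuming the
configured dataflow's output operations produced a result dict keyed by
operation name (the "get_single" result shown here is hypothetical):

>>> from dffml.util.data import traverse_get
>>>
>>> result = {"get_single": {"URL": "https://github.com/intel/dffml"}}
>>> spec = {"url": ["get_single", "URL"]}
>>> {name: traverse_get(result, *path) for name, path in spec.items()}
{'url': 'https://github.com/intel/dffml'}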