High Level Dataflow
- async dffml.high_level.dataflow.run(dataflow: DataFlow, *input_sets: Union[List[Input], BaseInputSet], orchestrator: Optional[BaseOrchestrator] = None, strict: bool = True, ctx: Optional[BaseInputSetContext] = None, halt: Optional[Event] = None) → AsyncIterator[Tuple[BaseInputSetContext, Dict[str, Any]]]
Run a DataFlow

Run a DataFlow using the default orchestrator (MemoryOrchestrator), or the specified one.

- Parameters:
  - dataflow (DataFlow) – DataFlow to run.
  - input_sets (InputSet, list, dict, optional) – Inputs to give to the DataFlow when it starts. Can be in multiple formats (a sketch of the dict form follows the return type below). If an element is a list, each element of that list is expected to be an Input; an InputSet will be created for it with a random string used as the StringInputSetContext. If a dict is given, each key becomes a StringInputSetContext and the value for each key should be a list of Input objects. If an element is an InputSet, its InputSetContext will have its respective Inputs added to it.
  - orchestrator (BaseOrchestrator, optional) – Orchestrator to use, defaults to MemoryOrchestrator if None.
  - strict (bool, optional) – If true (default), raise exceptions when they occur in operations. If false, log exceptions without raising.
  - ctx (BaseInputSetContext, optional) – If given and input_sets is a list, the inputs are added under the given context. Otherwise they are added under randomly generated contexts.
  - halt (Event, optional) – If given, keep the dataflow running until this asyncio.Event is set.
- Returns:
  A tuple of InputSetContext and dict, where the contents of the dict are determined by the output operations. If multiple output operations are used, the top-level keys will be the names of the output operations. If only one is used, the dict will be whatever that output operation returned.
- Return type:
  asynciterator
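A minimal sketch (not taken from the DFFML documentation) of the dict form of input_sets described above, where each key becomes a StringInputSetContext. The announce operation is hypothetical and defined here only for illustration:

    import asyncio
    from dffml import DataFlow, Input, op, run

    @op
    async def announce(message: str):
        # Hypothetical operation used only to have something to run
        print(message)

    dataflow = DataFlow.auto(announce)

    async def main():
        # Each dict key becomes a StringInputSetContext; each value is the
        # list of Input objects added under that context.
        async for ctx, results in run(
            dataflow,
            {
                "greeting": [Input(value="hello", definition=announce.op.inputs["message"])],
                "farewell": [Input(value="goodbye", definition=announce.op.inputs["message"])],
            },
        ):
            # No output operations are defined, so results is not used here
            print(ctx)

    asyncio.run(main())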
Examples
The following creates a TCP echo server. We write an operation which opens a connection and sends a message to the server, then run that operation using a DataFlow.
For the inputs to the DataFlow, we create 2 Input objects whose values are the message to be sent to the TCP server. We also create Input objects for the host and port. When running a DataFlow, operations will be run with each possible permutation of their inputs.
Because there is a different Input object for each of the two “echo” messages, one combines with the host and port to make an argument list for the send_to_server operation, and the other combines with the host and port to make a second argument list. Both argument lists are used to call the send_to_server operation.

>>> # Socket server derived from
>>> # https://docs.python.org/3/library/socketserver.html#asynchronous-mixins
>>> import asyncio
>>> import threading
>>> import socketserver
>>> from dffml import *
>>>
>>> class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
...     def handle(self):
...         self.request.sendall(self.request.recv(1024))
>>>
>>> class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
...     pass
>>>
>>> @op
... async def send_to_server(host: str, port: int, message: str):
...     reader, writer = await asyncio.open_connection(host, port)
...
...     writer.write(message.encode())
...     await writer.drain()
...
...     data = await reader.read(100)
...     print(f"Client sent {message!r}, got: {data.decode()!r}")
...
...     writer.close()
...     await writer.wait_closed()
>>>
>>> # List of messages to send to the server, 2 long, each value is "echo"
>>> messages = [Input(value="echo", definition=send_to_server.op.inputs["message"])
...             for _ in range(0, 2)]
>>>
>>> # DataFlow consisting of the single operation
>>> dataflow = DataFlow.auto(send_to_server)
>>>
>>> async def main():
...     # Create a server and pass 0 to get a random port assigned
...     server = ThreadedTCPServer(("127.0.0.1", 0), ThreadedTCPRequestHandler)
...     with server:
...         host, port = server.server_address
...
...         # Start a thread to run the server in the background
...         server_thread = threading.Thread(target=server.serve_forever)
...         # Exit the server thread when the main thread terminates
...         server_thread.daemon = True
...         server_thread.start()
...
...         # Add the host and port to the list of Inputs for the DataFlow
...         inputs = messages + [
...             Input(value=host, definition=send_to_server.op.inputs["host"]),
...             Input(value=port, definition=send_to_server.op.inputs["port"])
...         ]
...
...         try:
...             async for ctx, results in run(dataflow, inputs):
...                 pass
...         finally:
...             server.shutdown()
>>>
>>> asyncio.run(main())
Client sent 'echo', got: 'echo'
Client sent 'echo', got: 'echo'
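In the example above the results dict is ignored because the dataflow has no output operations. The following is a hedged sketch (not from the DFFML docs) of how results can be populated, using the GetSingle output operation together with a hypothetical double operation, and assuming the auto-generated output definition is exposed as double.op.outputs["result"]. Since GetSingle is the only output operation, the yielded dict is whatever GetSingle returned:

    import asyncio
    from dffml import DataFlow, GetSingle, Input, op, run

    @op
    async def double(number: int) -> int:
        # Hypothetical operation defined only for illustration
        return number * 2

    # DataFlow containing the operation plus the GetSingle output operation
    dataflow = DataFlow.auto(GetSingle, double)

    async def main():
        async for ctx, results in run(
            dataflow,
            [
                Input(value=21, definition=double.op.inputs["number"]),
                # Tell GetSingle which definition's value to collect
                Input(
                    value=[double.op.outputs["result"].name],
                    definition=GetSingle.op.inputs["spec"],
                ),
            ],
        ):
            # Only one output operation is used, so results should be
            # GetSingle's return value: a dict mapping the requested
            # definition name to its value (42 here).
            print(results)

    asyncio.run(main())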