High Level DataFlow

async dffml.high_level.dataflow.run(dataflow: DataFlow, *input_sets: Union[List[Input], BaseInputSet], orchestrator: Optional[BaseOrchestrator] = None, strict: bool = True, ctx: Optional[BaseInputSetContext] = None, halt: Optional[Event] = None) → AsyncIterator[Tuple[BaseInputSetContext, Dict[str, Any]]]

Run a DataFlow

Run a DataFlow using the default orchestrator (MemoryOrchestrator), or the specified one.

Parameters:
  • dataflow (DataFlow) – DataFlow to run.

  • input_sets (InputSet, list, dict, optional) –

    Inputs to give to the DataFlow when it starts. These can be given in multiple formats (see the sketch after this parameter list).

    If each element is a list, then each element of that list must be an Input. In this case an InputSet is created for it, with a random string used as the StringInputSetContext.

    If a dict is given, each key becomes a StringInputSetContext. The value for each key should be a list of Input objects.

    If each element is an InputSet, its Inputs are added under its existing InputSetContext.

  • orchestrator (BaseOrchestrator, optional) – Orchestrator to use, defaults to MemoryOrchestrator if None.

  • strict (bool, optional) – If True (the default), exceptions that occur in operations are raised. If False, they are logged instead of raised.

  • ctx (BaseInputSetContext, optional) – If given and input_sets is a list, the inputs are added under this context. Otherwise they are added under randomly generated contexts.

  • halt (Event, optional) – If given, keep the dataflow running until this asyncio.Event is set.
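
As a rough sketch of the accepted formats (the announce operation below is hypothetical and exists only for this illustration; it ignores its input), the first call passes a flat list of Input objects, so a randomly named context is created, and the second passes a dict, so each key becomes its own StringInputSetContext:

>>> import asyncio
>>> from dffml import *
>>>
>>> # Hypothetical operation used only to give the Inputs a definition
>>> @op
... async def announce(message: str):
...     pass
>>>
>>> announce_flow = DataFlow.auto(announce)
>>>
>>> async def input_shapes():
...     # Flat list of Input objects: one randomly named context is created
...     async for ctx, results in run(announce_flow, [
...         Input(value="hello", definition=announce.op.inputs["message"]),
...     ]):
...         pass
...     # Dict: each key becomes its own StringInputSetContext
...     async for ctx, results in run(announce_flow, {
...         "first": [Input(value="hello", definition=announce.op.inputs["message"])],
...         "second": [Input(value="world", definition=announce.op.inputs["message"])],
...     }):
...         pass
>>>
>>> asyncio.run(input_shapes())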

Returns:

tuple of InputSetContext and dict, where the dict's contents are determined by the output operations. If multiple output operations are used, the top level keys will be the names of the output operations. If only one is used, the dict will be whatever that output operation returned (see the sketch following the return type below).

Return type:

AsyncIterator[Tuple[BaseInputSetContext, Dict[str, Any]]]
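
The following minimal sketch shows the shape of the results dict when a single output operation is used. GetSingle is DFFML's built-in output operation which collects values by definition name; the hello_world_message definition is hypothetical and exists only for this illustration:

>>> import asyncio
>>> from dffml import *
>>>
>>> # Hypothetical definition for the value we want back out of the DataFlow
>>> HELLO_WORLD = Definition(name="hello_world_message", primitive="string")
>>>
>>> # DataFlow containing only the GetSingle output operation, seeded with the
>>> # names of the definitions whose values it should collect
>>> hello_flow = DataFlow.auto(GetSingle)
>>> hello_flow.seed.append(
...     Input(
...         value=[HELLO_WORLD.name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>>
>>> async def show_results():
...     async for ctx, results in run(hello_flow, [
...         Input(value="Hello World", definition=HELLO_WORLD),
...     ]):
...         # GetSingle is the only output operation, so results is exactly its
...         # return value: a dict keyed by the requested definition names
...         print(results)
>>>
>>> asyncio.run(show_results())
{'hello_world_message': 'Hello World'}

If a second output operation were added, the top level keys of results would instead be the names of the output operations.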

Examples

The following creates a TCP echo server. We write an operation which opens a connection and sends a message to the server, then run it using a DataFlow.

For the inputs to the DataFlow, we create 2 Input objects whose values are the message to be sent to the TCP server. We also create Input objects for the host and port. When running a DataFlow, operations will be run with each possible permutation of their inputs.

[Figure: flow chart showing how both echo messages create a parameter set including that echo message and the host and port]

Because there is a different Input object for each of the 2 “echo” messages, one will get combined with the host and port to make an argument list for the send_to_server operation. The other also combines with the host and port to make another argument list. Both argument lists are used to call the send_to_server operation.
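
Conceptually, this pairing behaves like a cartesian product over the values available for each parameter. The following plain Python sketch (using a made up port number, and not the orchestrator's actual implementation) shows the two resulting argument lists:

>>> import itertools
>>>
>>> echo_messages = ["echo", "echo"]
>>> hosts = ["127.0.0.1"]
>>> ports = [4444]
>>>
>>> for host, port, message in itertools.product(hosts, ports, echo_messages):
...     print(host, port, message)
127.0.0.1 4444 echo
127.0.0.1 4444 echo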

>>> # Socket server derived from
>>> # https://docs.python.org/3/library/socketserver.html#asynchronous-mixins
>>> import asyncio
>>> import threading
>>> import socketserver
>>> from dffml import *
>>>
>>> class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
...     def handle(self):
...         self.request.sendall(self.request.recv(1024))
>>>
>>> class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
...     pass
>>>
>>> @op
... async def send_to_server(host: str, port: int, message: str):
...     reader, writer = await asyncio.open_connection(host, port)
...
...     writer.write(message.encode())
...     await writer.drain()
...
...     data = await reader.read(100)
...     print(f"Client sent {message!r}, got: {data.decode()!r}")
...
...     writer.close()
...     await writer.wait_closed()
>>>
>>> # List of messages to send to the server, 2 long, each value is "echo"
>>> messages = [Input(value="echo", definition=send_to_server.op.inputs["message"])
...             for _ in range(0, 2)]
>>>
>>> # DataFlow consisting of the single operation
>>> dataflow = DataFlow.auto(send_to_server)
>>>
>>> async def main():
...     # Create a server, passing port 0 so a random port gets assigned
...     server = ThreadedTCPServer(("127.0.0.1", 0), ThreadedTCPRequestHandler)
...     with server:
...         host, port = server.server_address
...
...         # Start a thread to run the server in the background
...         server_thread = threading.Thread(target=server.serve_forever)
...         # Exit the server thread when the main thread terminates
...         server_thread.daemon = True
...         server_thread.start()
...
...         # Add the host and port to the list of Inputs for the DataFlow
...         inputs = messages + [
...             Input(value=host, definition=send_to_server.op.inputs["host"]),
...             Input(value=port, definition=send_to_server.op.inputs["port"])
...         ]
...
...         try:
...             async for ctx, results in run(dataflow, inputs):
...                 pass
...         finally:
...             server.shutdown()
>>>
>>> asyncio.run(main())
Client sent 'echo', got: 'echo'
Client sent 'echo', got: 'echo'