Using Locks In DataFlow

All operations in DFFML run asynchronously. DFFML takes care of locking objects that might be used by multiple operations, so that you don't run into race conditions. This example shows one such usage.

Note

All the code for this example is located under the examples/dataflow/locking directory of the DFFML source code.

First we import required packages and objects, and create Definitions for each data type our operation will use. Note that LOCKED_OBJ has lock=True.

example.py

import asyncio
from dffml import Definition, DataFlow, Input, op
from dffml.noasync import run

OBJ = Definition(name="obj", primitive="mapping")
LOCKED_OBJ = Definition(name="locked_obj", primitive="mapping", lock=True)
SLEEP_TIME = Definition(name="sleep_time", primitive="int")
INTEGER = Definition(name="integer", primitive="int")

Our operation run_me takes an object (of definition OBJ), sets its i attribute, sleeps for the given time, and finally prints both the value that was given as input and the current value of the object.

example.py

@op(inputs={"obj": OBJ, "sleep_for": SLEEP_TIME, "i": INTEGER})
async def run_me(obj: dict, sleep_for: int, i: int) -> None:
    obj["i"] = i
    await asyncio.sleep(sleep_for)
    print(f"set i = {i}, got i = {obj['i']}")

We’ll run the dataflow with two values for i. If the operations run as expected, the value given as input and that of the object will be the same when printed.

example.py

print("Running dataflow without locked object")
for ctx, result in run(
    DataFlow(run_me),
    [
        Input(value={}, definition=OBJ),
        Input(value=0.1, definition=SLEEP_TIME),
        Input(value=0.2, definition=SLEEP_TIME),
        Input(value=1, definition=INTEGER),
        Input(value=2, definition=INTEGER),
    ],
):
    pass

But this code block gives the following output:

Running dataflow without locked object
set i = 2, got i = 1
set i = 1, got i = 1
set i = 2, got i = 1
set i = 1, got i = 1
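The interleaving behind this output can be reproduced with plain asyncio, without DFFML. In this hypothetical sketch (the coroutine name and sleep times are illustrative, not part of the DFFML example), two coroutines share one dict: awaiting the sleep yields control, so the second coroutine overwrites the value the first one set before the first coroutine reads it back.

```python
import asyncio

async def set_and_check(obj, sleep_for, i):
    # Same shape as run_me: write, yield control while sleeping, then read back
    obj["i"] = i
    await asyncio.sleep(sleep_for)
    return i, obj["i"]

async def main():
    shared = {}
    # gather returns results in argument order, not completion order
    return await asyncio.gather(
        set_and_check(shared, 0.02, 1),
        set_and_check(shared, 0.01, 2),
    )

results = asyncio.run(main())
print(results)  # the first coroutine set i = 1 but reads back 2
```

While the first coroutine is sleeping, the second one runs and sets `shared["i"] = 2`, so the first coroutine observes a value it never wrote.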

We can see that the output is not what we expect. Since everything runs asynchronously, when one operation sleeps, another operation may start running and replace the value. This is where locks come in handy. We'll change run_me to take an object of definition LOCKED_OBJ instead of OBJ and run the dataflow again.

example.py

print("Running dataflow with locked object")
run_me.op = run_me.op._replace(
    inputs={"obj": LOCKED_OBJ, "sleep_for": SLEEP_TIME, "i": INTEGER}
)
for ctx, result in run(
    DataFlow(run_me),
    [
        Input(value={}, definition=LOCKED_OBJ),
        Input(value=0.1, definition=SLEEP_TIME),
        Input(value=0.2, definition=SLEEP_TIME),
        Input(value=1, definition=INTEGER),
        Input(value=2, definition=INTEGER),
    ],
):
    pass

This time the output is as expected:

Running dataflow with locked object
set i = 2, got i = 2
set i = 1, got i = 1
set i = 2, got i = 2
set i = 1, got i = 1
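Conceptually, lock=True tells DFFML to serialize access to the object, much as you would guard a shared dict with an asyncio.Lock in plain asyncio. This sketch (an analogy, not DFFML's actual implementation) shows the same write-sleep-read cycle from before, but with each cycle held under a lock so no other coroutine can modify the object mid-cycle.

```python
import asyncio

async def set_and_check(lock, obj, sleep_for, i):
    # Hold the lock across the whole write-sleep-read cycle
    async with lock:
        obj["i"] = i
        await asyncio.sleep(sleep_for)
        return i, obj["i"]

async def main():
    lock = asyncio.Lock()
    shared = {}
    return await asyncio.gather(
        set_and_check(lock, shared, 0.02, 1),
        set_and_check(lock, shared, 0.01, 2),
    )

results = asyncio.run(main())
print(results)  # each coroutine reads back the value it wrote
```

With the lock held, the second coroutine cannot touch the dict until the first one finishes, so every coroutine reads back exactly the value it set, matching the behavior of the locked dataflow above.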