Using Locks In DataFlow
All operations in DFFML run asynchronously. DFFML takes care of locking objects that might be used by multiple operations, so that you don’t run into any race conditions. This example shows one such usage.
Note
All the code for this example is located under the examples/dataflow/locking directory of the DFFML source code.
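Before looking at the DFFML code, here is a minimal plain-asyncio sketch of the idea (an illustration only, not how DFFML implements locking internally). The coroutine name set_and_check and the sleep durations are made up for this sketch; it mimics the behaviour demonstrated below by guarding a shared dict with an asyncio.Lock.

import asyncio

async def set_and_check(shared, lock, sleep_for, i):
    # Holding the lock means only one coroutine at a time may
    # mutate and then read the shared dict.
    async with lock:
        shared["i"] = i
        await asyncio.sleep(sleep_for)
        print(f"set i = {i}, got i = {shared['i']}")

async def main():
    shared = {}
    lock = asyncio.Lock()
    await asyncio.gather(
        set_and_check(shared, lock, 0.2, 1),
        set_and_check(shared, lock, 0.1, 2),
    )

asyncio.run(main())

With the lock held, each coroutine prints matching values; drop the async with lock block and the second coroutine can overwrite shared["i"] while the first one sleeps, which is exactly the situation the rest of this example reproduces with DFFML operations.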
First we import the required packages and objects, and create Definitions for each data type our operation will use. Note that LOCKED_OBJ has lock=True.
example.py
import asyncio
from dffml import Definition, DataFlow, Input, op
from dffml.noasync import run
OBJ = Definition(name="obj", primitive="mapping")
LOCKED_OBJ = Definition(name="locked_obj", primitive="mapping", lock=True)
SLEEP_TIME = Definition(name="sleep_time", primitive="int")
INTEGER = Definition(name="integer", primitive="int")
Our operation run_me takes an object (of definition OBJ), sets its i key, sleeps for the given time, and finally prints both the value it was given as input and the value currently stored in the object.
example.py
@op(inputs={"obj": OBJ, "sleep_for": SLEEP_TIME, "i": INTEGER})
async def run_me(obj: dict, sleep_for: int, i: int) -> None:
obj["i"] = i
await asyncio.sleep(sleep_for)
print(f"set i = {i}, got i = {obj['i']}")
We’ll run the dataflow with two values for i. If the operations behave as expected, then when each one prints, the value given as input and the value stored in the object will be the same.
example.py
print("Running dataflow without locked object")
for ctx, result in run(
DataFlow(run_me),
[
Input(value={}, definition=OBJ),
Input(value=0.1, definition=SLEEP_TIME),
Input(value=0.2, definition=SLEEP_TIME),
Input(value=1, definition=INTEGER),
Input(value=2, definition=INTEGER),
],
):
pass
But running this code block gives the following output:
Running dataflow without locked object
set i = 2, got i = 1
set i = 1, got i = 1
set i = 2, got i = 1
set i = 1, got i = 1
We can see that the output is not what we expect. Since everything runs asynchronously, while one operation sleeps another operation may start running and replace the value. This is where locks come in handy. We’ll change run_me to take an object of definition LOCKED_OBJ instead of OBJ and run the dataflow again.
example.py
print("Running dataflow with locked object")
run_me.op = run_me.op._replace(
inputs={"obj": LOCKED_OBJ, "sleep_for": SLEEP_TIME, "i": INTEGER}
)
for ctx, result in run(
DataFlow(run_me),
[
Input(value={}, definition=LOCKED_OBJ),
Input(value=0.1, definition=SLEEP_TIME),
Input(value=0.2, definition=SLEEP_TIME),
Input(value=1, definition=INTEGER),
Input(value=2, definition=INTEGER),
],
):
pass
This time the output is as expected:
Running dataflow with locked object
set i = 2, got i = 2
set i = 1, got i = 1
set i = 2, got i = 2
set i = 1, got i = 1
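As an aside, the run_me.op._replace(...) call above patches the already-decorated operation in place so the tutorial can reuse run_me. If you know up front that the object needs locking, you can simply declare the operation against LOCKED_OBJ from the start. A minimal sketch, assuming the same imports and definitions as example.py above (run_me_locked is a hypothetical name used only here):

@op(inputs={"obj": LOCKED_OBJ, "sleep_for": SLEEP_TIME, "i": INTEGER})
async def run_me_locked(obj: dict, sleep_for: int, i: int) -> None:
    # Same body as run_me; only the input definition differs, so the
    # object is locked while each call uses it, as demonstrated above.
    obj["i"] = i
    await asyncio.sleep(sleep_for)
    print(f"set i = {i}, got i = {obj['i']}")

for ctx, result in run(
    DataFlow(run_me_locked),
    [
        Input(value={}, definition=LOCKED_OBJ),
        Input(value=0.1, definition=SLEEP_TIME),
        Input(value=0.2, definition=SLEEP_TIME),
        Input(value=1, definition=INTEGER),
        Input(value=2, definition=INTEGER),
    ],
):
    pass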