Operation Db¶
- class dffml.operation.db.DatabaseQueryConfig(database: dffml.db.base.BaseDatabase)[source]¶
- no_enforce_immutable()¶
By default, all properties of a config object are immutable. If you would like to mutate them, you must explicitly call this method and use it as a context manager.
Examples
>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> config = MyConfig(C=2)
>>> with config.no_enforce_immutable():
...     config.C = 1
- async dffml.operation.db.db_query_create_table(self, *, table_name: str, cols: List[str] = [])[source]¶
Generates a create table query in the database.
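- Parameters:
table_name (str) – The name of the table to create.
cols (List[str]) – The columns of the table to create; the example below passes a mapping of column name to column type. Defaults to an empty list.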
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={"db_query_create": db_query_create_table.op,},
...     configs={"db_query_create": DatabaseQueryConfig(database=sdb),},
...     seed=[],
... )
>>>
>>> inputs = [
...     Input(
...         value="myTable1",
...         definition=db_query_create_table.op.inputs["table_name"],
...     ),
...     Input(
...         value={
...             "key": "real",
...             "firstName": "text",
...             "lastName": "text",
...             "age": "real",
...         },
...         definition=db_query_create_table.op.inputs["cols"],
...     ),
... ]
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         pass
>>>
>>> asyncio.run(main())
- async dffml.operation.db.db_query_insert(self, *, table_name: str, data: Dict[str, Any])[source]¶
Generates an insert query in the database.
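- Parameters:
table_name (str) – The name of the table to insert data into.
data (Dict[str, Any]) – The data to insert, as a mapping of column name to value.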
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_insert": db_query_insert.op,
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...         "db_query_insert": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "insert": [
...         Input(
...             value="myTable", definition=db_query_insert.op.inputs["table_name"],
...         ),
...         Input(
...             value={"key": 10, "firstName": "John", "lastName": "Doe", "age": 16},
...             definition=db_query_insert.op.inputs["data"],
...         ),
...     ],
...     "lookup": [
...         Input(
...             value="myTable", definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ]
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 16}]}
- async dffml.operation.db.db_query_insert_or_update(self, *, table_name: str, data: Dict[str, Any])[source]¶
Automatically uses whichever operation is better suited: an insert query or an update query.
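- Parameters:
table_name (str) – The name of the table to insert data into or update data in.
data (Dict[str, Any]) – The data to insert or update, as a mapping of column name to value.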
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> person = {"key": 11, "firstName": "John", "lastName": "Wick", "age": 38}
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_insert_or_update": db_query_insert_or_update.op,
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_insert_or_update": DatabaseQueryConfig(database=sdb),
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "insert_or_update": [
...         Input(
...             value="myTable",
...             definition=db_query_insert_or_update.op.inputs["table_name"],
...         ),
...         Input(
...             value=person,
...             definition=db_query_insert_or_update.op.inputs["data"],
...         ),
...     ],
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Wick', 'age': 38}]}
>>>
>>> person["age"] += 1
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Wick', 'age': 39}]}
- async dffml.operation.db.db_query_lookup(self, *, table_name: str, cols: List[str] = [], conditions: Union[List[List[Condition]], List[List[Tuple[str]]]] = []) → Dict[str, Any] [source]¶
Generates a lookup query in the database.
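- Parameters:
table_name (str) – The name of the table to look up data in.
cols (List[str]) – The columns to return for each matching row. Defaults to an empty list.
conditions (Conditions) – Query conditions. Defaults to an empty list.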
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={"db_query_lookup": DatabaseQueryConfig(database=sdb),},
...     seed=[],
... )
>>>
>>> inputs = {
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 16}, {'firstName': 'John', 'lastName': 'Wick', 'age': 39}]}
- async dffml.operation.db.db_query_remove(self, *, table_name: str, conditions: Union[List[List[Condition]], List[List[Tuple[str]]]] = [])[source]¶
Generates a remove query in the database.
- Parameters:
table_name (str) – The name of the table to remove data from.
conditions (Conditions) – Query conditions.
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_lookup": db_query_lookup.op,
...         "db_query_remove": db_query_remove.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_remove": DatabaseQueryConfig(database=sdb),
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "remove": [
...         Input(
...             value="myTable",
...             definition=db_query_remove.op.inputs["table_name"],
...         ),
...         Input(value=[],
...         definition=db_query_remove.op.inputs["conditions"],),
...     ],
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': []}
- async dffml.operation.db.db_query_update(self, *, table_name: str, data: Dict[str, Any], conditions: Union[List[List[Condition]], List[List[Tuple[str]]]] = [])[source]¶
Generates an update query in the database.
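- Parameters:
table_name (str) – The name of the table to update data in.
data (Dict[str, Any]) – The data to write, as a mapping of column name to value.
conditions (Conditions) – Query conditions. Defaults to an empty list.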
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_update": db_query_update.op,
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_update": DatabaseQueryConfig(database=sdb),
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "update": [
...         Input(
...             value="myTable",
...             definition=db_query_update.op.inputs["table_name"],
...         ),
...         Input(
...             value={
...                 "key": 10,
...                 "firstName": "John",
...                 "lastName": "Doe",
...                 "age": 17,
...             },
...             definition=db_query_update.op.inputs["data"],
...         ),
...         Input(value=[], definition=db_query_update.op.inputs["conditions"],),
...     ],
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 17}]}