Operation Db

class dffml.operation.db.DatabaseQueryConfig(database: dffml.db.base.BaseDatabase)[source]
no_enforce_immutable()

By default, all properties of a config object are immutable. To mutate them, you must explicitly call this method and use its return value as a context manager.

Examples

>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> config = MyConfig(C=2)
>>> with config.no_enforce_immutable():
...     config.C = 1
async dffml.operation.db.db_query_create_table(self, *, table_name: str, cols: List[str] = [])[source]

Generates a create table query in the database.

Parameters:
  • table_name (str) – The name of the table to be created.

  • cols (list[str]) – Columns of the table. Note that the example below passes a dict mapping each column name to its type.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={"db_query_create": db_query_create_table.op,},
...     configs={"db_query_create": DatabaseQueryConfig(database=sdb),},
...     seed=[],
... )
>>>
>>> inputs = [
...     Input(
...         value="myTable1",
...         definition=db_query_create_table.op.inputs["table_name"],
...     ),
...     Input(
...         value={
...             "key": "real",
...             "firstName": "text",
...             "lastName": "text",
...             "age": "real",
...         },
...         definition=db_query_create_table.op.inputs["cols"],
...     ),
... ]
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         pass
>>>
>>> asyncio.run(main())
async dffml.operation.db.db_query_insert(self, *, table_name: str, data: Dict[str, Any])[source]

Generates an insert query in the database.

Parameters:
  • table_name (str) – The name of the table to insert data into.

  • data (dict) – Data to be inserted into the table.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_insert": db_query_insert.op,
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...         "db_query_insert": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "insert": [
...         Input(
...             value="myTable", definition=db_query_insert.op.inputs["table_name"],
...         ),
...         Input(
...             value={"key": 10, "firstName": "John", "lastName": "Doe", "age": 16},
...             definition=db_query_insert.op.inputs["data"],
...         ),
...     ],
...     "lookup": [
...         Input(
...             value="myTable", definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ]
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 16}]}
async dffml.operation.db.db_query_insert_or_update(self, *, table_name: str, data: Dict[str, Any])[source]

Automatically uses whichever operation is better suited: an insert query or an update query.

Parameters:
  • table_name (str) – The name of the table to insert data into or update data in.

  • data (dict) – Data to be inserted into, or updated in, the table.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> person = {"key": 11, "firstName": "John", "lastName": "Wick", "age": 38}
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_insert_or_update": db_query_insert_or_update.op,
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_insert_or_update": DatabaseQueryConfig(database=sdb),
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "insert_or_update": [
...         Input(
...             value="myTable",
...             definition=db_query_insert_or_update.op.inputs["table_name"],
...         ),
...         Input(
...             value=person,
...             definition=db_query_insert_or_update.op.inputs["data"],
...         ),
...     ],
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Wick', 'age': 38}]}
>>>
>>> person["age"] += 1
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Wick', 'age': 39}]}
async dffml.operation.db.db_query_lookup(self, *, table_name: str, cols: List[str] = [], conditions: Union[List[List[Condition]], List[List[Tuple[str]]]] = []) → Dict[str, Any][source]

Generates a lookup query in the database.

Parameters:
  • table_name (str) – The name of the table.

  • cols (list[str]) – Columns of the table.

  • conditions (Conditions) – Query conditions.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={"db_query_lookup": DatabaseQueryConfig(database=sdb),},
...     seed=[],
... )
>>>
>>> inputs = {
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 16}, {'firstName': 'John', 'lastName': 'Wick', 'age': 39}]}
async dffml.operation.db.db_query_remove(self, *, table_name: str, conditions: Union[List[List[Condition]], List[List[Tuple[str]]]] = [])[source]

Generates a remove query that deletes rows from a table in the database.

Parameters:
  • table_name (str) – The name of the table to remove data from.

  • conditions (Conditions) – Query conditions.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_lookup": db_query_lookup.op,
...         "db_query_remove": db_query_remove.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_remove": DatabaseQueryConfig(database=sdb),
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "remove": [
...         Input(
...             value="myTable",
...             definition=db_query_remove.op.inputs["table_name"],
...         ),
...         Input(value=[],
...         definition=db_query_remove.op.inputs["conditions"],),
...     ],
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': []}
async dffml.operation.db.db_query_update(self, *, table_name: str, data: Dict[str, Any], conditions: Union[List[List[Condition]], List[List[Tuple[str]]]] = [])[source]

Generates an update query for a table in the database.

Parameters:
  • table_name (str) – The name of the table to update data in.

  • data (dict) – Data to be updated in the table.

  • conditions (list) – List of query conditions.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
>>>
>>> dataflow = DataFlow(
...     operations={
...         "db_query_update": db_query_update.op,
...         "db_query_lookup": db_query_lookup.op,
...         "get_single": GetSingle.imp.op,
...     },
...     configs={
...         "db_query_update": DatabaseQueryConfig(database=sdb),
...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
...     },
...     seed=[],
... )
>>>
>>> inputs = {
...     "update": [
...         Input(
...             value="myTable",
...             definition=db_query_update.op.inputs["table_name"],
...         ),
...         Input(
...             value={
...                 "key": 10,
...                 "firstName": "John",
...                 "lastName": "Doe",
...                 "age": 17,
...             },
...             definition=db_query_update.op.inputs["data"],
...         ),
...         Input(value=[], definition=db_query_update.op.inputs["conditions"],),
...     ],
...     "lookup": [
...         Input(
...             value="myTable",
...             definition=db_query_lookup.op.inputs["table_name"],
...         ),
...         Input(
...             value=["firstName", "lastName", "age"],
...             definition=db_query_lookup.op.inputs["cols"],
...         ),
...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
...         Input(
...             value=[db_query_lookup.op.outputs["lookups"].name],
...             definition=GetSingle.op.inputs["spec"],
...         ),
...     ],
... }
>>>
>>> async def main():
...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
...         if result:
...             print(result)
>>>
>>> asyncio.run(main())
{'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 17}]}