Source code for dffml.operation.db

import inspect
from typing import Dict, Any, Optional, List

from ..base import config
from ..df.base import op
from ..db.base import Conditions, BaseDatabase
from ..df.types import Definition


# definitions
# Dataflow definitions shared by the database query operations below. The
# `name` is what shows up as the key in dataflow results (for example the
# doctest output key 'query_lookups').

# Wired to the "table_name" input of every operation in this module.
QUERY_TABLE = Definition(name="query_table", primitive="str")
# Wired to the "data" input of the insert / update / insert_or_update
# operations.
QUERY_DATA = Definition(name="query_data", primitive="Dict[str, Any]")
# Wired to the "conditions" input of the update / remove / lookup operations.
QUERY_CONDITIONS = Definition(name="query_conditions", primitive="Conditions")
# Wired to the "cols" input of the create_table and lookup operations.
# NOTE(review): the create_table example passes a name -> type dict, while the
# lookup examples pass a list of column names; the declared primitive only
# matches the former.
QUERY_COLS = Definition(name="query_cols", primitive="Dict[str, str]")
# Wired to the "lookups" output of the lookup operation.
# NOTE(review): db_query_lookup returns a list of rows under this definition,
# although the primitive is declared as a dict - confirm which is intended.
QUERY_LOOKUPS = Definition(name="query_lookups", primitive="Dict[str, Any]")


@config
class DatabaseQueryConfig:
    # Configuration shared by all database query operations in this module.
    # `database` is the object handed to each operation through
    # `imp_enter` (`self.config.database`); the per-run context is then
    # obtained by calling it (`self.parent.database()`).
    database: BaseDatabase
# TODO Remove this?
# TODO Figure out a way to handle defaults so that all inputs need not be passed to the
# flow on execution
# Note : Add `query_type`:str to `DatabaseQueryConfig` before use.
@op(
    inputs={
        "table_name": QUERY_TABLE,
        "data": QUERY_DATA,
        "conditions": QUERY_CONDITIONS,
        "cols": QUERY_COLS,
    },
    outputs={"lookups": QUERY_LOOKUPS},
    config_cls=DatabaseQueryConfig,
    imp_enter={"database": (lambda self: self.config.database)},
    ctx_enter={"dbctx": (lambda self: self.parent.database())},
)
async def db_query(
    self,
    *,
    table_name: str,
    data: Dict[str, Any] = {},
    conditions: Conditions = [],
    cols: List[str] = [],
) -> Optional[Dict[str, Any]]:
    """
    Run the query selected by ``self.config.query_type`` on the database.

    The query type is mapped to the method of the same name on ``self.dbctx``
    (any type containing ``"create"`` maps to ``create_table``). Only the
    arguments the selected method declares are forwarded to it, unless the
    method accepts arbitrary keyword arguments, in which case all four are
    passed.

    ``DatabaseQueryConfig`` as declared in this module has no ``query_type``
    field; one must be added before this operation can be used.

    Parameters
    ----------
    table_name : str
        The name of the table the query targets.
    data : dict
        Row data, forwarded to methods that take ``data``.
    conditions : Conditions
        Query conditions, forwarded to methods that take ``conditions``.
    cols : list[str]
        Columns, forwarded to methods that take ``cols``.

    Returns
    -------
    dict
        ``{"lookups": rows}`` with the collected rows when the database method
        is an async generator, ``{"lookups": {}}`` otherwise.

    Raises
    ------
    ValueError
        If the configured query type is not one of the allowed types.
    """
    # The mutable defaults in the signature are kept for interface
    # compatibility; they are only read here, never mutated.
    query_type = self.config.query_type
    if "create" in query_type:
        query_type = "create_table"
    allowed = ["create_table", "remove", "update", "insert", "lookup"]
    if query_type not in allowed:
        raise ValueError(f"Only queries of type {allowed} is allowed")

    query_fn = getattr(self.dbctx, query_type)

    # Build the keyword arguments explicitly instead of introspecting the
    # current frame.
    kwargs = {
        "table_name": table_name,
        "data": data,
        "conditions": conditions,
        "cols": cols,
    }
    # Forward only what the selected method can accept. Passing all four
    # arguments to a method that does not declare them (and has no **kwargs)
    # would fail with an "unexpected keyword argument" TypeError.
    try:
        parameters = inspect.signature(query_fn).parameters
    except (TypeError, ValueError):
        # Signature not introspectable: fall back to passing everything.
        parameters = None
    if parameters is not None and not any(
        parameter.kind is inspect.Parameter.VAR_KEYWORD
        for parameter in parameters.values()
    ):
        kwargs = {
            name: value for name, value in kwargs.items() if name in parameters
        }

    # Call once and inspect what came back, rather than awaiting blindly and
    # string-matching the resulting TypeError to detect an async generator.
    result = query_fn(**kwargs)
    if inspect.isasyncgen(result):
        return {"lookups": [row async for row in result]}
    await result
    return {"lookups": {}}
@op(
    inputs={"table_name": QUERY_TABLE, "cols": QUERY_COLS},
    outputs={},
    config_cls=DatabaseQueryConfig,
    imp_enter={"database": (lambda self: self.config.database)},
    ctx_enter={"dbctx": (lambda self: self.parent.database())},
)
async def db_query_create_table(
    # The `[]` default is retained unchanged for backward compatibility; it
    # is never mutated here.
    self, *, table_name: str, cols: Dict[str, str] = []
):
    """
    Generates a create table query in the database.

    Parameters
    ----------
    table_name : str
        The name of the table to be created.
    cols : dict[str, str]
        Columns of the table, as a mapping of column name to column type
        (for example ``{"key": "real", "firstName": "text"}``).

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
    >>>
    >>> dataflow = DataFlow(
    ...     operations={"db_query_create": db_query_create_table.op,},
    ...     configs={"db_query_create": DatabaseQueryConfig(database=sdb),},
    ...     seed=[],
    ... )
    >>>
    >>> inputs = [
    ...     Input(
    ...         value="myTable1",
    ...         definition=db_query_create_table.op.inputs["table_name"],
    ...     ),
    ...     Input(
    ...         value={
    ...             "key": "real",
    ...             "firstName": "text",
    ...             "lastName": "text",
    ...             "age": "real",
    ...         },
    ...         definition=db_query_create_table.op.inputs["cols"],
    ...     ),
    ... ]
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         pass
    >>>
    >>> asyncio.run(main())
    """
    # Thin wrapper: the database context does the actual work.
    await self.dbctx.create_table(table_name=table_name, cols=cols)
@op(
    inputs={"table_name": QUERY_TABLE, "data": QUERY_DATA},
    outputs={},
    config_cls=DatabaseQueryConfig,
    imp_enter={"database": lambda self: self.config.database},
    ctx_enter={"dbctx": lambda self: self.parent.database()},
)
async def db_query_insert(self, *, table_name: str, data: Dict[str, Any]):
    """
    Insert a row of data into a table of the configured database.

    Parameters
    ----------
    table_name : str
        Name of the table receiving the data.
    data : dict
        The data to insert, passed to the database context's ``insert``.

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
    >>>
    >>> dataflow = DataFlow(
    ...     operations={
    ...         "db_query_insert": db_query_insert.op,
    ...         "db_query_lookup": db_query_lookup.op,
    ...         "get_single": GetSingle.imp.op,
    ...     },
    ...     configs={
    ...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
    ...         "db_query_insert": DatabaseQueryConfig(database=sdb),
    ...     },
    ...     seed=[],
    ... )
    >>>
    >>> inputs = {
    ...     "insert": [
    ...         Input(
    ...             value="myTable", definition=db_query_insert.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value={"key": 10, "firstName": "John", "lastName": "Doe", "age": 16},
    ...             definition=db_query_insert.op.inputs["data"],
    ...         ),
    ...     ],
    ...     "lookup": [
    ...         Input(
    ...             value="myTable", definition=db_query_lookup.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value=["firstName", "lastName", "age"],
    ...             definition=db_query_lookup.op.inputs["cols"],
    ...         ),
    ...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
    ...         Input(
    ...             value=[db_query_lookup.op.outputs["lookups"].name],
    ...             definition=GetSingle.op.inputs["spec"],
    ...         ),
    ...     ]
    ... }
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         if result:
    ...             print(result)
    >>>
    >>> asyncio.run(main())
    {'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 16}]}
    """
    # Delegate straight to the database context entered for this run.
    database_context = self.dbctx
    await database_context.insert(table_name=table_name, data=data)
@op(
    inputs={
        "table_name": QUERY_TABLE,
        "data": QUERY_DATA,
        "conditions": QUERY_CONDITIONS,
    },
    outputs={},
    config_cls=DatabaseQueryConfig,
    imp_enter={"database": lambda self: self.config.database},
    ctx_enter={"dbctx": lambda self: self.parent.database()},
)
async def db_query_update(
    self, *, table_name: str, data: Dict[str, Any], conditions: Conditions = []
):
    """
    Update data in a table of the configured database.

    Parameters
    ----------
    table_name : str
        Name of the table to update.
    data : dict
        The new data, passed to the database context's ``update``.
    conditions : Conditions
        Query conditions, passed to the database context's ``update``.

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
    >>>
    >>> dataflow = DataFlow(
    ...     operations={
    ...         "db_query_update": db_query_update.op,
    ...         "db_query_lookup": db_query_lookup.op,
    ...         "get_single": GetSingle.imp.op,
    ...     },
    ...     configs={
    ...         "db_query_update": DatabaseQueryConfig(database=sdb),
    ...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
    ...     },
    ...     seed=[],
    ... )
    >>>
    >>> inputs = {
    ...     "update": [
    ...         Input(
    ...             value="myTable",
    ...             definition=db_query_update.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value={
    ...                 "key": 10,
    ...                 "firstName": "John",
    ...                 "lastName": "Doe",
    ...                 "age": 17,
    ...             },
    ...             definition=db_query_update.op.inputs["data"],
    ...         ),
    ...         Input(value=[], definition=db_query_update.op.inputs["conditions"],),
    ...     ],
    ...     "lookup": [
    ...         Input(
    ...             value="myTable",
    ...             definition=db_query_lookup.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value=["firstName", "lastName", "age"],
    ...             definition=db_query_lookup.op.inputs["cols"],
    ...         ),
    ...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
    ...         Input(
    ...             value=[db_query_lookup.op.outputs["lookups"].name],
    ...             definition=GetSingle.op.inputs["spec"],
    ...         ),
    ...     ],
    ... }
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         if result:
    ...             print(result)
    >>>
    >>> asyncio.run(main())
    {'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 17}]}
    """
    # Collect the call arguments first, then hand them to the database context.
    update_arguments = {
        "table_name": table_name,
        "data": data,
        "conditions": conditions,
    }
    await self.dbctx.update(**update_arguments)
@op(
    inputs={"table_name": QUERY_TABLE, "conditions": QUERY_CONDITIONS},
    outputs={},
    config_cls=DatabaseQueryConfig,
    imp_enter={"database": lambda self: self.config.database},
    ctx_enter={"dbctx": lambda self: self.parent.database()},
)
async def db_query_remove(
    self, *, table_name: str, conditions: Conditions = []
):
    """
    Run a remove query against a table of the configured database.

    Parameters
    ----------
    table_name : str
        Name of the table to remove from.
    conditions : Conditions
        Query conditions, passed to the database context's ``remove``.

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
    >>>
    >>> dataflow = DataFlow(
    ...     operations={
    ...         "db_query_lookup": db_query_lookup.op,
    ...         "db_query_remove": db_query_remove.op,
    ...         "get_single": GetSingle.imp.op,
    ...     },
    ...     configs={
    ...         "db_query_remove": DatabaseQueryConfig(database=sdb),
    ...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
    ...     },
    ...     seed=[],
    ... )
    >>>
    >>> inputs = {
    ...     "remove": [
    ...         Input(
    ...             value="myTable",
    ...             definition=db_query_remove.op.inputs["table_name"],
    ...         ),
    ...         Input(value=[],
    ...         definition=db_query_remove.op.inputs["conditions"],),
    ...     ],
    ...     "lookup": [
    ...         Input(
    ...             value="myTable",
    ...             definition=db_query_lookup.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value=["firstName", "lastName", "age"],
    ...             definition=db_query_lookup.op.inputs["cols"],
    ...         ),
    ...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
    ...         Input(
    ...             value=[db_query_lookup.op.outputs["lookups"].name],
    ...             definition=GetSingle.op.inputs["spec"],
    ...         ),
    ...     ],
    ... }
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         if result:
    ...             print(result)
    >>>
    >>> asyncio.run(main())
    {'query_lookups': []}
    """
    # Create the removal awaitable, then wait for it to complete.
    pending_removal = self.dbctx.remove(
        table_name=table_name, conditions=conditions
    )
    await pending_removal
@op(
    inputs={
        "table_name": QUERY_TABLE,
        "cols": QUERY_COLS,
        "conditions": QUERY_CONDITIONS,
    },
    outputs={"lookups": QUERY_LOOKUPS},
    config_cls=DatabaseQueryConfig,
    imp_enter={"database": lambda self: self.config.database},
    ctx_enter={"dbctx": lambda self: self.parent.database()},
)
async def db_query_lookup(
    self, *, table_name: str, cols: List[str] = [], conditions: Conditions = []
) -> Dict[str, Any]:
    """
    Look up rows in a table of the configured database.

    Parameters
    ----------
    table_name : str
        Name of the table to query.
    cols : list[str]
        Columns to look up, passed to the database context's ``lookup``.
    conditions : Conditions
        Query conditions, passed to the database context's ``lookup``.

    Returns
    -------
    dict
        A single-key mapping, ``{"lookups": rows}``, where ``rows`` is the
        list of everything yielded by the database context's ``lookup``.

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
    >>>
    >>> dataflow = DataFlow(
    ...     operations={
    ...         "db_query_lookup": db_query_lookup.op,
    ...         "get_single": GetSingle.imp.op,
    ...     },
    ...     configs={"db_query_lookup": DatabaseQueryConfig(database=sdb),},
    ...     seed=[],
    ... )
    >>>
    >>> inputs = {
    ...     "lookup": [
    ...         Input(
    ...             value="myTable",
    ...             definition=db_query_lookup.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value=["firstName", "lastName", "age"],
    ...             definition=db_query_lookup.op.inputs["cols"],
    ...         ),
    ...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
    ...         Input(
    ...             value=[db_query_lookup.op.outputs["lookups"].name],
    ...             definition=GetSingle.op.inputs["spec"],
    ...         ),
    ...     ],
    ... }
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         if result:
    ...             print(result)
    >>>
    >>> asyncio.run(main())
    {'query_lookups': [{'firstName': 'John', 'lastName': 'Doe', 'age': 16}, {'firstName': 'John', 'lastName': 'Wick', 'age': 39}]}
    """
    # `lookup` is consumed with `async for`, so drain it into a list.
    rows = []
    async for row in self.dbctx.lookup(
        table_name=table_name, cols=cols, conditions=conditions
    ):
        rows.append(row)
    return {"lookups": rows}
@op(
    inputs={"table_name": QUERY_TABLE, "data": QUERY_DATA},
    outputs={},
    config_cls=DatabaseQueryConfig,
    imp_enter={"database": lambda self: self.config.database},
    ctx_enter={"dbctx": lambda self: self.parent.database()},
)
async def db_query_insert_or_update(
    self, *, table_name: str, data: Dict[str, Any]
):
    """
    Insert data into a table, or update it, whichever is better suited.

    The choice between inserting and updating is made by the database
    context's ``insert_or_update``, which this operation delegates to.

    Parameters
    ----------
    table_name : str
        Name of the table receiving the data.
    data : dict
        The data to insert or update.

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> sdb = SqliteDatabase(SqliteDatabaseConfig(filename="examples.db"))
    >>>
    >>> person = {"key": 11, "firstName": "John", "lastName": "Wick", "age": 38}
    >>>
    >>> dataflow = DataFlow(
    ...     operations={
    ...         "db_query_insert_or_update": db_query_insert_or_update.op,
    ...         "db_query_lookup": db_query_lookup.op,
    ...         "get_single": GetSingle.imp.op,
    ...     },
    ...     configs={
    ...         "db_query_insert_or_update": DatabaseQueryConfig(database=sdb),
    ...         "db_query_lookup": DatabaseQueryConfig(database=sdb),
    ...     },
    ...     seed=[],
    ... )
    >>>
    >>> inputs = {
    ...     "insert_or_update": [
    ...         Input(
    ...             value="myTable", definition=db_query_update.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value=person,
    ...             definition=db_query_update.op.inputs["data"],
    ...         ),
    ...     ],
    ...     "lookup": [
    ...         Input(
    ...             value="myTable",
    ...             definition=db_query_lookup.op.inputs["table_name"],
    ...         ),
    ...         Input(
    ...             value=["firstName", "lastName", "age"],
    ...             definition=db_query_lookup.op.inputs["cols"],
    ...         ),
    ...         Input(value=[], definition=db_query_lookup.op.inputs["conditions"],),
    ...         Input(
    ...             value=[db_query_lookup.op.outputs["lookups"].name],
    ...             definition=GetSingle.op.inputs["spec"],
    ...         ),
    ...     ],
    ... }
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         if result:
    ...             print(result)
    >>>
    >>> asyncio.run(main())
    {'query_lookups': [{'firstName': 'John', 'lastName': 'Wick', 'age': 38}]}
    >>>
    >>> person["age"] += 1
    >>>
    >>> asyncio.run(main())
    {'query_lookups': [{'firstName': 'John', 'lastName': 'Wick', 'age': 39}]}
    """
    # Delegate to the database context entered for this run.
    database_context = self.dbctx
    await database_context.insert_or_update(table_name=table_name, data=data)