Example SQLite source¶
This tutorial shows how to implement a data source. You probably want one if there is no existing Sources plugin for the way you store your data. Even more likely, an existing application you’re trying to integrate with stores the data you want in its own database, in a very specific way.
Create the Package¶
To create a new source, we first create a new Python package. DFFML has a script that creates it for you.
$ dffml service dev create source dffml-source-sqlite
$ cd dffml-source-sqlite
This creates a Python package for you with some tests and a source called MiscSource, which stores Record objects in memory.
Edit the Source¶
The implementation of a source consists mainly of creating a subclass of dffml.source.source.BaseSourceContext. Often there will also be some initial connection establishment in dffml.source.source.BaseSource (as we will see in the SQLite example).
- class dffml.source.source.BaseSourceContext(parent: BaseSource)
- abstract async record(key: str)
Get a record from the source or add it if it doesn’t exist.
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = await ctx.record("example")
...             # Let's also try calling `record` for a record that doesn't exist.
...             one = await ctx.record("one")
...             await ctx.update(one)
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
{'key': 'one', 'extra': {}}
- abstract async records() → AsyncIterator[Record]
Asynchronously iterates over the records stored in the source.
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         async with source() as ctx:
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
- abstract async update(record: Record)
Updates a record in the source.
Examples
>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = Record("one", data=dict(features=dict(feed="face")))
...             # ... Update one into our records ...
...             await ctx.update(example)
...             # Let's check out our records after calling `update`.
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}
We essentially just fill out the methods in the context class. Any setup and teardown we need goes in the async context manager methods (__aenter__ and __aexit__) of the context class and its parent.
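The sketch below makes the parent/context split concrete without any dependencies. ToySource and ToySourceContext are hypothetical classes that only mimic the shape described above. The parent owns the shared state and hands out contexts, and the context implements record, update, and records. They use plain dicts instead of dffml's Record and are not DFFML's actual base classes.

```python
import asyncio
from typing import AsyncIterator, Dict, List


class ToySource:
    """Hypothetical stand-in for a BaseSource subclass: owns shared state."""

    def __init__(self) -> None:
        self.storage: Dict[str, dict] = {}

    async def __aenter__(self) -> "ToySource":
        # A real source would open files or database connections here.
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        # ...and close them here.
        pass

    def __call__(self) -> "ToySourceContext":
        # Calling the source hands out a context bound to this parent.
        return ToySourceContext(self)


class ToySourceContext:
    """Hypothetical stand-in for a BaseSourceContext subclass."""

    def __init__(self, parent: ToySource) -> None:
        self.parent = parent

    async def __aenter__(self) -> "ToySourceContext":
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        pass

    async def record(self, key: str) -> dict:
        # Return the stored record, or an empty one if the key is unknown.
        return self.parent.storage.get(key, {"key": key})

    async def update(self, record: dict) -> None:
        # Store (or overwrite) the record in the parent's shared storage.
        self.parent.storage[record["key"]] = record

    async def records(self) -> AsyncIterator[dict]:
        for key in list(self.parent.storage):
            yield await self.record(key)


async def main() -> List[dict]:
    async with ToySource() as source:
        async with source() as ctx:
            await ctx.update({"key": "one", "features": {"feed": "face"}})
            return [record async for record in ctx.records()]


print(asyncio.run(main()))
```

Keeping the state on the parent means every context opened from the same source sees the same data. The real SQLite source below follows the same pattern, with the connection on the parent.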
Interact with the Database¶
For this tutorial we’ll be implementing a source that can save data to and load data from a SQLite database. We’ll use the aiosqlite package to do this.
If we had a SQLite database with custom columns, we could implement it like so.
dffml_source_sqlite/misc.py
import aiosqlite
from collections import OrderedDict
from typing import AsyncIterator

from dffml import config, Record, BaseSource, BaseSourceContext


@config
class CustomSQLiteSourceConfig:
    filename: str


class CustomSQLiteSourceContext(BaseSourceContext):
    async def update(self, record: Record):
        db = self.parent.db
        # Store feature data
        feature_cols = self.parent.FEATURE_COLS
        feature_data = OrderedDict.fromkeys(feature_cols)
        feature_data.update(record.features(feature_cols))
        await db.execute(
            "INSERT OR REPLACE INTO features (key, "
            + ", ".join(feature_cols)
            + ") "
            "VALUES(?, " + ", ".join("?" * len(feature_cols)) + ")",
            [record.key] + list(feature_data.values()),
        )
        # Store prediction
        try:
            prediction = record.prediction("target_name")
            prediction_cols = self.parent.PREDICTION_COLS
            prediction_data = OrderedDict.fromkeys(prediction_cols)
            prediction_data.update(prediction.dict())
            await db.execute(
                "INSERT OR REPLACE INTO prediction (key, "
                + ", ".join(prediction_cols)
                + ") "
                "VALUES(?, " + ", ".join("?" * len(prediction_cols)) + ")",
                [record.key] + list(prediction_data.values()),
            )
        except KeyError:
            # The record has no prediction for "target_name"
            pass

    async def records(self) -> AsyncIterator[Record]:
        # NOTE This logic probably isn't what you want. Only for demo purposes.
        keys = await self.parent.db.execute("SELECT key FROM features")
        for row in await keys.fetchall():
            yield await self.record(row["key"])

    async def record(self, key: str):
        db = self.parent.db
        record = Record(key)
        # Get features
        features = await db.execute(
            "SELECT " + ", ".join(self.parent.FEATURE_COLS) + " "
            "FROM features WHERE key=?",
            (record.key,),
        )
        features = await features.fetchone()
        if features is not None:
            record.evaluated(features)
        # Get prediction
        prediction = await db.execute(
            "SELECT * FROM prediction WHERE key=?", (record.key,)
        )
        prediction = await prediction.fetchone()
        if prediction is not None:
            record.predicted(
                "target_name", prediction["value"], prediction["confidence"]
            )
        return record

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Persist everything written through this context
        await self.parent.db.commit()


class CustomSQLiteSource(BaseSource):
    CONFIG = CustomSQLiteSourceConfig
    CONTEXT = CustomSQLiteSourceContext
    FEATURE_COLS = ["PetalLength", "PetalWidth", "SepalLength", "SepalWidth"]
    PREDICTION_COLS = ["value", "confidence"]

    async def __aenter__(self) -> "CustomSQLiteSource":
        self.__db = aiosqlite.connect(self.config.filename)
        self.db = await self.__db.__aenter__()
        # Allow columns to be accessed by name, e.g. row["key"]
        self.db.row_factory = aiosqlite.Row
        # Create table for feature data
        await self.db.execute(
            "CREATE TABLE IF NOT EXISTS features ("
            "key TEXT PRIMARY KEY NOT NULL, "
            + (" REAL, ".join(self.FEATURE_COLS))
            + " REAL"
            ")"
        )
        # Create table for predictions
        await self.db.execute(
            "CREATE TABLE IF NOT EXISTS prediction ("
            "key TEXT PRIMARY KEY, "
            "value TEXT, "
            "confidence REAL"
            ")"
        )
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.__db.__aexit__(exc_type, exc_value, traceback)
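The update and record methods build their SQL by string concatenation, which is easy to get subtly wrong. One trick is that ", ".join("?" * n) joins the characters of "????", producing "?, ?, ?, ?". The sketch below reuses the same construction to show the statements it produces. It swaps in Python's built-in sqlite3 module for aiosqlite so that it runs without extra packages, and uses an in-memory database. open_db, upsert, and fetch are hypothetical helpers, not part of the tutorial's package.

```python
import sqlite3
from collections import OrderedDict
from typing import Optional

# Same column list as CustomSQLiteSource above.
FEATURE_COLS = ["PetalLength", "PetalWidth", "SepalLength", "SepalWidth"]

# Built the same way as in CustomSQLiteSource.__aenter__.
CREATE_FEATURES = (
    "CREATE TABLE IF NOT EXISTS features ("
    "key TEXT PRIMARY KEY NOT NULL, "
    + " REAL, ".join(FEATURE_COLS)
    + " REAL)"
)
# Built the same way as in CustomSQLiteSourceContext.update.
UPSERT_FEATURES = (
    "INSERT OR REPLACE INTO features (key, "
    + ", ".join(FEATURE_COLS)
    + ") VALUES(?, "
    + ", ".join("?" * len(FEATURE_COLS))
    + ")"
)


def open_db() -> sqlite3.Connection:
    db = sqlite3.connect(":memory:")
    db.row_factory = sqlite3.Row  # rows can be indexed by column name
    db.execute(CREATE_FEATURES)
    return db


def upsert(db: sqlite3.Connection, key: str, features: dict) -> None:
    # Every column gets a value; features not supplied are stored as NULL.
    row_data = OrderedDict.fromkeys(FEATURE_COLS)
    row_data.update({k: v for k, v in features.items() if k in FEATURE_COLS})
    db.execute(UPSERT_FEATURES, [key] + list(row_data.values()))


def fetch(db: sqlite3.Connection, key: str) -> Optional[dict]:
    row = db.execute(
        "SELECT " + ", ".join(FEATURE_COLS) + " FROM features WHERE key=?",
        (key,),
    ).fetchone()
    return None if row is None else dict(row)


print(UPSERT_FEATURES)
db = open_db()
upsert(db, "flower1", {"PetalLength": 1.4, "SepalWidth": 3.5})
upsert(db, "flower1", {"PetalLength": 1.5})  # same key: the row is replaced
print(fetch(db, "flower1"))
```

Note the second call. INSERT OR REPLACE deletes the existing row and inserts a new one, so SepalWidth comes back as None rather than keeping its earlier value. That matches how update behaves here: each call rewrites all feature columns for the key.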
Register your source¶
Modify the entry_points.txt file and change the dffml.source entry point to point to your new source class (not the one ending in Context).
entry_points.txt
[dffml.source]
customsqlite = dffml_source_sqlite.misc:CustomSQLiteSource
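entry_points.txt is an INI-style file. Each line under a group such as [dffml.source] maps a name to a module:attribute target, and the module part must name the file that actually defines the class (dffml_source_sqlite/misc.py in this tutorial). The sketch below uses only configparser and importlib to show roughly how such a line is split and resolved. It illustrates the format only. It is not DFFML's or pip's actual loader, since real tools go through importlib.metadata. parse_entry_points and load_target are hypothetical helpers.

```python
import configparser
import importlib

# Hypothetical contents of entry_points.txt for this tutorial's package.
ENTRY_POINTS_TXT = """
[dffml.source]
customsqlite = dffml_source_sqlite.misc:CustomSQLiteSource
"""


def parse_entry_points(text: str, group: str) -> dict:
    """Map each entry point name in `group` to a (module, attribute) pair."""
    # Only "=" separates name from target, since targets contain ":".
    parser = configparser.ConfigParser(delimiters=("=",))
    parser.optionxform = str  # keep entry point names case-sensitive
    parser.read_string(text)
    result = {}
    for name, target in parser[group].items():
        module, _, attr = target.partition(":")
        result[name] = (module.strip(), attr.strip())
    return result


def load_target(module: str, attr: str):
    """Import `module` and return its `attr`, as an entry point loader would."""
    return getattr(importlib.import_module(module), attr)


print(parse_entry_points(ENTRY_POINTS_TXT, "dffml.source"))
# The tutorial package may not be installed here, so load a stdlib target.
print(load_target("json", "dumps"))
```

If the module path does not match the real file, the import step fails. That is why the target has to agree with the module your tests import from.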
This allows you to use your source with the CLI and HTTP API (after you install it).
We also need to add aiosqlite to the install_requires list in setup.cfg. This list tells pip which packages must be installed along with ours, and aiosqlite has to be on it because our source imports it. Add the following line to the list, after dffml.
setup.cfg
aiosqlite>=0.15.0
Install your package¶
The following command installs your new source in editable (development) mode, along with its dev extras.
$ python -m pip install -e .[dev]
Write the tests¶
tests/test_source.py
import unittest

from dffml import AsyncTestCase, FileSourceTest

from dffml_source_sqlite.misc import (
    CustomSQLiteSourceConfig,
    CustomSQLiteSource,
)


class TestCustomSQliteSource(FileSourceTest, AsyncTestCase):
    async def setUpSource(self):
        return CustomSQLiteSource(
            CustomSQLiteSourceConfig(filename=self.testfile)
        )

    @unittest.skip("tags not implemented")
    async def test_tag(self):
        """
        tags not implemented
        """
Run the tests¶
$ python -m unittest discover -v