Example SQLite source

This tutorial shows how to implement a data source. This is probably what you want if no existing Sources plugin handles the way you store your data. Or, even more likely, an existing application you’re trying to integrate with stores the data you want in its own database in a very specific way.

Create the Package

To create a new source, we first create a new Python package. DFFML has a script to create it for you.

$ dffml service dev create source dffml-source-sqlite
$ cd dffml-source-sqlite

This creates a Python package for you with a source that stores Record objects in memory, called MiscSource, and some tests.

Edit the Source

The implementation of a source consists mainly of creating a subclass of dffml.source.source.BaseSourceContext. Often there will also be some initial connection establishment in a subclass of dffml.source.source.BaseSource (as we will see in the sqlite example).

class dffml.source.source.BaseSourceContext(parent: BaseSource)
abstract async record(key: str)

Get a record from the source or add it if it doesn’t exist.

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = await ctx.record("example")
...             # Let's also try calling `record` for a record that doesn't exist.
...             one = await ctx.record("one")
...             await ctx.update(one)
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
{'key': 'one', 'extra': {}}
abstract async records() -> AsyncIterator[Record]

Returns an async iterator over the records retrieved from self.src

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[Record("example", data=dict(features=dict(dead="beef")))]) as source:
...         async with source() as ctx:
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'example', 'features': {'dead': 'beef'}, 'extra': {}}
abstract async update(record: Record)

Updates a record for a source

Examples

>>> import asyncio
>>> from dffml import *
>>>
>>> async def main():
...     async with MemorySource(records=[]) as source:
...         # Open, update, and close
...         async with source() as ctx:
...             example = Record("one", data=dict(features=dict(feed="face")))
...             # ... Update one into our records ...
...             await ctx.update(example)
...             # Let's check out our records after calling `record` and `update`.
...             async for record in ctx.records():
...                 print(record.export())
>>>
>>> asyncio.run(main())
{'key': 'one', 'features': {'feed': 'face'}, 'extra': {}}

We essentially just fill out the methods in the context class, and do any entry and exit work we need in the context class and its parent.
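The two levels of entry and exit can be sketched without DFFML at all. The following is a minimal illustration using plain asyncio; ToySource and ToySourceContext are hypothetical stand-ins for the BaseSource and BaseSourceContext subclasses, not DFFML classes, and the dict-backed store is an assumption made to keep the sketch self-contained.

```python
import asyncio


class ToySourceContext:
    """Hypothetical stand-in for a BaseSourceContext subclass."""

    def __init__(self, parent):
        self.parent = parent

    async def __aenter__(self):
        self.parent.events.append("context enter")
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # A real context might commit pending writes here
        self.parent.events.append("context exit")

    async def update(self, key, value):
        self.parent.store[key] = value

    async def record(self, key):
        # Get the value, or add an empty one if it doesn't exist
        return self.parent.store.setdefault(key, {})


class ToySource:
    """Hypothetical stand-in for a BaseSource subclass."""

    def __init__(self):
        self.store = {}
        self.events = []

    def __call__(self):
        # Calling the source creates a context bound to it
        return ToySourceContext(self)

    async def __aenter__(self):
        # A real source might open its database connection here
        self.events.append("source enter")
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.events.append("source exit")


async def demo():
    async with ToySource() as source:
        async with source() as ctx:
            await ctx.update("one", {"feed": "face"})
            found = await ctx.record("one")
    return source.events, found


events, found = asyncio.run(demo())
print(events)
print(found)
```

The parent is entered first and exited last, so anything it sets up (such as a connection) is available for the whole lifetime of each context.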

Interact with the Database

For this tutorial we’ll be implementing a source which knows how to save data to and load data from a SQLite database. We’ll be using the aiosqlite package to do this.

If we had a SQLite database with custom columns, we could implement it like so.

dffml_source_sqlite/misc.py

import aiosqlite
from collections import OrderedDict
from typing import AsyncIterator

from dffml import config, Record, BaseSource, BaseSourceContext


@config
class CustomSQLiteSourceConfig:
    filename: str


class CustomSQLiteSourceContext(BaseSourceContext):
    async def update(self, record: Record):
        db = self.parent.db
        # Store feature data
        feature_cols = self.parent.FEATURE_COLS
        feature_data = OrderedDict.fromkeys(feature_cols)
        feature_data.update(record.features(feature_cols))
        await db.execute(
            "INSERT OR REPLACE INTO features (key, "
            + ", ".join(feature_cols)
            + ") "
            "VALUES(?, " + ", ".join("?" * len(feature_cols)) + ")",
            [record.key] + list(feature_data.values()),
        )
        # Store prediction
        try:
            prediction = record.prediction("target_name")
            prediction_cols = self.parent.PREDICTION_COLS
            prediction_data = OrderedDict.fromkeys(prediction_cols)
            prediction_data.update(prediction.dict())
            await db.execute(
                "INSERT OR REPLACE INTO prediction (key, "
                + ", ".join(prediction_cols)
                + ") "
                "VALUES(?, " + ", ".join("?" * len(prediction_cols)) + ")",
                [record.key] + list(prediction_data.values()),
            )
        except KeyError:
            pass

    async def records(self) -> AsyncIterator[Record]:
        # NOTE This logic probably isn't what you want. Only for demo purposes.
        keys = await self.parent.db.execute("SELECT key FROM features")
        for row in await keys.fetchall():
            yield await self.record(row["key"])

    async def record(self, key: str):
        db = self.parent.db
        record = Record(key)
        # Get features
        features = await db.execute(
            "SELECT " + ", ".join(self.parent.FEATURE_COLS) + " "
            "FROM features WHERE key=?",
            (record.key,),
        )
        features = await features.fetchone()
        if features is not None:
            record.evaluated(features)
        # Get prediction
        prediction = await db.execute(
            "SELECT * FROM prediction WHERE " "key=?", (record.key,)
        )
        prediction = await prediction.fetchone()
        if prediction is not None:
            record.predicted(
                "target_name", prediction["value"], prediction["confidence"]
            )
        return record

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.parent.db.commit()


class CustomSQLiteSource(BaseSource):

    CONFIG = CustomSQLiteSourceConfig
    CONTEXT = CustomSQLiteSourceContext
    FEATURE_COLS = ["PetalLength", "PetalWidth", "SepalLength", "SepalWidth"]
    PREDICTION_COLS = ["value", "confidence"]

    async def __aenter__(self) -> "CustomSQLiteSource":
        self.__db = aiosqlite.connect(self.config.filename)
        self.db = await self.__db.__aenter__()
        self.db.row_factory = aiosqlite.Row
        # Create table for feature data
        await self.db.execute(
            "CREATE TABLE IF NOT EXISTS features ("
            "key TEXT PRIMARY KEY NOT NULL, "
            + (" REAL, ".join(self.FEATURE_COLS))
            + " REAL"
            ")"
        )
        # Create table for predictions
        await self.db.execute(
            "CREATE TABLE IF NOT EXISTS prediction ("
            "key TEXT PRIMARY KEY, " + "value TEXT, "
            "confidence REAL"
            ")"
        )
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.__db.__aexit__(exc_type, exc_value, traceback)
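To make the string building in update easier to follow, here is a minimal sketch of the same statement run against an in-memory database. It uses the synchronous sqlite3 module from the standard library rather than aiosqlite, so it is not the code path the source itself takes. The build_upsert helper, the key "flower-1", and the feature values are made up for illustration.

```python
import sqlite3
from collections import OrderedDict

FEATURE_COLS = ["PetalLength", "PetalWidth", "SepalLength", "SepalWidth"]


def build_upsert(table, cols):
    # Hypothetical helper: same concatenation as in update(),
    # one "?" placeholder for the key plus one per column
    return (
        "INSERT OR REPLACE INTO " + table + " (key, "
        + ", ".join(cols)
        + ") "
        "VALUES(?, " + ", ".join("?" * len(cols)) + ")"
    )


db = sqlite3.connect(":memory:")
# Allows access to columns by name, e.g. row["key"]
db.row_factory = sqlite3.Row
db.execute(
    "CREATE TABLE IF NOT EXISTS features ("
    "key TEXT PRIMARY KEY NOT NULL, "
    + " REAL, ".join(FEATURE_COLS)
    + " REAL)"
)

statement = build_upsert("features", FEATURE_COLS)
# Columns the record doesn't provide stay None and are stored as NULL
feature_data = OrderedDict.fromkeys(FEATURE_COLS)
feature_data.update({"PetalLength": 1.4, "PetalWidth": 0.2})
db.execute(statement, ["flower-1"] + list(feature_data.values()))
# Writing the same key again replaces the row instead of adding another
feature_data.update({"SepalLength": 5.1})
db.execute(statement, ["flower-1"] + list(feature_data.values()))

rows = db.execute("SELECT * FROM features").fetchall()
print(statement)
print([dict(row) for row in rows])
```

Because key is the primary key, INSERT OR REPLACE behaves as an upsert: the second write leaves a single row for "flower-1".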

Register your source

Modify the entry_points.txt file and change the dffml.source entry point to point to your new source class (not the one ending in Context).

entry_points.txt

[dffml.source]
customsqlite = dffml_source_sqlite.misc:CustomSQLiteSource

This allows you to use your source with the CLI and HTTP API (after you install it).
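Once the package is installed, you can check that the entry point is visible. The sketch below assumes the dffml.source group is exposed through the standard Python entry point mechanism, as the entry_points.txt file suggests. The list_entry_points helper is hypothetical and uses only importlib.metadata from the standard library, not a DFFML API.

```python
from importlib.metadata import entry_points


def list_entry_points(group="dffml.source"):
    """Return {name: "module:attribute"} for installed entry points in group."""
    found = entry_points()
    if hasattr(found, "select"):
        # Python 3.10 and later
        selected = found.select(group=group)
    else:
        # Python 3.8 and 3.9 return a plain dict keyed by group name
        selected = found.get(group, [])
    return {entry.name: entry.value for entry in selected}


sources = list_entry_points()
print(sources)
```

If your package is installed, the printed dict should contain a customsqlite key. An empty dict means no package registering that group is installed in the current environment.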

We also need to add aiosqlite to the install_requires list in setup.cfg. This list tells pip which packages must be installed along with ours, and we need aiosqlite because we import it. Add the following line to the list after dffml.

setup.cfg

    aiosqlite>=0.15.0

Install your package

The following command installs your new source.

$ python -m pip install -e .[dev]

Write the tests

tests/test_source.py

import unittest

from dffml import AsyncTestCase, FileSourceTest

from dffml_source_sqlite.misc import (
    CustomSQLiteSourceConfig,
    CustomSQLiteSource,
)


class TestCustomSQliteSource(FileSourceTest, AsyncTestCase):
    async def setUpSource(self):
        return CustomSQLiteSource(
            CustomSQLiteSourceConfig(filename=self.testfile)
        )

    @unittest.skip("tags not implemented")
    async def test_tag(self):
        """
        tags not implemented
        """

Run the tests

$ python -m unittest discover -v