Sources

Sources are implementations of dffml.source.source.BaseSource; they abstract the loading and storage of data / datasets.

If you want to get started creating your own source, check out the Example SQLite source.
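For example, here is a minimal sketch of reading records through a source via the async Python API, using the in-memory source (the record data is made up for illustration):

import asyncio
from dffml import Record, MemorySource, MemorySourceConfig

records = [Record("0", data={"features": {"Years": 1, "Salary": 20}})]

async def main():
    # Open the source, acquire a context, then iterate over its records
    async with MemorySource(MemorySourceConfig(records=records)) as source:
        async with source() as sctx:
            async for record in sctx.records():
                print(record.key, record.features())

asyncio.run(main())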

dffml

pip install dffml

csv

Official

Uses a CSV file as the source of record feature data

Args

  • filename: String

  • tag: String

    • default: untagged

  • readwrite: String

    • default: False

  • allowempty: String

    • default: False

  • key: String

    • default: key

  • tagcol: String

    • default: tag

  • delimiter: String

    • default: ,

  • loadfiles: List of strings

    • default: None

  • nostrip: String

    • default: False
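For example, a command line sketch of listing the records in a CSV file (the data.csv filename and the data source label are placeholders):

$ dffml list records \
    -sources data=csv \
    -source-data-filename data.csv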

dataframe

Official

Proxy for a pandas DataFrame

Examples

You can pass a pandas DataFrame to this class directly via the Python API. Or you can create DataFrames from other data sources via the Python API or the command line.

Example of creating a DataFrame from HTML via command line.

Create an HTML table.

index.html

<table>
  <tr>
    <th>Years</th>
    <th>Salary</th>
  </tr>
  <tr>
    <td>0</td>
    <td>10</td>
  </tr>
  <tr>
    <td>1</td>
    <td>20</td>
  </tr>
  <tr>
    <td>2</td>
    <td>30</td>
  </tr>
</table>

Start the HTTP server to serve the HTML page with the table.

$ python -m http.server 8000

In another terminal, list all the records in the source.

$ dffml list records \
    -sources table=dataframe \
    -source-table-html http://127.0.0.1:8000/index.html \
    -source-table-protocol_allowlist http://

[
    {
        "extra": {},
        "features": {
            "Salary": 10,
            "Years": 0
        },
        "key": "0"
    },
    {
        "extra": {},
        "features": {
            "Salary": 20,
            "Years": 1
        },
        "key": "1"
    },
    {
        "extra": {},
        "features": {
            "Salary": 30,
            "Years": 2
        },
        "key": "2"
    }
]
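You can also pass a DataFrame directly via the Python API. A minimal sketch, assuming the plugin exposes DataFrameSource and DataFrameSourceConfig following DFFML's usual naming conventions:

import asyncio
import pandas
from dffml import DataFrameSource, DataFrameSourceConfig

# Hypothetical data matching the HTML table above
df = pandas.DataFrame({"Years": [0, 1, 2], "Salary": [10, 20, 30]})

async def main():
    async with DataFrameSource(DataFrameSourceConfig(dataframe=df)) as source:
        async with source() as sctx:
            async for record in sctx.records():
                print(record.features())

asyncio.run(main())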

Args

  • dataframe: pandas.DataFrame

    • default: None

    • The pandas DataFrame to proxy

  • predictions: List of strings

    • default: []

    • Prediction columns whose values we have to update

  • html: String

    • default: None

    • Construct a DataFrame using pandas.read_html(), passing this value as the URL

  • html_table_index: Integer

    • default: 0

    • If there are multiple HTML tables on the page, which one to use. Zero-indexed: 0 selects the first table, 1 the second, and so on.

  • protocol_allowlist: List of strings

    • default: ['https://']

    • List of protocols allowed for html URL. Example ["http://"]

db

Official

No description

Args

  • db: Entrypoint

  • table_name: String

  • model_columns: List of strings

df

Official

No description

Args

  • source: Entrypoint

    • Source to wrap

  • dataflow: DataFlow

    • DataFlow to use for preprocessing

  • features: List of features

    • default: []

    • Features to pass as definitions to each context from each record to be preprocessed

  • inputs: List of strings

    • default: []

    • Other inputs to add under each ctx (record’s key will be used as the context)

  • record_def: String

    • default: None

    • Definition to be used for record.key. If set, record.key will be added to the set of inputs under each context (which is also the record’s key)

  • length: String

    • default: None

    • Definition name to add as source length

  • all_for_single: String

    • default: False

    • Run all records through the dataflow before grabbing the results of the desired record on a call to record()

  • no_strict: String

    • default: False

    • Do not exit on operation exceptions, just log errors

  • orchestrator: Entrypoint

    • default: MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None))

    • Orchestrator

dfpreprocess

Official

>>> import asyncio
>>> from dffml import *
>>>
>>> records = [
...     Record(
...         "0",
...         data={
...             "features": {
...                 "Years": 1,
...                 "Expertise": 3,
...                 "Trust": 0.2,
...                 "Salary": 20,
...             }
...         },
...     ),
... ]
>>>
>>> features = Features(
...     Feature("Years", int, 1),
...     Feature("Expertise", int, 1),
...     Feature("Trust", float, 1),
...     Feature("Salary", int, 1),
... )
>>>
>>> dataflow = DataFlow(multiply, AssociateDefinition)
>>> dataflow.flow["multiply"].inputs["multiplicand"] = [
...     {"seed": ["Years", "Expertise", "Trust", "Salary"]}
... ]
>>> dataflow.seed = [
...     Input(
...         value={
...             feature.name: multiply.op.outputs["product"].name
...             for feature in features
...         },
...         definition=AssociateDefinition.op.inputs["spec"],
...     ),
...     Input(value=10, definition=multiply.op.inputs["multiplier"],),
... ]
>>>
>>>
>>> memory_source = Sources(MemorySource(MemorySourceConfig(records=records)))
>>>
>>> source = DataFlowPreprocessSource(
...     DataFlowPreprocessSourceConfig(
...         source=memory_source, dataflow=dataflow, features=features,
...     )
... )
>>>
>>>
>>> async def main():
...     async with source as src:
...         async with src() as sctx:
...             async for record in sctx.records():
...                 print(record.features())
...
>>>
>>> asyncio.run(main())
{'Years': 10, 'Expertise': 30, 'Trust': 2.0, 'Salary': 200}

Args

  • source: Entrypoint

    • Source to wrap

  • dataflow: DataFlow

    • DataFlow to use for preprocessing

  • features: List of features

    • default: []

    • Features to pass as definitions to each context from each record to be preprocessed

  • inputs: List of strings

    • default: []

    • Other inputs to add under each ctx (record’s key will be used as the context)

  • record_def: String

    • default: None

    • Definition to be used for record.key. If set, record.key will be added to the set of inputs under each context (which is also the record’s key)

  • length: String

    • default: None

    • Definition name to add as source length

  • all_for_single: String

    • default: False

    • Run all records through the dataflow before grabbing the results of the desired record on a call to record()

  • no_strict: String

    • default: False

    • Do not exit on operation exceptions, just log errors

  • orchestrator: Entrypoint

    • default: MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None))

    • Orchestrator

dir

Official

Source to read files in a folder.

Args

  • foldername: String

  • feature: String

    • Name of the feature the data will be referenced as

  • labels: List of strings

    • default: ['unlabelled']

    • Image labels

  • save: Entrypoint

    • default: None
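For example, a hedged command line sketch of listing records from a folder of images (the folder name, feature name, and label names are placeholders; the flags follow the source config naming pattern used elsewhere on this page):

$ dffml list records \
    -sources images=dir \
    -source-images-foldername ./images \
    -source-images-feature image \
    -source-images-labels cat dog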

idx1

Official

Source to read files in IDX1 format (such as MNIST digit label dataset).

Args

  • filename: String

  • feature: String

    • Name of the feature the data will be referenced as

  • readwrite: String

    • default: False

  • allowempty: String

    • default: False

idx3

Official

Source to read files in IDX3 format (such as MNIST digit image dataset).

Args

  • filename: String

  • feature: String

    • Name of the feature the data will be referenced as

  • readwrite: String

    • default: False

  • allowempty: String

    • default: False
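For example, a hedged command line sketch of pairing MNIST image (idx3) and label (idx1) files (the filenames are the standard MNIST archive names used here as placeholders; the flags follow the source config naming pattern used elsewhere on this page):

$ dffml list records \
    -sources images=idx3 labels=idx1 \
    -source-images-filename train-images-idx3-ubyte \
    -source-images-feature image \
    -source-labels-filename train-labels-idx1-ubyte \
    -source-labels-feature label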

ini

Official

Source to read files in .ini format.

Args

  • filename: String

  • readwrite: String

    • default: False

  • allowempty: String

    • default: False
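For example, a command line sketch of listing the records stored in an INI file (the config.ini filename and the cfg source label are placeholders):

$ dffml list records \
    -sources cfg=ini \
    -source-cfg-filename config.ini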

iris.training

Official

No description

Args

  • cache_dir: Path

    • default: ~/.cache/dffml/datasets/iris

    • cache_dir

json

Official

JSONSource reads from and writes to a JSON file on open / close. Otherwise records are stored in memory.

Args

  • filename: String

  • tag: String

    • default: untagged

  • readwrite: String

    • default: False

  • allowempty: String

    • default: False
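For example, a minimal sketch of writing a record to a JSON file via the Python API (the data.json filename and feature values are placeholders; readwrite and allowempty are enabled so the file can be created and saved on close):

import asyncio
from dffml import Record, JSONSource, JSONSourceConfig

async def main():
    config = JSONSourceConfig(
        filename="data.json", readwrite=True, allowempty=True
    )
    async with JSONSource(config) as source:
        async with source() as sctx:
            # Records are held in memory while open, then written to data.json on close
            await sctx.update(
                Record("0", data={"features": {"Years": 1, "Salary": 20}})
            )

asyncio.run(main())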

memory

Official

Stores records in a dict in memory

Args

  • records: List of records

  • display: Integer

    • default: 10

    • When repr() is called, how many records to display

op

Official

No description

Args

  • opimp: OperationImplementation

  • args: List of strings

    • default: []

    • Arguments to operation in input order

  • allowempty: String

    • default: False

    • Raise an error if the source is empty after running the loading operation

dffml_source_mysql

pip install dffml-source-mysql

mysql

Official

Warning

  • The update config property is a SQL query which MUST handle insertion or update. Columns to be updated should list feature columns first, followed by prediction columns.

  • The features config property MUST have keys in the same order as they appear within the update query.

  • The predictions config property MUST have keys in the same order as they appear within the update query.

Examples

Read MySQL or MariaDB’s documentation to understand how to properly set up your server for encrypted connections.
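Below is a hedged sketch of configuring the source via the Python API, to illustrate the column ordering constraints from the warning above. The import path, class names, table layout, and queries are assumptions for illustration, not a verified example:

from dffml_source_mysql.db import MySQLSource, MySQLSourceConfig

source = MySQLSource(
    MySQLSourceConfig(
        user="user",
        password="pass",
        db="db",
        key="key",
        # Feature columns, in the same order they appear in the update query
        features={"Years": "years", "Salary": "salary"},
        # Prediction (value, confidence) columns follow the feature columns
        predictions={"Salary": ("predicted_salary", "confidence")},
        update=(
            "INSERT INTO records (`key`, years, salary, predicted_salary, confidence) "
            "VALUES (%s, %s, %s, %s, %s) "
            "ON DUPLICATE KEY UPDATE years=%s, salary=%s, "
            "predicted_salary=%s, confidence=%s"
        ),
        record="SELECT * FROM records WHERE `key`=%s",
        records="SELECT * FROM records",
    )
)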

Args

  • user: String

    • Username

  • password: String

    • Password

  • db: String

    • Name of database to use

  • key: String

    • Column name of record key

  • features: typing.Dict[str, str]

    • Mapping of feature names to column names

  • predictions: typing.Dict[str, typing.Tuple[str, str]]

    • Mapping of prediction names to tuples of (value, confidence) column names

  • update: String

    • Query to update a single record

  • record: String

    • Query to get a single record

  • records: String

    • Query to get all records

  • init: String

    • default: None

    • Query to run on initial connection

  • host: String

    • default: 127.0.0.1

    • Host/address to connect to

  • port: Integer

    • default: 3306

    • Port to connect to

  • ca: String

    • default: None

    • Server certificate to use for TLS validation

  • insecure: String

    • default: False

    • Must be set to true to accept the risks of a non-TLS connection