Sources¶
Sources are implementations of dffml.source.source.BaseSource. They abstract the loading and storage of data / datasets.
If you want to get started creating your own source, check out the Example SQLite source.
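Sources can also be used directly from the Python API with the save() and load() helpers. Below is a minimal sketch, assuming the CSV source described later on this page and an illustrative data.csv file.
import asyncio
from dffml import CSVSource, Record, load, save

async def main():
    # Open a CSV file as a source; allowempty tolerates a missing or empty file,
    # readwrite allows storing new records
    source = CSVSource(filename="data.csv", allowempty=True, readwrite=True)
    # Store a record
    await save(
        source,
        Record("0", data={"features": {"Years": 1, "Salary": 20}}),
    )
    # Read every record back
    async for record in load(source):
        print(record.export())

asyncio.run(main())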
dffml¶
pip install dffml
csv¶
Official
Uses a CSV file as the source of record feature data
Args
filename: String
tag: String
default: untagged
readwrite: String
default: False
allowempty: String
default: False
key: String
default: key
tagcol: String
default: tag
delimiter: String
default: ,
loadfiles: List of strings
default: None
nostrip: String
default: False
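A minimal command line sketch, using an illustrative data.csv; the -source-<tag>-<arg> flag pattern follows the dataframe example below.
data.csv
key,Years,Salary
0,1,20
1,2,40
$ dffml list records \
    -sources data=csv \
    -source-data-filename data.csv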
dataframe¶
Official
Proxy for a pandas DataFrame
Examples¶
You can pass a pandas DataFrame to this class directly via the Python API. Or you can create DataFrames from other data sources via the Python API or the command line.
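A minimal sketch of the direct Python API usage; the dffml.source.dataframe module path is an assumption here, and the DataFrame contents are illustrative.
import asyncio
import pandas as pd
from dffml import load
# Assumed import path for the DataFrame proxy source
from dffml.source.dataframe import DataFrameSource, DataFrameSourceConfig

async def main():
    df = pd.DataFrame({"Years": [0, 1, 2], "Salary": [10, 20, 30]})
    # Proxy the in-memory DataFrame as a source of records
    source = DataFrameSource(DataFrameSourceConfig(dataframe=df))
    async for record in load(source):
        print(record.features())

asyncio.run(main())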
The following is an example of creating a DataFrame from an HTML table via the command line.
Create an HTML table.
index.html
<table>
<tr>
<th>Years</th>
<th>Salary</th>
</tr>
<tr>
<td>0</td>
<td>10</td>
</tr>
<tr>
<td>1</td>
<td>20</td>
</tr>
<tr>
<td>2</td>
<td>30</td>
</tr>
</table>
Start an HTTP server to serve the HTML page with the table.
$ python -m http.server 8000
In another terminal, list all the records in the source.
$ dffml list records \
-sources table=dataframe \
-source-table-html http://127.0.0.1:8000/index.html \
-source-table-protocol_allowlist http://
[
{
"extra": {},
"features": {
"Salary": 10,
"Years": 0
},
"key": "0"
},
{
"extra": {},
"features": {
"Salary": 20,
"Years": 1
},
"key": "1"
},
{
"extra": {},
"features": {
"Salary": 30,
"Years": 2
},
"key": "2"
}
]
Args
dataframe: pandas.DataFrame
default: None
The pandas DataFrame to proxy
predictions: List of strings
default: []
Prediction columns whose values we have to update
html: String
default: None
Construct a DataFrame using pandas.read_html(), passing this value as the URL
html_table_index: Integer
default: 0
If there are multiple HTML tables on a page, which one to use. Zero-indexed, so the first table is 0; if you want the second table on the page, use 1 here.
protocol_allowlist: List of strings
default: ['https://']
List of protocols allowed for the html URL. Example: ["http://"]
db¶
Official
No description
Args
db: Entrypoint
table_name: String
model_columns: List of strings
df¶
Official
No description
Args
source: Entrypoint
Source to wrap
dataflow: DataFlow
DataFlow to use for preprocessing
features: List of features
default: []
Features to pass as definitions to each context from each record to be preprocessed
inputs: List of strings
default: []
Other inputs to add under each ctx (record’s key will be used as the context)
record_def: String
default: None
Definition to be used for record.key. If set, record.key will be added to the set of inputs under each context (which is also the record's key)
length: String
default: None
Definition name to add as source length
all_for_single: String
default: False
Run all records through the dataflow before grabbing the results of the desired record on a call to record()
no_strict: String
default: False
Do not exit on operation exceptions, just log errors
orchestrator: Entrypoint
default: MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None))
Orchestrator
dfpreprocess¶
Official
>>> import asyncio
>>> from dffml import *
>>>
>>> records = [
... Record(
... "0",
... data={
... "features": {
... "Years": 1,
... "Expertise": 3,
... "Trust": 0.2,
... "Salary": 20,
... }
... },
... ),
... ]
>>>
>>> features = Features(
... Feature("Years", int, 1),
... Feature("Expertise", int, 1),
... Feature("Trust", float, 1),
... Feature("Salary", int, 1),
... )
>>>
>>> dataflow = DataFlow(multiply, AssociateDefinition)
>>> dataflow.flow["multiply"].inputs["multiplicand"] = [
... {"seed": ["Years", "Expertise", "Trust", "Salary"]}
... ]
>>> dataflow.seed = [
... Input(
... value={
... feature.name: multiply.op.outputs["product"].name
... for feature in features
... },
... definition=AssociateDefinition.op.inputs["spec"],
... ),
... Input(value=10, definition=multiply.op.inputs["multiplier"],),
... ]
>>>
>>>
>>> memory_source = Sources(MemorySource(MemorySourceConfig(records=records)))
>>>
>>> source = DataFlowPreprocessSource(
... DataFlowPreprocessSourceConfig(
... source=memory_source, dataflow=dataflow, features=features,
... )
... )
>>>
>>>
>>> async def main():
... async with source as src:
... async with src() as sctx:
... async for record in sctx.records():
... print(record.features())
...
>>>
>>> asyncio.run(main())
{'Years': 10, 'Expertise': 30, 'Trust': 2.0, 'Salary': 200}
Args
source: Entrypoint
Source to wrap
dataflow: DataFlow
DataFlow to use for preprocessing
features: List of features
default: []
Features to pass as definitions to each context from each record to be preprocessed
inputs: List of strings
default: []
Other inputs to add under each ctx (record’s key will be used as the context)
record_def: String
default: None
Definition to be used for record.key. If set, record.key will be added to the set of inputs under each context (which is also the record's key)
length: String
default: None
Definition name to add as source length
all_for_single: String
default: False
Run all records through the dataflow before grabbing the results of the desired record on a call to record()
no_strict: String
default: False
Do not exit on operation exceptions, just log errors
orchestrator: Entrypoint
default: MemoryOrchestrator(MemoryOrchestratorConfig(input_network=MemoryInputNetwork(MemoryInputNetworkConfig()), operation_network=MemoryOperationNetwork(MemoryOperationNetworkConfig(operations=[])), lock_network=MemoryLockNetwork(MemoryLockNetworkConfig()), opimp_network=MemoryOperationImplementationNetwork(MemoryOperationImplementationNetworkConfig(operations={})), rchecker=MemoryRedundancyChecker(MemoryRedundancyCheckerConfig(kvstore=MemoryKeyValueStore(MemoryKeyValueStoreConfig()))), max_ctxs=None))
Orchestrator
dir¶
Official
Source to read files in a folder.
Args
foldername: String
feature: String
Name of the feature the data will be referenced as
labels: List of strings
default: ['unlabelled']
Image labels
save: Entrypoint
default: None
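A minimal command line sketch, assuming a folder of images sorted by label; the folder name, feature name, and labels are illustrative, and the flag pattern follows the dataframe example above.
$ dffml list records \
    -sources images=dir \
    -source-images-foldername ./images \
    -source-images-feature image \
    -source-images-labels cat dog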
idx1¶
Official
Source to read files in IDX1 format (such as MNIST digit label dataset).
Args
filename: String
feature: String
Name of the feature the data will be referenced as
readwrite: String
default: False
allowempty: String
default: False
idx3¶
Official
Source to read files in IDX3 format (such as MNIST digit image dataset).
Args
filename: String
feature: String
Name of the feature the data will be referenced as
readwrite: String
default: False
allowempty: String
default: False
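The idx1 and idx3 sources are typically used together, for example with the MNIST dataset, where images come from an IDX3 file and labels from an IDX1 file. A minimal command line sketch for listing the labels, assuming the standard MNIST filename; the idx3 source is used the same way with its own filename and feature flags.
$ dffml list records \
    -sources labels=idx1 \
    -source-labels-filename train-labels-idx1-ubyte.gz \
    -source-labels-feature label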
ini¶
Official
Source to read files in .ini format.
Args
filename: String
readwrite: String
default: False
allowempty: String
default: False
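A minimal command line sketch, assuming each section of the .ini file maps to a record keyed by the section name, with its options as features; the file contents are illustrative.
data.ini
[0]
years = 1
salary = 20
$ dffml list records \
    -sources cfg=ini \
    -source-cfg-filename data.ini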
iris.training¶
Official
No description
Args
cache_dir: Path
default: ~/.cache/dffml/datasets/iris
json¶
Official
JSONSource reads from and writes to a JSON file on open / close. Otherwise records are stored in memory.
Args
filename: String
tag: String
default: untagged
readwrite: String
default: False
allowempty: String
default: False
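As a hedged sketch, the dffml merge command can be used to convert records from another source into a JSON file; the filenames here are illustrative and the flag pattern follows the dataframe example above.
$ dffml merge src=csv dest=json \
    -source-src-filename data.csv \
    -source-dest-filename data.json \
    -source-dest-allowempty \
    -source-dest-readwrite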
memory¶
Official
Stores records in a dict in memory
Args
records: List of records
display: Integer
default: 10
When repr() is called, how many records to display
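A minimal Python sketch, mirroring the MemorySource usage in the dfpreprocess example below.
import asyncio
from dffml import MemorySource, MemorySourceConfig, Record, load

async def main():
    # Hold records in memory, keyed by the Record key
    source = MemorySource(
        MemorySourceConfig(
            records=[
                Record("0", data={"features": {"Years": 1, "Salary": 20}}),
            ]
        )
    )
    async for record in load(source):
        print(record.features())

asyncio.run(main())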
op¶
Official
No description
Args
opimp: OperationImplementation
args: List of strings
default: []
Arguments to operation in input order
allowempty: String
default: False
Raise an error if the source is empty after running the loading operation
dffml_source_mysql¶
pip install dffml-source-mysql
mysql¶
Official
Warning
The update config property is a SQL query which MUST handle insertion or update. Columns to be updated should list feature columns first, followed by prediction columns.
The features config property MUST have keys in the same order as they appear within the update query.
The prediction config property MUST have keys in the same order as they appear within the update query.
Examples¶
Read MySQL or MariaDB’s documentation to understand how to properly set up your server for encrypted connections.
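A minimal Python sketch of a config that follows the column ordering rules from the warning above; the import path, table name, and column names are assumptions for illustration only.
# Assumed import path for the source shipped with dffml-source-mysql
from dffml_source_mysql.source import MySQLSource, MySQLSourceConfig

config = MySQLSourceConfig(
    user="user",
    password="pass",
    db="mydb",
    key="id",
    # Keys listed in the same order as their columns appear in the update query
    features={"Years": "years", "Salary": "salary"},
    predictions={"Salary": ("salary_pred", "salary_confidence")},
    # Feature columns first, then prediction columns, per the warning above
    update=(
        "INSERT INTO employees (id, years, salary, salary_pred, salary_confidence) "
        "VALUES (%s, %s, %s, %s, %s) "
        "ON DUPLICATE KEY UPDATE years=VALUES(years), salary=VALUES(salary), "
        "salary_pred=VALUES(salary_pred), salary_confidence=VALUES(salary_confidence)"
    ),
    record="SELECT * FROM employees WHERE id=%s",
    records="SELECT * FROM employees",
    insecure=True,  # Only for local, non-TLS testing; see the note on encrypted connections
)
source = MySQLSource(config)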
Args
user: String
Username
password: String
Password
db: String
Name of database to use
key: String
Column name of record key
features: typing.Dict[str, str]
Mapping of feature names to column names
predictions: typing.Dict[str, typing.Tuple[str, str]]
Mapping of prediction names to tuples of (value, confidence) column names
update: String
Query to update a single record
record: String
Query to get a single record
records: String
Query to get all records
init: String
default: None
Query to run on initial connection
host: String
default: 127.0.0.1
Host/address to connect to
port: Integer
default: 3306
Port to connect to
ca: String
default: None
Server certificate to use for TLS validation
insecure: String
default: False
Must be true to accept risks of non-TLS connection