New Operations Tutorial

This tutorial will explain what operations are and how you can use them. In the process we’ll create a meta static analysis tool, shouldi.

Operations are the core of DFFML. They have inputs and outputs, are configurable, and are run by the Orchestrator in what amounts to a large event loop. The events in the event loop are pieces of data entering the network. When a piece of data which matches the data types of one of an operation’s inputs enters the network, that operation is run.
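
To make the event loop idea concrete, here is a toy sketch written with only the standard library. It is not how DFFML's Orchestrator is implemented; the names ToyOperation, OPERATIONS and run_network are made up for illustration, and data is matched by simple string keys rather than real definitions.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

# Hypothetical stand-in for an operation: a name, the data type names it
# consumes, and an async function returning new data keyed by type name.
ToyOperation = Tuple[str, List[str], Callable[..., Awaitable[Dict[str, Any]]]]


async def double(number: int) -> Dict[str, Any]:
    return {"doubled": number * 2}


async def describe(doubled: int) -> Dict[str, Any]:
    return {"description": f"result is {doubled}"}


OPERATIONS: List[ToyOperation] = [
    ("double", ["number"], double),
    ("describe", ["doubled"], describe),
]


async def run_network(seed: Dict[str, Any]) -> Dict[str, Any]:
    """Feed data into the toy network until no events are left."""
    data = dict(seed)
    pending = list(seed.items())
    ran = set()
    while pending:
        # Each piece of data entering the network is one "event".
        pending.pop()
        for name, inputs, func in OPERATIONS:
            # Run an operation once, as soon as all of its inputs are present.
            if name in ran or not all(key in data for key in inputs):
                continue
            ran.add(name)
            outputs = await func(**{key: data[key] for key in inputs})
            data.update(outputs)
            pending.extend(outputs.items())
    return data


result = asyncio.run(run_network({"number": 21}))
print(result)
```

Seeding the network with a single number is enough to trigger double, whose output in turn triggers describe.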

We’re going to write a few operations which will run some Python static analysis tools. The goal is to create a command line utility called shouldi which gives us the information we need to decide: should I install Python package X? When it’s done it’ll look like this:

$ shouldi install dffml insecure-package
dffml is okay to install
Do not install insecure-package!
    safety_check_number_of_issues: 1
    bandit_output: {'CONFIDENCE.HIGH': 0.0, 'CONFIDENCE.LOW': 0.0, 'CONFIDENCE.MEDIUM': 0.0, 'CONFIDENCE.UNDEFINED': 0.0, 'SEVERITY.HIGH': 0.0, 'SEVERITY.LOW': 0.0, 'SEVERITY.MEDIUM': 0.0, 'SEVERITY.UNDEFINED': 0.0, 'loc': 100, 'nosec': 0, 'CONFIDENCE.HIGH_AND_SEVERITY.HIGH': 0}

In the second half of this tutorial, we’ll deploy the tool as an HTTP API endpoint rather than a command line application.

$ curl -s \
  --header "Content-Type: application/json" \
  --request POST \
  --data '{"insecure-package": [{"value":"insecure-package","definition":"package"}]}' \
  http://localhost:8080/shouldi | python -m json.tool
{
    "insecure-package": {
        "safety_check_number_of_issues": 1,
        "bandit_output": {
            "CONFIDENCE.HIGH": 0,
            "CONFIDENCE.LOW": 0,
            "CONFIDENCE.MEDIUM": 0,
            "CONFIDENCE.UNDEFINED": 0,
            "SEVERITY.HIGH": 0,
            "SEVERITY.LOW": 0,
            "SEVERITY.MEDIUM": 0,
            "SEVERITY.UNDEFINED": 0,
            "loc": 100,
            "nosec": 0,
            "CONFIDENCE.HIGH_AND_SEVERITY.HIGH": 0
        }
    }
}

Tools

We’ll write this meta static analysis tool by collecting and interpreting the output of static analysis tools. The two tools we’ll use are safety and bandit.

safety is a tool that checks for known vulnerabilities in packages published on PyPI. To run safety on the command line, we supply the package name and version.

$ echo insecure-package==0.1.0 | safety check --stdin
╒══════════════════════════════════════════════════════════════════════════════╕
│                                                                              │
│                               /$$$$$$            /$$                         │
│                              /$$__  $$          | $$                         │
│           /$$$$$$$  /$$$$$$ | $$  \__//$$$$$$  /$$$$$$   /$$   /$$           │
│          /$$_____/ |____  $$| $$$$   /$$__  $$|_  $$_/  | $$  | $$           │
│         |  $$$$$$   /$$$$$$$| $$_/  | $$$$$$$$  | $$    | $$  | $$           │
│          \____  $$ /$$__  $$| $$    | $$_____/  | $$ /$$| $$  | $$           │
│          /$$$$$$$/|  $$$$$$$| $$    |  $$$$$$$  |  $$$$/|  $$$$$$$           │
│         |_______/  \_______/|__/     \_______/   \___/   \____  $$           │
│                                                          /$$  | $$           │
│                                                         |  $$$$$$/           │
│  by pyup.io                                              \______/            │
│                                                                              │
╞══════════════════════════════════════════════════════════════════════════════╡
│ REPORT                                                                       │
│ checked 1 packages, using default DB                                         │
╞════════════════════════════╤═══════════╤══════════════════════════╤══════════╡
│ package                    │ installed │ affected                 │ ID       │
╞════════════════════════════╧═══════════╧══════════════════════════╧══════════╡
│ insecure-package           │ 0.1.0     │ <0.2.0                   │ 25853    │
╘══════════════════════════════════════════════════════════════════════════════╛

bandit is a tool that does static analysis on the source code of Python projects to check for things like SQL injections. To run bandit on the command line, we supply the path to the source directory to scan.

$ bandit -r distributed-android-testing/
[main]  INFO    profile include tests: None
[main]  INFO    profile exclude tests: None
[main]  INFO    cli include tests: None
[main]  INFO    cli exclude tests: None
[main]  INFO    running on Python 3.7.3
67 [0.. 50.. ]
Run started:2019-10-04 19:41:06.701058

Test results:
>> Issue: [B108:hardcoded_tmp_directory] Probable insecure usage of temp file/directory.
   Severity: Medium   Confidence: Medium
   Location: distributed-android-testing/docker/docker.py:20
   More Info: https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html
19              "chmod 700 /tmp/docker_install.sh",
20              "/tmp/docker_install.sh",
21              "usermod -aG docker ${USER}",
22              "service docker restart"
23          ]
24          kwargs["sudo"] = True
25          ssh.run_all(command, **kwargs)

--------------------------------------------------
>> Issue: [B104:hardcoded_bind_all_interfaces] Possible binding to all interfaces.
   Severity: Medium   Confidence: Medium
   Location: distributed-android-testing/docker/gitlab_webhooks/app.py:23
   More Info: https://bandit.readthedocs.io/en/latest/plugins/b104_hardcoded_bind_all_interfaces.html
22      PORT = 9898
23      ADDRESS = "0.0.0.0"
24      STREAM = True

Plan

Our plan is to run these tools and decide, based on their reports, whether we should install the package.

The first step will be to write an Operation which wraps each tool.

An Operation is similar to a function signature: it consists of a name, inputs, and outputs, much like a function prototype in C. The op decorator is a shorthand way of creating an Operation from a function.
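
The split between a signature and the function behind it can be illustrated with a small stand-in. This is only a sketch: ToySignature and toy_op are hypothetical names, not DFFML classes, and the real op decorator does considerably more. The one thing borrowed from this tutorial is that the decorated function ends up with an op attribute describing it.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class ToySignature:
    """Hypothetical stand-in for an Operation: a name, inputs and outputs."""

    name: str
    inputs: Dict[str, str] = field(default_factory=dict)
    outputs: Dict[str, str] = field(default_factory=dict)


def toy_op(inputs: Dict[str, str], outputs: Dict[str, str]) -> Callable:
    """Build a ToySignature from a function and attach it as func.op."""

    def wrap(func: Callable) -> Callable:
        func.op = ToySignature(
            name=func.__name__, inputs=inputs, outputs=outputs
        )
        return func

    return wrap


@toy_op(inputs={"text": "str"}, outputs={"length": "int"})
def count_chars(text: str) -> Dict[str, int]:
    return {"length": len(text)}


# The signature can be inspected without calling the function
print(count_chars.op.name, count_chars.op.inputs, count_chars.op.outputs)
# The function itself still behaves like a normal function
print(count_chars("shouldi"))
```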

Creating our Package

Create a new package using the create script.

$ dffml service dev create operations shouldi
$ cd shouldi

Note

All the code for this example is located under the examples/shouldi directory of the DFFML source code.

Remove the example files as we won’t be needing them

$ rm shouldi/operations.py shouldi/definitions.py tests/test_operations.py

Installing Static Analysis Tools

The tools we’ll be using are bandit and safety. We’ll also need to make HTTP requests, so let’s install aiohttp too.

Some people are familiar with the requirements.txt file used to declare dependencies. For packages, we can declare our dependencies right in our setup.py file.

setup.py

common.KWARGS["install_requires"] += [
    "aiohttp>=3.5.4",
    "bandit>=1.6.2",
    "safety>=1.8.5",
]

Note

These versions will change over time. Always check PyPI to find the latest version and use that version.

Now install the newly created package in development mode.

$ python3.7 -m pip install -e .

Safety Operation

To get parsable output, we’ll run safety with the --json flag.

$ echo insecure-package==0.1.0 | safety check --stdin --json
[
    [
        "insecure-package",
        "<0.2.0",
        "0.1.0",
        "This is an insecure package with lots of exploitable security vulnerabilities.",
        "25853"
    ]
]
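
Before wrapping the tool in an operation, it may help to see how little parsing the JSON needs. The sketch below works on a copy of the sample output shown above, so it runs without safety installed. The helper name count_issues is made up, and the meaning of each list position is inferred from that one sample rather than from safety's documentation.

```python
import json

# Copy of the sample output above
SAMPLE_OUTPUT = """
[
    [
        "insecure-package",
        "<0.2.0",
        "0.1.0",
        "This is an insecure package with lots of exploitable security vulnerabilities.",
        "25853"
    ]
]
"""


def count_issues(raw: str) -> int:
    """Each entry in the top-level list is one reported issue."""
    return len(json.loads(raw))


# Field order assumed from the sample: name, affected versions, installed
# version, description, issue ID
for name, affected, installed, description, issue_id in json.loads(
    SAMPLE_OUTPUT
):
    print(f"{name} {installed} (affected: {affected}) issue {issue_id}")

print(count_issues(SAMPLE_OUTPUT))
```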

Let’s now write the operation to call safety via a subprocess.

shouldi/safety.py

import sys
import json
import asyncio
from typing import Dict, Any

from dffml.df.base import op
from dffml.df.types import Definition

package = Definition(name="package", primitive="str")
package_version = Definition(name="package_version", primitive="str")
safety_check_number_of_issues = Definition(
    name="safety_check_number_of_issues", primitive="int"
)


@op(
    name="safety_check",
    inputs={"package": package, "version": package_version},
    outputs={"issues": safety_check_number_of_issues},
    conditions=[],
)
async def safety_check(package: str, version: str) -> Dict[str, Any]:
    pinned = f"{package}=={version}"

    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-m",
        "safety",
        "check",
        "--stdin",
        "--json",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, _stderr = await proc.communicate(pinned.encode() + b"\n")

    issues = json.loads(stdout)

    return {"issues": len(issues)}

Write a test for it

tests/test_safety.py

from dffml.util.asynctestcase import AsyncTestCase

from shouldi.safety import safety_check


class TestSafetyCheck(AsyncTestCase):
    async def test_run(self):
        results = await safety_check("insecure-package", "0.1.0")
        self.assertEqual(results["issues"], 1)

Run the tests

$ python3.7 setup.py test -s tests.test_safety

Bandit Operation

To get parsable output, we’ll run bandit with the -f json flag.

$ bandit -r -f json distributed-android-testing/
{
  "metrics": {
    "_totals": {
      "CONFIDENCE.HIGH": 9.0,
      "CONFIDENCE.LOW": 0.0,
      "CONFIDENCE.MEDIUM": 3.0,
      "CONFIDENCE.UNDEFINED": 0.0,
      "SEVERITY.HIGH": 0.0,
      "SEVERITY.LOW": 10.0,
      "SEVERITY.MEDIUM": 2.0,
      "SEVERITY.UNDEFINED": 0.0,
      "loc": 5658,
      "nosec": 0
    }
  },
  "results": [
    {
      "code": "19         \"chmod 700 /tmp/docker_install.sh\",\n20         \"/tmp/docker_install.sh\",\n21         \"usermod -aG docker ${USER}\",\n22         \"service docker restart\"\n23     ]\n24     kwargs[\"sudo\"] = True\n25     ssh.run_all(command, **kwargs)\n",
      "filename": "distributed-android-testing/docker/docker.py",
      "issue_confidence": "MEDIUM",
      "issue_severity": "MEDIUM",
      "issue_text": "Probable insecure usage of temp file/directory.",
      "line_number": 20,
      "line_range": [
        18,
        19,
        20,
        21,
        22
      ],
      "more_info": "https://bandit.readthedocs.io/en/latest/plugins/b108_hardcoded_tmp_directory.html",
      "test_id": "B108",
      "test_name": "hardcoded_tmp_directory"
    },
    {
      "code": "22 PORT = 9898\n23 ADDRESS = \"0.0.0.0\"\n24 STREAM = True\n",
      "filename": "distributed-android-testing/docker/gitlab_webhooks/app.py",
      "issue_confidence": "MEDIUM",
      "issue_severity": "MEDIUM",
      "issue_text": "Possible binding to all interfaces.",
      "line_number": 23,
      "line_range": [
        23
      ],
      "more_info": "https://bandit.readthedocs.io/en/latest/plugins/b104_hardcoded_bind_all_interfaces.html",
      "test_id": "B104",
      "test_name": "hardcoded_bind_all_interfaces"
    }
  ]
}

Let’s now write the operation to call bandit via a subprocess.

shouldi/bandit.py

import sys
import json
import asyncio
from typing import Dict, Any

from dffml.df.base import op
from dffml.df.types import Definition

package_src_dir = Definition(name="package_src_dir", primitive="str")
bandit_output = Definition(name="bandit_output", primitive="Dict[str, Any]")


@op(inputs={"pkg": package_src_dir}, outputs={"report": bandit_output})
async def run_bandit(pkg: str) -> Dict[str, Any]:
    """
    CLI usage: dffml service dev run -log debug shouldi.bandit:run_bandit -pkg .
    """
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-m",
        "bandit",
        "-r",
        "-f",
        "json",
        pkg,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    stdout, _stderr = await proc.communicate()
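    if len(stdout) == 0:
        # Include stderr so a failed run is easier to diagnose
        raise Exception(f"bandit produced no output: {_stderr.decode()}")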
    bandit_op = stdout.decode()
    bandit_op = json.loads(bandit_op)
    t_results = bandit_op["results"]
    high_sev_high_conf = 0
    for item in t_results:
        if (
            item["issue_confidence"] == "HIGH"
            and item["issue_severity"] == "HIGH"
        ):
            high_sev_high_conf += 1
    final_result = bandit_op["metrics"]["_totals"]
    final_result["CONFIDENCE.HIGH_AND_SEVERITY.HIGH"] = high_sev_high_conf
    return {"report": final_result}
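
The parsing half of run_bandit can be tried out without launching bandit. The sketch below applies the same counting logic to a trimmed report. The summarize_bandit helper and the SAMPLE_REPORT data are made up for illustration; the sample only mirrors the structure of the JSON shown earlier.

```python
import json
from typing import Any, Dict

# Made-up, trimmed report that follows the structure of bandit's JSON output
SAMPLE_REPORT = json.dumps(
    {
        "metrics": {
            "_totals": {
                "SEVERITY.HIGH": 1.0,
                "SEVERITY.MEDIUM": 1.0,
                "loc": 42,
                "nosec": 0,
            }
        },
        "results": [
            {"issue_confidence": "HIGH", "issue_severity": "HIGH"},
            {"issue_confidence": "MEDIUM", "issue_severity": "MEDIUM"},
        ],
    }
)


def summarize_bandit(raw: str) -> Dict[str, Any]:
    """Return the totals plus a count of HIGH confidence, HIGH severity issues."""
    if not raw:
        raise ValueError("bandit produced no output")
    report = json.loads(raw)
    high_sev_high_conf = sum(
        1
        for item in report["results"]
        if item["issue_confidence"] == "HIGH"
        and item["issue_severity"] == "HIGH"
    )
    # Copy the totals so the parsed report is left untouched
    summary = dict(report["metrics"]["_totals"])
    summary["CONFIDENCE.HIGH_AND_SEVERITY.HIGH"] = high_sev_high_conf
    return summary


summary = summarize_bandit(SAMPLE_REPORT)
print(summary)
```

Keeping the parsing in a plain function like this is one way to test it separately from the subprocess call.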

Write a test for it

tests/test_bandit.py

import os

from dffml.util.asynctestcase import AsyncTestCase

from shouldi.bandit import run_bandit


class TestRunBanditOp(AsyncTestCase):
    async def test_run(self):
        results = await run_bandit(os.getcwd())
        self.assertEqual(
            type(results["report"]["CONFIDENCE.HIGH_AND_SEVERITY.HIGH"]), int
        )

Run the tests

$ python3.7 setup.py test -s tests.test_bandit

What’s the Data Flow?

So far shouldi uses two tools.

  • bandit

    • Which runs checks on the source code of a package to look for things like SQL injections

  • safety

    • Which checks if there are any open CVEs in a package

We’re only planning on providing our tool with the package name. So we’ll need to find the package version to run safety, and download the source code of the package to run bandit.

This is the directed graph that defines the dataflow of operations that make up shouldi. It shows how the safety and bandit operations described above are connected by other operations which grab the package version and source code from PyPI.

Diagram showing DataFlow for processing stage

The DataFlow above describes the following process:

  • In the processing stage we run all our data collection operations

    • Our input is the package name

      • This will be given to us on the command line

    • Access the PyPI API and get the JSON describing the package information

    • Concurrently

      • Extract the version from the package information

        • Run safety using the version and the package name

      • Extract the URL of the latest release from the package information

        • Use the URL to download and extract the package source to a directory

          • Run bandit using the package source directory

  • In the cleanup stage we release resources created in the processing stage

    • Remove the package source directory

  • In the output stage we run operations which select data generated in the processing stage and use that selected data as the output of the dataflow.

    • Run the get_single operation which selects data matching the definitions we care about.
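
The shape of this process can be sketched with plain asyncio, before any DFFML operations are involved. Everything below is a stub: the function names, the canned return values and the example.invalid URL are placeholders, and nothing touches the network or the disk. In DFFML the Orchestrator works out this ordering from the definitions, so none of it has to be written by hand.

```python
import asyncio
from typing import Any, Dict, List


async def fetch_package_json(package: str) -> Dict[str, Any]:
    # Stub: stands in for the PyPI request and returns canned data
    return {
        "info": {"version": "0.1.0"},
        "urls": [{"url": f"https://example.invalid/{package}-0.1.0.tar.gz"}],
    }


async def check_safety(package: str, version: str) -> int:
    # Stub: pretend safety reported one issue
    return 1


async def download_source(url: str) -> str:
    # Stub: pretend the archive was extracted to this directory
    return "/tmp/pypi-placeholder"


async def check_bandit(directory: str) -> Dict[str, Any]:
    # Stub: pretend bandit found nothing serious
    return {"CONFIDENCE.HIGH_AND_SEVERITY.HIGH": 0}


async def evaluate(package: str) -> Dict[str, Any]:
    cleaned: List[str] = []
    package_json = await fetch_package_json(package)

    async def safety_branch() -> int:
        version = package_json["info"]["version"]
        return await check_safety(package, version)

    async def bandit_branch() -> Dict[str, Any]:
        directory = await download_source(package_json["urls"][0]["url"])
        try:
            return await check_bandit(directory)
        finally:
            # Cleanup stage stand-in: record the directory we would remove
            cleaned.append(directory)

    # The two branches only depend on the package JSON, so run them together
    issues, report = await asyncio.gather(safety_branch(), bandit_branch())
    # Output stage stand-in: select the values we care about
    return {
        "safety_check_number_of_issues": issues,
        "bandit_output": report,
        "cleaned": cleaned,
    }


result = asyncio.run(evaluate("insecure-package"))
print(result)
```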

PyPI Operations

Let’s write an operation to grab the JSON information about a package.

shouldi/pypi.py

import shutil
import tempfile
from typing import Dict, Any

import aiohttp

from dffml.df.base import op
from dffml.df.types import Definition, Stage

from .safety import package, package_version
from .bandit import package_src_dir

package_json = Definition(name="package_json", primitive="Dict[str, Any]")
package_url = Definition(name="package_url", primitive="str")


@op(
    inputs={"package": package},
    outputs={"response_json": package_json},
    # imp_enter allows us to create instances of objects which are async context
    # managers and assign them to self.parent which is an object of type
    # OperationImplementation which will be alive for the lifetime of the
    # Orchestrator which runs all these operations.
    imp_enter={
        "session": (lambda self: aiohttp.ClientSession(trust_env=True))
    },
)
async def pypi_package_json(self, package: str) -> Dict[str, Any]:
    """
    Download the information on the package in JSON format.
    """
    url = f"https://pypi.org/pypi/{package}/json"
    async with self.parent.session.get(url) as resp:  # skipcq: BAN-B310
        package_json = await resp.json()
        return {"response_json": package_json}

After we have the package information, we extract the version and URL where we can get the source code.

shouldi/pypi.py

@op(
    inputs={"response_json": package_json},
    outputs={"version": package_version},
)
async def pypi_latest_package_version(
    response_json: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Grab the version from the package information.
    """
    return {"version": response_json["info"]["version"]}


@op(inputs={"response_json": package_json}, outputs={"url": package_url})
async def pypi_package_url(response_json: Dict[str, Any]) -> Dict[str, Any]:
    """
    Grab the URL of the latest source code release from the package information.
    """
    url_dicts = response_json["urls"]
    for url_dict in url_dicts:
        if (
            url_dict["python_version"] == "source"
            and url_dict["packagetype"] == "sdist"
        ):
            return {"url": url_dict["url"]}
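
To see the selection logic in isolation, the sketch below runs the same loop over a hand-written dictionary. SAMPLE_RESPONSE is made up (the URLs point at example.invalid) and contains only the keys the operations above read. The find_sdist_url helper is a hypothetical name, and it returns None explicitly when no source release is listed.

```python
from typing import Any, Dict, Optional

# Made-up response containing only the keys the operations above rely on
SAMPLE_RESPONSE: Dict[str, Any] = {
    "info": {"version": "0.1.0"},
    "urls": [
        {
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "url": "https://example.invalid/insecure_package-0.1.0-py3-none-any.whl",
        },
        {
            "packagetype": "sdist",
            "python_version": "source",
            "url": "https://example.invalid/insecure-package-0.1.0.tar.gz",
        },
    ],
}


def find_sdist_url(response_json: Dict[str, Any]) -> Optional[str]:
    """Return the URL of the source release, or None if there is none."""
    for url_dict in response_json["urls"]:
        if (
            url_dict["python_version"] == "source"
            and url_dict["packagetype"] == "sdist"
        ):
            return url_dict["url"]
    return None


print(SAMPLE_RESPONSE["info"]["version"])
print(find_sdist_url(SAMPLE_RESPONSE))
print(find_sdist_url({"urls": []}))
```

Packages that publish only wheels have no sdist entry, which is the case the last line exercises.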

Once we have the URL, we download the package source and extract it to a temporary directory.

shouldi/pypi.py

@op(
    inputs={"url": package_url},
    outputs={"directory": package_src_dir},
    imp_enter={
        "session": (lambda self: aiohttp.ClientSession(trust_env=True))
    },
)
async def pypi_package_contents(self, url: str) -> Dict[str, Any]:
    """
    Download a source code release and extract it to a temporary directory.
    """
    package_src_dir = tempfile.mkdtemp(prefix="pypi-")
    async with self.parent.session.get(url) as resp:
        # Create a temporary file to extract to
        with tempfile.NamedTemporaryFile(
            prefix="pypi-", suffix=".tar.gz"
        ) as package_src_file:
            package_src_file.write(await resp.read())
            # Make sure the data is on disk before unpacking by file name
            package_src_file.flush()
            shutil.unpack_archive(package_src_file.name, package_src_dir)
            return {"directory": package_src_dir}
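
The extraction step can be exercised without a network connection. The sketch below builds a tiny archive locally, then writes its bytes to a named temporary file and unpacks it, the same way the operation handles a downloaded release. The helpers make_sample_sdist and extract_from_bytes are made up for illustration. It assumes a POSIX system, since reopening a NamedTemporaryFile by name does not work on Windows while the file is still open.

```python
import os
import shutil
import tarfile
import tempfile


def make_sample_sdist(directory: str) -> str:
    """Create a tiny .tar.gz that stands in for a downloaded release."""
    source = os.path.join(directory, "sample-0.1.0")
    os.makedirs(source)
    with open(os.path.join(source, "setup.py"), "w") as handle:
        handle.write("# placeholder\n")
    archive = os.path.join(directory, "sample-0.1.0.tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(source, arcname="sample-0.1.0")
    return archive


def extract_from_bytes(contents: bytes) -> str:
    """Write archive bytes to a temporary file and unpack them."""
    package_src_dir = tempfile.mkdtemp(prefix="pypi-")
    with tempfile.NamedTemporaryFile(
        prefix="pypi-", suffix=".tar.gz"
    ) as package_src_file:
        package_src_file.write(contents)
        # Flush so unpack_archive sees the complete file when it reopens it
        package_src_file.flush()
        shutil.unpack_archive(package_src_file.name, package_src_dir)
    return package_src_dir


with tempfile.TemporaryDirectory() as workdir:
    with open(make_sample_sdist(workdir), "rb") as handle:
        extracted = extract_from_bytes(handle.read())

extracted_files = sorted(os.listdir(extracted))
print(extracted_files)

# Remove the directory, as the cleanup stage would
shutil.rmtree(extracted)
```

The .tar.gz suffix matters here: shutil.unpack_archive picks the archive format from the file extension.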

Finally, we make a cleanup operation to remove the directory once we’re done with it.

shouldi/pypi.py

@op(inputs={"directory": package_src_dir}, outputs={}, stage=Stage.CLEANUP)
async def cleanup_pypi_package(directory: str):
    """
    Remove the directory containing the source code release.
    """
    shutil.rmtree(directory)

Now we write tests for each operation.

tests/test_pypi.py

import os
import shutil
import tempfile

from dffml.util.asynctestcase import AsyncTestCase

from shouldi.pypi import pypi_package_json
from shouldi.pypi import pypi_latest_package_version
from shouldi.pypi import pypi_package_url
from shouldi.pypi import pypi_package_contents
from shouldi.pypi import cleanup_pypi_package


class TestPyPiOperations(AsyncTestCase):
    PACKAGE = {"name": "insecure-package"}
    INT_RESULT_JSON = {}

    async def test_000_package_json(self):
        # Call the .test method created by the @op decorator. This sets up the
        # aiohttp.ClientSession object.
        results = await pypi_package_json.test(package=self.PACKAGE["name"])
        self.assertIs(type(results["response_json"]), dict)
        self.INT_RESULT_JSON.update(results["response_json"])

    async def test_001_package_version(self):
        results = await pypi_latest_package_version(self.INT_RESULT_JSON)
        self.assertEqual(results["version"], "0.1.0")

    async def test_002_package_url(self):
        results = await pypi_package_url(self.INT_RESULT_JSON)
        self.assertIn("insecure-package-0.1.0.tar.gz", results["url"])
        self.PACKAGE.update(results)

    async def test_003_package_contents(self):
        # Download outside the try block so results is always defined when
        # the finally block runs
        results = await pypi_package_contents.test(url=self.PACKAGE["url"])
        try:
            no_files = os.listdir(results["directory"])
            self.assertGreater(len(no_files), 0)
        finally:
            shutil.rmtree(results["directory"])

    async def test_004_cleanup_package(self):
        tempdir = tempfile.mkdtemp()
        await cleanup_pypi_package(tempdir)
        self.assertFalse(os.path.isdir(tempdir))

Run the tests

$ python3.7 setup.py test -s tests.test_pypi

CLI

Writing the CLI is as simple as importing our operations and having the memory orchestrator run them. DFFML also provides a quick and dirty CLI abstraction based on argparse which will speed things up.

shouldi/cli.py

# Command line interface helpers
from dffml.util.cli.cmd import CMD
from dffml.util.cli.arg import Arg

# DataFlow specific classes
from dffml.df.types import DataFlow, Input
from dffml.df.memory import MemoryOrchestrator

# The GetSingle operation will grab the data we want from the outputs of our
# operations and present it as the result
from dffml.operation.output import GetSingle

# Import all the operations we wrote
from shouldi.bandit import run_bandit
from shouldi.pypi import pypi_latest_package_version
from shouldi.pypi import pypi_package_json
from shouldi.pypi import pypi_package_url
from shouldi.pypi import pypi_package_contents
from shouldi.pypi import cleanup_pypi_package
from shouldi.safety import safety_check

# Link inputs and outputs together according to their definitions
DATAFLOW = DataFlow.auto(
    pypi_package_json,
    pypi_latest_package_version,
    pypi_package_url,
    pypi_package_contents,
    cleanup_pypi_package,
    safety_check,
    run_bandit,
    GetSingle,
)
# Seed inputs are added to each executing context. The following Input tells the
# GetSingle output operation that we want the output of the network to include
# data matching the "issues" output of the safety_check operation, and the
# "report" output of the run_bandit operation, for each context.
DATAFLOW.seed.append(
    Input(
        value=[
            safety_check.op.outputs["issues"].name,
            run_bandit.op.outputs["report"].name,
        ],
        definition=GetSingle.op.inputs["spec"],
    )
)


class Install(CMD):

    arg_packages = Arg(
        "packages", nargs="+", help="Package to check if we should install"
    )

    async def run(self):
        # Create an Orchestrator which will manage the running of our operations
        async with MemoryOrchestrator.withconfig({}) as orchestrator:
            # Create an orchestrator context, everything in DFFML follows this
            # one-two context entry pattern
            async with orchestrator(DATAFLOW) as octx:
                # Run all the operations. Each iteration of this loop happens
                # when all inputs are exhausted for a context, the output
                # operations are then run and their results are yielded
                async for package_name, results in octx.run(
                    {
                        # For each package add a new input set to the input network
                        # The context operations execute under is the package name
                        # to evaluate. Contexts ensure that data pertaining to
                        # package A doesn't mingle with data pertaining to package B
                        package_name: [
                            # The only input to the operations is the package name.
                            Input(
                                value=package_name,
                                definition=pypi_package_json.op.inputs[
                                    "package"
                                ],
                            )
                        ]
                        for package_name in self.packages
                    }
                ):
                    # Grab the number of safety issues and the bandit report
                    # from the results dict
                    safety_issues = results[
                        safety_check.op.outputs["issues"].name
                    ]
                    bandit_report = results[
                        run_bandit.op.outputs["report"].name
                    ]
                    # Decide if those numbers mean we should stop ship or not
                    if (
                        safety_issues > 0
                        or bandit_report["CONFIDENCE.HIGH_AND_SEVERITY.HIGH"]
                        > 5
                    ):
                        print(f"Do not install {package_name}!")
                        for definition_name, result in results.items():
                            print(f"    {definition_name}: {result}")
                    else:
                        print(f"{package_name} is okay to install")


class ShouldI(CMD):

    install = Install
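
The install decision inside Install.run can be pulled out into a small pure function, which makes the threshold easy to test. This is a sketch rather than part of the shouldi source; the name should_install and the max_high parameter are made up, with the default of 5 taken from the comparison in the code above.

```python
from typing import Any, Dict


def should_install(
    safety_issues: int, bandit_report: Dict[str, Any], max_high: int = 5
) -> bool:
    """Mirror the check in Install.run: any safety issue, or more than
    max_high HIGH confidence and HIGH severity bandit findings, means no."""
    return not (
        safety_issues > 0
        or bandit_report["CONFIDENCE.HIGH_AND_SEVERITY.HIGH"] > max_high
    )


print(should_install(0, {"CONFIDENCE.HIGH_AND_SEVERITY.HIGH": 0}))
print(should_install(1, {"CONFIDENCE.HIGH_AND_SEVERITY.HIGH": 0}))
print(should_install(0, {"CONFIDENCE.HIGH_AND_SEVERITY.HIGH": 6}))
```

Note that exactly five high findings still passes, because the comparison is strictly greater than.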

Let’s test out the code in shouldi.cli before making it accessible via the command line.

tests/test_cli.py

import io
from unittest.mock import patch

from dffml.util.asynctestcase import AsyncTestCase

from shouldi.cli import ShouldI


class TestCLI(AsyncTestCase):
    async def test_install(self):
        with patch("sys.stdout", new_callable=io.StringIO) as stdout:
            await ShouldI.install.cli("insecure-package", "shouldi")
            output = stdout.getvalue()
        self.assertIn("shouldi is okay to install", output)
        self.assertIn("Do not install insecure-package!", output)

Run all the tests this time

$ python3.7 setup.py test

We want this to be usable as a command line utility. Python’s setuptools allows us to define console entry_points: all we have to do is tell setuptools what Python function we want it to call when a user runs a given command line application. The name of our CLI is shouldi and the function we want to run is main in the ShouldI class which is in the shouldi.cli module.

setup.py

common.KWARGS["entry_points"] = {
    "console_scripts": ["shouldi = shouldi.cli:ShouldI.main"],
}

Re-install the package via pip

$ python3.7 -m pip install -e .

Now we should be able to run our new tool via the CLI (provided your $PATH is set up correctly).

$ shouldi install dffml insecure-package
dffml is okay to install
Do not install insecure-package!
    safety_check_number_of_issues: 1
    bandit_output: {'CONFIDENCE.HIGH': 0.0, 'CONFIDENCE.LOW': 0.0, 'CONFIDENCE.MEDIUM': 0.0, 'CONFIDENCE.UNDEFINED': 0.0, 'SEVERITY.HIGH': 0.0, 'SEVERITY.LOW': 0.0, 'SEVERITY.MEDIUM': 0.0, 'SEVERITY.UNDEFINED': 0.0, 'loc': 100, 'nosec': 0, 'CONFIDENCE.HIGH_AND_SEVERITY.HIGH': 0}

Visualizing the DataFlow

DataFlows can be visualized using mermaidjs.

Note

Installing the dffml-config-yaml package will enable the -config yaml option, allowing you to export to YAML instead of JSON. You can also convert between config file formats with the Convert command.

We first export the DataFlow to a config file on disk.

$ mkdir -p shouldi/deploy/df
$ dffml service dev export -config json shouldi.cli:DATAFLOW \
  > shouldi/deploy/df/shouldi.json

We then create the mermaidjs diagram from the DataFlow. The -simple flag says to only show connections between operations, without showing which inputs and outputs are connected.

$ dffml dataflow diagram -simple shouldi/deploy/df/shouldi.json
graph TD
subgraph a759a07029077edc5c37fea0326fa281[Processing Stage]
style a759a07029077edc5c37fea0326fa281 fill:#afd388b5,stroke:#a4ca7a
a55c24c0d1363ec4d3c9e20883f3c740[pypi_latest_package_version]
d273c0a72c6acc57e33c2f7162fa7363[pypi_package_contents]
83503ba9fe6c0f5649644d26e59c5590[pypi_package_json]
00f7f4637f6f67120e83e75c78949806[pypi_package_url]
9220cb5f5732d9e6dcc130a4908ddf92[run_bandit]
88517e4cd0cae33deff50d987f2683fe[safety_check]
end
subgraph a4827add25f5c7d5895c5728b74e2beb[Cleanup Stage]
style a4827add25f5c7d5895c5728b74e2beb fill:#afd388b5,stroke:#a4ca7a
7ec0058800fd4bed6fb63633330588c7[cleanup_pypi_package]
end
subgraph 58ca4d24d2767176f196436c2890b926[Output Stage]
style 58ca4d24d2767176f196436c2890b926 fill:#afd388b5,stroke:#a4ca7a
b42e9e149e775202b18841f1f67061c4[get_single]
end
subgraph inputs[Inputs]
style inputs fill:#f6dbf9,stroke:#a178ca
d273c0a72c6acc57e33c2f7162fa7363 --> 7ec0058800fd4bed6fb63633330588c7
d60584024f765273b6f41d6d36f8320c(get_single_spec)
d60584024f765273b6f41d6d36f8320c --> b42e9e149e775202b18841f1f67061c4
83503ba9fe6c0f5649644d26e59c5590 --> a55c24c0d1363ec4d3c9e20883f3c740
00f7f4637f6f67120e83e75c78949806 --> d273c0a72c6acc57e33c2f7162fa7363
314b1a20a4db6b3bf3f2627830da97a3(package)
314b1a20a4db6b3bf3f2627830da97a3 --> 83503ba9fe6c0f5649644d26e59c5590
83503ba9fe6c0f5649644d26e59c5590 --> 00f7f4637f6f67120e83e75c78949806
d273c0a72c6acc57e33c2f7162fa7363 --> 9220cb5f5732d9e6dcc130a4908ddf92
314b1a20a4db6b3bf3f2627830da97a3(package)
314b1a20a4db6b3bf3f2627830da97a3 --> 88517e4cd0cae33deff50d987f2683fe
a55c24c0d1363ec4d3c9e20883f3c740 --> 88517e4cd0cae33deff50d987f2683fe
end
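
The mermaidjs text format itself is simple: node declarations followed by arrows. As a rough illustration, the sketch below builds a reduced "graph TD" description from a hand-written edge list. This is not how dffml generates its diagram: the to_mermaid helper is made up, operation names are used directly as node identifiers instead of hashes, and the stage subgraphs and inputs are left out.

```python
from typing import List, Tuple

# Connections between operations, read off the diagram output above
EDGES: List[Tuple[str, str]] = [
    ("pypi_package_json", "pypi_latest_package_version"),
    ("pypi_package_json", "pypi_package_url"),
    ("pypi_package_url", "pypi_package_contents"),
    ("pypi_package_contents", "run_bandit"),
    ("pypi_package_contents", "cleanup_pypi_package"),
    ("pypi_latest_package_version", "safety_check"),
]


def to_mermaid(edges: List[Tuple[str, str]]) -> str:
    """Render an edge list as a top-down mermaidjs graph."""
    nodes: List[str] = []
    for source, target in edges:
        for node in (source, target):
            if node not in nodes:
                nodes.append(node)
    lines = ["graph TD"]
    # One declaration per node, using the name as both identifier and label
    lines.extend(f"{node}[{node}]" for node in nodes)
    # One arrow per connection
    lines.extend(f"{source} --> {target}" for source, target in edges)
    return "\n".join(lines)


diagram = to_mermaid(EDGES)
print(diagram)
```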

You can now copy that graph and paste it into the mermaidjs live editor.

It should render the following SVG showing how all the operations are connected.

Diagram showing DataFlow

GitLab will render mermaidjs diagrams found in markdown files. There is also a sphinx plugin, and a command line utility.

Registering Operations

In order to make our operations visible to other plugins and packages using DFFML, we need to register them with Python’s entry_points system.

setup.py

common.KWARGS["entry_points"] = {
    "console_scripts": ["shouldi = shouldi.cli:ShouldI.main"],
    "dffml.operation": [
        "run_bandit = shouldi.bandit:run_bandit",
        "safety_check = shouldi.safety:safety_check",
        "pypi_latest_package_version = shouldi.pypi:pypi_latest_package_version",
        "pypi_package_json = shouldi.pypi:pypi_package_json",
        "pypi_package_url = shouldi.pypi:pypi_package_url",
        "pypi_package_contents = shouldi.pypi:pypi_package_contents",
        "cleanup_pypi_package = shouldi.pypi:cleanup_pypi_package",
    ],
}
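
Each entry point is a string of the form "name = module:attribute". The sketch below shows how such a string breaks down and how the target can be looked up with importlib. It is a simplified illustration, not what setuptools actually runs; parse_entry_point and resolve are made-up helpers, and the lookup is demonstrated on a standard library target so it works without shouldi installed.

```python
import functools
import importlib
import json
from typing import Any, Tuple


def parse_entry_point(spec: str) -> Tuple[str, str, str]:
    """Split "name = module:attr" into its three parts."""
    name, _, target = spec.partition("=")
    module, _, attr = target.strip().partition(":")
    return name.strip(), module, attr


def resolve(module: str, attr: str) -> Any:
    """Import the module, then walk the dotted attribute path."""
    loaded = importlib.import_module(module)
    return functools.reduce(getattr, attr.split("."), loaded)


name, module, attr = parse_entry_point("shouldi = shouldi.cli:ShouldI.main")
print(name, module, attr)

# Same lookup on a standard library target, chosen only so this example runs
# anywhere
_, demo_module, demo_attr = parse_entry_point(
    "decode = json:JSONDecoder.decode"
)
decode = resolve(demo_module, demo_attr)
print(decode is json.JSONDecoder.decode)
```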

Re-install the package via pip to make registrations take effect.

$ python3.7 -m pip install -e .

After you’ve registered the operations, services such as the HTTP API will have access to your operations.

To make sure your operations were registered, you can use the development service’s entrypoints list command. You should see the get_single operation we used to get our output as coming from dffml. You’ll also see your own operations as coming from shouldi.

$ dffml service dev entrypoints list dffml.operation
associate = dffml.operation.output:Associate -> dffml 0.2.1 (/usr/local/lib/python3.7/dist-packages)
dffml.mapping.create = dffml.operation.mapping:create_mapping -> dffml 0.2.1 (/usr/local/lib/python3.7/dist-packages)
dffml.mapping.extract = dffml.operation.mapping:mapping_extract_value -> dffml 0.2.1 (/usr/local/lib/python3.7/dist-packages)
get_single = dffml.operation.output:GetSingle -> dffml 0.2.1 (/usr/local/lib/python3.7/dist-packages)
group_by = dffml.operation.output:GroupBy -> dffml 0.2.1 (/usr/local/lib/python3.7/dist-packages)
cleanup_pypi_package = shouldi.pypi:cleanup_pypi_package -> shouldi 0.0.1 (/home/user/shouldi)
pypi_latest_package_version = shouldi.pypi:pypi_latest_package_version -> shouldi 0.0.1 (/home/user/shouldi)
pypi_package_contents = shouldi.pypi:pypi_package_contents -> shouldi 0.0.1 (/home/user/shouldi)
pypi_package_json = shouldi.pypi:pypi_package_json -> shouldi 0.0.1 (/home/user/shouldi)
pypi_package_url = shouldi.pypi:pypi_package_url -> shouldi 0.0.1 (/home/user/shouldi)
run_bandit = shouldi.bandit:run_bandit -> shouldi 0.0.1 (/home/user/shouldi)
safety_check = shouldi.safety:safety_check -> shouldi 0.0.1 (/home/user/shouldi)

The DataFlow HTTP Deployment usage example will show you how to expose your new meta static analysis tool over an HTTP interface.

$ curl -s \
  --header "Content-Type: application/json" \
  --request POST \
  --data '{"insecure-package": [{"value":"insecure-package","definition":"package"}]}' \
  http://localhost:8080/shouldi | python -m json.tool
{
    "insecure-package": {
        "safety_check_number_of_issues": 1,
        "bandit_output": {
            "CONFIDENCE.HIGH": 0,
            "CONFIDENCE.LOW": 0,
            "CONFIDENCE.MEDIUM": 0,
            "CONFIDENCE.UNDEFINED": 0,
            "SEVERITY.HIGH": 0,
            "SEVERITY.LOW": 0,
            "SEVERITY.MEDIUM": 0,
            "SEVERITY.UNDEFINED": 0,
            "loc": 100,
            "nosec": 0,
            "CONFIDENCE.HIGH_AND_SEVERITY.HIGH": 0
        }
    }
}
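
The same request can be prepared from Python with the standard library. The sketch below only builds the request object, so it runs without a server. The build_request helper is a made-up name, and the URL and payload shape are copied from the curl command above.

```python
import json
import urllib.request


def build_request(
    package: str, url: str = "http://localhost:8080/shouldi"
) -> urllib.request.Request:
    """Build the POST request that the curl command above sends."""
    # The context name maps to a list of inputs, here just the package name
    payload = {package: [{"value": package, "definition": "package"}]}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_request("insecure-package")
print(request.get_method(), request.full_url)
print(request.data.decode())

# With the HTTP service running, the request could then be sent like this:
#
#     with urllib.request.urlopen(request) as response:
#         print(json.load(response))
```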