Detecting Issues in Fairness by Generating Model Card from Tensorflow Estimators

In this notebook we will create a TFX pipeline to create a Proxy model for COMPAS (originally published by Tensorflow Authors). First, we will train a tf.estimator with defined eval_input_reciever_fn. This will allow us to run userdefined metrics with tensorflow-model-analysis on seralized tf.Example.

After this pipeline has be created, we will show how Intel’s ModelCardGen class can take this tf.estimator in the form of an SavedModel and TFRecord to create a Model Card with interactive graphics.

Install Dependencies

[ ]:
!python -m pip install --no-cache-dir --no-deps \
    docker==7.0.0 \
    keras-tuner==1.4.7 \
    kubernetes==29.0.0 \
    ml-metadata==1.14.0 \
    portpicker==1.6.0 \
    tensorflow-transform==1.14.0 \
    tfx==1.14.0
[ ]:
!mkdir -p compas/data/train compas/data/eval

Import Libraries

[ ]:
import os
import tempfile
import pandas as pd
from sklearn.model_selection import train_test_split

# Intel Model Card Genorator
from intel_ai_safety.model_card_gen.model_card_gen import ModelCardGen
from intel_ai_safety.model_card_gen.datasets import TensorflowDataset

Download and preprocess the dataset

The COMPAS dataset is a common case study in the ML fairness literature1, 2, 3, where it is use to apply techniques for identifying and remediating issues around fairness. ___

  1. Wadsworth, C., Vera, F., Piech, C. (2017). Achieving Fairness Through Adversarial Learning: an Application to Recidivism Prediction. https://arxiv.org/abs/1807.00199.

  2. Chouldechova, A., G’Sell, M., (2017). Fairer and more accurate, but for whom? https://arxiv.org/abs/1707.00046.

  3. Berk et al., (2017), Fairness in Criminal Justice Risk Assessments: The State of the Art, https://arxiv.org/abs/1703.09207.

[ ]:
# Download the COMPAS dataset and setup the required filepaths.
_DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')
_DATA_PATH = 'https://storage.googleapis.com/compas_dataset/cox-violent-parsed.csv'
_DATA_FILEPATH = os.path.join('compas', 'data')

_COMPAS_DF = pd.read_csv(_DATA_PATH)

# To simpliy the case study, we will only use the columns that will be used for
# our model.
_COLUMN_NAMES = [
  'age',
  'c_charge_desc',
  'c_charge_degree',
  'c_days_from_compas',
  'is_recid',            # ground truth
  'juv_fel_count',
  'juv_misd_count',
  'juv_other_count',
  'priors_count',
  'r_days_from_arrest',
  'race',
  'sex',
  'vr_charge_desc',
  'score_text',          # COMPAS predction
]

_GROUND_TRUTH = 'is_recid'
_COMPAS_SCORE = 'score_text'

_COMPAS_DF = _COMPAS_DF[_COLUMN_NAMES]

# We will use 'is_recid' as our ground truth lable, which is boolean value
# indicating if a defendant committed another crime. There are some rows with -1
# indicating that there is no data. These rows we will drop from training.
_COMPAS_DF = _COMPAS_DF[_COMPAS_DF['is_recid'] != -1]
_COMPAS_DF = _COMPAS_DF.dropna(subset=['score_text'])
_COMPAS_DF['score_text'] = _COMPAS_DF.score_text.map({'Low': 0, 'High': 1, 'Medium': 1})
# is_recid field is ground truth to create a COMPAS proxy we will need to train on score_text
# _COMPAS_DF = _COMPAS_DF.rename(columns={'is_recid': 'ground_truth', 'score_text': 'compas_score'})

# Given the distribution between races in this dataset we will only focuse on
# recidivism for African-Americans and Caucasians.
_COMPAS_DF = _COMPAS_DF[
  _COMPAS_DF['race'].isin(['African-American', 'Caucasian'])]

X  = _COMPAS_DF[_COLUMN_NAMES]
# to create a COMPAS proxy we will need to train on score_text not to be confused with ground truth is_recid field
# y = _COMPAS_DF[[_COMPAS_SCORE]]

X_train, X_test = train_test_split(X, test_size=0.33, random_state=42)

# Load the DataFrame back to a CSV file for our TFX model.
X_train.to_csv(os.path.join(_DATA_FILEPATH, 'train', 'train.csv'), index=False, na_rep='')
X_test.to_csv(os.path.join(_DATA_FILEPATH, 'eval', 'eval.csv'), index=False, na_rep='')

TFX Pipeline Scripts

We opt to create a custom pipeline script so that we can transform data and train a model saved as artifacts to use in as input in Model Card Generator.

[ ]:
_transformer_path = os.path.join('compas', 'transformer.py')
[ ]:
%%writefile {_transformer_path}
import tensorflow as tf
import tensorflow_transform as tft

CATEGORICAL_FEATURE_KEYS = [
    'sex',
    'race',
    'c_charge_desc',
    'c_charge_degree',
]

INT_FEATURE_KEYS = [
    'age',
    'c_days_from_compas',
    'juv_fel_count',
    'juv_misd_count',
    'juv_other_count',
    'priors_count',
]

LABEL_KEY = 'is_recid'

# List of the unique values for the items within CATEGORICAL_FEATURE_KEYS.
MAX_CATEGORICAL_FEATURE_VALUES = [
    2,
    6,
    513,
    14,
]


def transformed_name(key):
  return '{}_xf'.format(key)


def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: Map from feature keys to raw features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        vocab_filename=key)

  for key in INT_FEATURE_KEYS:
    outputs[transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  # Target label will be to see if the defendant is charged for another crime.
  outputs[transformed_name(LABEL_KEY)] = _fill_in_missing(inputs[LABEL_KEY])
  return outputs


def _fill_in_missing(tensor_value):
  """Replaces a missing values in a SparseTensor.

  Fills in missing values of `tensor_value` with '' or 0, and converts to a
  dense tensor.

  Args:
    tensor_value: A `SparseTensor` of rank 2. Its dense shape should have size
      at most 1 in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `tensor_value` are filled in.
  """
  if not isinstance(tensor_value, tf.sparse.SparseTensor):
    return tensor_value
  default_value = '' if tensor_value.dtype == tf.string else 0
  sparse_tensor = tf.SparseTensor(
      tensor_value.indices,
      tensor_value.values,
      [tensor_value.dense_shape[0], 1])
  dense_tensor = tf.sparse.to_dense(sparse_tensor, default_value)
  return tf.squeeze(dense_tensor, axis=1)

[ ]:
_trainer_path = os.path.join('compas', 'trainer.py')
[ ]:
%%writefile {_trainer_path}

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from transformer import *

_BATCH_SIZE = 1000
_LEARNING_RATE = 0.00001
_MAX_CHECKPOINTS = 1
_SAVE_CHECKPOINT_STEPS = 999


def transformed_names(keys):
  return [transformed_name(key) for key in keys]


def transformed_name(key):
  return '{}_xf'.format(key)


def _gzip_reader_fn(filenames):
  """Returns a record reader that can read gzip'ed files.

  Args:
    filenames: A tf.string tensor or tf.data.Dataset containing one or more
      filenames.

  Returns: A nested structure of tf.TypeSpec objects matching the structure of
    an element of this dataset and specifying the type of individual components.
  """
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')


# Tf.Transform considers these features as "raw".
def _get_raw_feature_spec(schema):
  """Generates a feature spec from a Schema proto.

  Args:
    schema: A Schema proto.

  Returns:
    A feature spec defined as a dict whose keys are feature names and values are
      instances of FixedLenFeature, VarLenFeature or SparseFeature.
  """
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _example_serving_receiver_fn(tf_transform_output, schema):
  """Builds the serving in inputs.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    TensorFlow graph which parses examples, applying tf-transform to them.
  """
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_feature_spec.pop(LABEL_KEY)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec)
  serving_input_receiver = raw_input_fn()

  transformed_features = tf_transform_output.transform_raw_features(
      serving_input_receiver.features)
  transformed_features.pop(transformed_name(LABEL_KEY))
  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)


def _eval_input_receiver_fn(tf_transform_output, schema):
  """Builds everything needed for the tf-model-analysis to run the model.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    EvalInputReceiver function, which contains:
      - TensorFlow graph which parses raw untransformed features, applies the
          tf-transform preprocessing operators.
      - Set of raw, untransformed features.
      - Label against which predictions will be compared.
  """
  # Notice that the inputs are raw features, not transformed features here.
  raw_feature_spec = _get_raw_feature_spec(schema)

  serialized_tf_example = tf.compat.v1.placeholder(
      dtype=tf.string, shape=[None], name='input_example_tensor')

  # Add a parse_example operator to the tensorflow graph, which will parse
  # raw, untransformed, tf examples.
  features = tf.io.parse_example(
      serialized=serialized_tf_example, features=raw_feature_spec)

  transformed_features = tf_transform_output.transform_raw_features(features)
  labels = transformed_features.pop(transformed_name(LABEL_KEY))

  receiver_tensors = {'examples': serialized_tf_example}

  return tfma.export.EvalInputReceiver(
      features=transformed_features,
      receiver_tensors=receiver_tensors,
      labels=labels)


def _input_fn(filenames, tf_transform_output, batch_size=200):
  """Generates features and labels for training or evaluation.

  Args:
    filenames: List of CSV files to read data from.
    tf_transform_output: A TFTransformOutput.
    batch_size: First dimension size of the Tensors returned by input_fn.

  Returns:
    A (features, indices) tuple where features is a dictionary of
      Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.compat.v1.data.experimental.make_batched_features_dataset(
      filenames,
      batch_size,
      transformed_feature_spec,
      shuffle=False,
      reader=_gzip_reader_fn)

  transformed_features = dataset.make_one_shot_iterator().get_next()

  # We pop the label because we do not want to use it as a feature while we're
  # training.
  return transformed_features, transformed_features.pop(
      transformed_name(LABEL_KEY))


def _keras_model_builder():
  """Build a keras model for COMPAS dataset classification.

  Returns:
    A compiled Keras model.
  """
  feature_columns = []
  feature_layer_inputs = {}

  for key in transformed_names(INT_FEATURE_KEYS):
    feature_columns.append(tf.feature_column.numeric_column(key))
    feature_layer_inputs[key] = tf.keras.Input(shape=(1,), name=key)

  for key, num_buckets in zip(transformed_names(CATEGORICAL_FEATURE_KEYS),
                              MAX_CATEGORICAL_FEATURE_VALUES):
    feature_columns.append(
        tf.feature_column.indicator_column(
            tf.feature_column.categorical_column_with_identity(
                key, num_buckets=num_buckets)))
    feature_layer_inputs[key] = tf.keras.Input(
        shape=(1,), name=key, dtype=tf.dtypes.int32)

  feature_columns_input = tf.keras.layers.DenseFeatures(feature_columns)
  feature_layer_outputs = feature_columns_input(feature_layer_inputs)

  dense_layers = tf.keras.layers.Dense(
      20, activation='relu', name='dense_1')(feature_layer_outputs)
  dense_layers = tf.keras.layers.Dense(
      10, activation='relu', name='dense_2')(dense_layers)
  output = tf.keras.layers.Dense(
      1, name='predictions')(dense_layers)

  model = tf.keras.Model(
      inputs=[v for v in feature_layer_inputs.values()], outputs=output)

  model.compile(
      loss=tf.keras.losses.MeanAbsoluteError(),
      optimizer=tf.optimizers.Adam(learning_rate=_LEARNING_RATE))

  return model


# TFX will call this function.
def trainer_fn(hparams, schema):
  """Build the estimator using the high level API.

  Args:
    hparams: Hyperparameters used to train the model as name/value pairs.
    schema: Holds the schema of the training examples.

  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  tf_transform_output = tft.TFTransformOutput(hparams.transform_output)

  train_input_fn = lambda: _input_fn(
      hparams.train_files,
      tf_transform_output,
      batch_size=_BATCH_SIZE)

  eval_input_fn = lambda: _input_fn(
      hparams.eval_files,
      tf_transform_output,
      batch_size=_BATCH_SIZE)

  train_spec = tf.estimator.TrainSpec(
      train_input_fn,
      max_steps=hparams.train_steps)

  serving_receiver_fn = lambda: _example_serving_receiver_fn(
      tf_transform_output, schema)

  exporter = tf.estimator.FinalExporter('compas', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(
      eval_input_fn,
      steps=hparams.eval_steps,
      exporters=[exporter],
      name='compas-eval')

  run_config = tf.estimator.RunConfig(
      save_checkpoints_steps=_SAVE_CHECKPOINT_STEPS,
      keep_checkpoint_max=_MAX_CHECKPOINTS)

  run_config = run_config.replace(model_dir=hparams.serving_model_dir)

  estimator = tf.keras.estimator.model_to_estimator(
      keras_model=_keras_model_builder(), config=run_config)

  # Create an input receiver for TFMA processing.
  receiver_fn = lambda: _eval_input_receiver_fn(tf_transform_output, schema)

  return {
      'estimator': estimator,
      'train_spec': train_spec,
      'eval_spec': eval_spec,
      'eval_input_receiver_fn': receiver_fn
  }
[ ]:
_pipelie_path = os.path.join('compas', 'pipeline.py')
[ ]:
%%writefile {_pipelie_path}

from typing import Optional
import os

import absl
import tensorflow_model_analysis as tfma
from tfx import v1 as tfx
from tfx.components import (CsvExampleGen,
                            Evaluator,
                            Pusher,
                            SchemaGen,
                            StatisticsGen,
                            Trainer,
                            Transform)

from tfx.components.trainer.executor import Executor
from tfx.dsl.components.base import executor_spec

from tfx.orchestration import pipeline
from tfx.orchestration import metadata
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.proto import example_gen_pb2
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

_pipeline_name = 'compas'
_compas_root = os.path.join('.', 'compas')
_data_path = os.path.join(_compas_root, 'data')
# Python module file to inject customized logic into the TFX components. The
# Transform and Trainer both require user-defined functions to run successfully.
_transformer_file = os.path.join(_compas_root, 'transformer.py')
_trainer_file = os.path.join(_compas_root, 'trainer.py')
# Path which can be listened to by the model server.  Pusher will output the
# trained model here.
_serving_model_dir = os.path.join(_compas_root, 'serving_model', _pipeline_name)

# Directory and data locations.  This example assumes all of the chicago taxi
# example code and metadata library is relative to $HOME, but you can store
# these files anywhere on your local filesystem.
_tfx_root = os.path.join('compas', 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
# Sqlite ML-metadata db path.
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name,
                              'metadata.db')

def create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    data_path: str,
    preprocessing_module_file: str,
    trainer_module_file: str,
    train_args: tfx.proto.TrainArgs,
    eval_args: tfx.proto.EvalArgs,
    serving_model_dir: str,
    metadata_path: str,
    schema_path: Optional[str] = None,
) -> tfx.dsl.Pipeline:
  """Implements the compass pipeline with TFX."""

  # Brings data into the pipeline or otherwise joins/converts training data.

  input = tfx.proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
  example_gen = CsvExampleGen(input_base=data_path, input_config=input)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(
      examples=example_gen.outputs['examples'])

  if schema_path is None:
    # Generates schema based on statistics files.
    schema_gen = SchemaGen(
        statistics=statistics_gen.outputs['statistics'])
  else:
    # Import user provided schema into the pipeline.
    schema_gen = tfx.components.ImportSchemaGen(schema_file=schema_path)


  # Performs transformations and feature engineering in training and serving.
  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=os.path.abspath(preprocessing_module_file))

  # Uses user-provided Python function that implements a model.
  trainer_args = {
      'module_file': trainer_module_file,
      'examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'custom_executor_spec' : executor_spec.ExecutorClassSpec(Executor),
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args,
      'eval_args': eval_args,
  }
  trainer = Trainer(**trainer_args)

  # Uses TFMA to compute a evaluation statistics over features of a model and
  # perform quality validation of a candidate model (compared to a baseline).
  eval_config = tfma.EvalConfig(
      model_specs=[
          tfma.ModelSpec(
              label_key='is_recid')
      ],
      slicing_specs=[
          tfma.SlicingSpec(
              feature_keys=['race'])
      ],
      metrics_specs=[
          tfma.MetricsSpec(metrics=[
              tfma.MetricConfig(
                  class_name='BinaryAccuracy'),
              tfma.MetricConfig(
                  class_name='AUC'),
              tfma.MetricConfig(
                  class_name='FairnessIndicators',
                  config='{"thresholds": [0.25, 0.5, 0.75]}')

          ])
      ])
  evaluator = Evaluator(examples=example_gen.outputs['examples'],
                        model=trainer.outputs['model'],
                        eval_config=eval_config)

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen,
          statistics_gen,
          schema_gen,
          transform,
          trainer,
          evaluator,
      ],
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path)
  )

if __name__ == '__main__':
  absl.logging.set_verbosity(absl.logging.INFO)

  LocalDagRunner().run(
      create_pipeline(
          pipeline_name=_pipeline_name,
          pipeline_root=_pipeline_root,
          data_path=_data_path,
          preprocessing_module_file= _transformer_file,
          trainer_module_file=_trainer_file,
          serving_model_dir=_serving_model_dir,
          metadata_path=_metadata_path,
          train_args=trainer_pb2.TrainArgs(num_steps=10000),
          eval_args=trainer_pb2.EvalArgs(num_steps=5000))
  )
[ ]:
!python {_pipelie_path}
[ ]:
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = './compas/tfx/metadata/compas/metadata.db'
connection_config.sqlite.connection_mode = 3 # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)
[ ]:
data = store.get_artifacts_by_type("Examples")[0].uri
evaluator = store.get_artifacts_by_type("ModelEvaluation")[-1].uri
model = store.get_artifacts_by_type("Model")[-1].uri
[ ]:
_model_path = os.path.join(model, 'Format-Serving')
_data_paths = {'eval': TensorflowDataset(dataset_path=os.path.join(data, 'Split-eval', '*.gz')),
               'train': TensorflowDataset(dataset_path=os.path.join(data, 'Split-train', '*.gz'))}
[ ]:
_project_path = os.path.join('.', 'compas')
[ ]:
_eval_config = os.path.join(_project_path, 'eval_config.proto')
[ ]:
%%writefile {_eval_config}

model_specs {
    label_key: 'is_recid'
  }
metrics_specs {
    metrics {class_name: "BinaryAccuracy"}
    metrics {class_name: "AUC"}
    metrics {class_name: "ConfusionMatrixPlot"}
    metrics {
      class_name: "FairnessIndicators"
      config: '{"thresholds": [0.25, 0.5, 0.75]}'
    }
  }
slicing_specs {}
slicing_specs {
        feature_keys: 'race'
  }
options {
    include_default_metrics { value: false }
  }
[ ]:
overview = ("COMPAS (Correctional Offender Management Profiling for Alternative Sanctions)"
" is a public dataset, which contains approximately 18,000 criminal cases from "
"Broward County, Florida between January, 2013 and December, 2014. The data contains"
" information about 11,000 unique defendants, including criminal history demographics,"
" and a risk score intended to represent the defendant’s likelihood of reoffending"
" (recidivism). A machine learning model trained on this data has been used by judges"
" and parole officers to determine whether or not to set bail and whether or not to"
" grant parole."

"In 2016, an article published in ProPublica found that the COMPAS model was incorrectly"
" predicting that African-American defendants would recidivate at much higher rates than"
" their white counterparts while Caucasian would not recidivate at a much higher rate. "
"For Caucasian defendants, the model made mistakes in the opposite direction, making incorrect predictions "
"that they wouldn’t commit another crime. The authors went on to show that these biases were likely due to "
"an uneven distribution in the data between African-Americans and Caucasian defendants. Specifically, the "
"ground truth label of a negative example (a defendant would not commit another crime) and a positive example "
"(defendant would commit another crime) were disproportionate between the two races. "
"Since 2016, the COMPAS dataset has appeared frequently in the ML fairness literature "
"1, 2, 3, with researchers using it to demonstrate techniques for identifying and remediating "
"fairness concerns."

"It is important to note that developing a machine learning model to predict pre-trial detention "
"has a number of important ethical considerations. You can learn more about these issues in the "
"Partnership on AI Report on Algorithmic Risk Assessment Tools in the U.S. Criminal Justice System."
" The Partnership on AI is a multi-stakeholder organization -- of which Google is a member -- that "
"creates guidelines around AI.")
[ ]:
mc = {
  "model_details": {
    "name": "COMPAS (Correctional Offender Management Profiling for Alternative Sanctions)",
    "overview": overview,
    "owners": [
      {
        "name": "Intel XAI Team",
        "contact": "xai@intel.com"
      }
    ],
    "references": [
      {
        "reference": "Wadsworth, C., Vera, F., Piech, C. (2017). Achieving Fairness Through Adversarial Learning: an Application to Recidivism Prediction. https://arxiv.org/abs/1807.00199."
      },
      {
        "reference": "Chouldechova, A., G'Sell, M., (2017). Fairer and more accurate, but for whom? https://arxiv.org/abs/1707.00046."
      },
      {
        "reference": "Berk et al., (2017), Fairness in Criminal Justice Risk Assessments: The State of the Art, https://arxiv.org/abs/1703.09207."
      }
    ],
    "graphics": {
      "description": " "
    }
  },
  "quantitative_analysis": {
    "graphics": {
      "description": " "
    }
  },
  "schema_version": "0.0.1"
}
[ ]:
mcg = ModelCardGen.generate(data_sets=_data_paths,
                            eval_config=_eval_config,
                            model_path=_model_path,
                            model_card=mc)

Display Model Card

[ ]:
mcg
[ ]:
mcg.export_html('compas_plotly.html')