# Source code for alibi.explainers.backends.cfrl_tabular

```
"""
This module contains utility functions for the Counterfactual with Reinforcement Learning tabular class,
:py:class:`alibi.explainers.cfrl_tabular`, that are common for both Tensorflow and Pytorch backends.
"""
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from scipy.special import softmax
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
if TYPE_CHECKING:
import tensorflow as tf
import torch
[docs]
def get_conditional_dim(feature_names: List[str], category_map: Dict[int, List[str]]) -> int:
    """
    Return the width (number of columns) of the conditional vector.

    Each numerical feature contributes two entries (a lower and an upper bound coefficient), while each
    categorical feature contributes one entry per possible category.

    Parameters
    ----------
    feature_names
        List of feature names. This should be provided by the dataset.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values. This should be provided by the dataset.

    Returns
    -------
    Dimension of the conditional vector.
    """
    # Total number of categories summed over all categorical features.
    categorical_width = sum(len(values) for values in category_map.values())
    # Every feature that is not listed in `category_map` is treated as numerical.
    numerical_count = len(feature_names) - len(category_map)
    return categorical_width + 2 * numerical_count
[docs]
def split_ohe(X_ohe: 'Union[np.ndarray, torch.Tensor, tf.Tensor]',
              category_map: Dict[int, List[str]]) -> Tuple[List, List]:
    """
    Split a one-hot encoded array into numerical and categorical heads.

    The leading columns of `X_ohe` are taken to be the numerical features and are returned as a single head.
    The remaining columns are cut into one head per categorical feature, visited in ascending order of the
    keys of `category_map`.

    Parameters
    ----------
    X_ohe
        One-hot encoding representation. This can be any type of tensor: `np.ndarray`, `torch.Tensor`,
        `tf.Tensor`. It only needs a `shape` attribute and 2D slicing support.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible values of a feature.

    Returns
    -------
    X_ohe_num_split
        List of numerical heads. It is empty when there are no numerical columns, otherwise its size is 1.
    X_ohe_cat_split
        List of categorical one-hot encoded heads (empty when there are no categorical columns).
    """
    assert hasattr(X_ohe, "shape"), "X_ohe needs to have `shape` attribute."

    # Width of every categorical head, in ascending column-index order.
    head_widths = [len(category_map[col]) for col in sorted(category_map.keys())]
    n_cat_cols = sum(head_widths)
    # Whatever is not spanned by the categorical heads belongs to the numerical head.
    n_num_cols = X_ohe.shape[1] - n_cat_cols

    has_numerical = n_num_cols > 0
    numerical_heads = [X_ohe[:, :n_num_cols]] if has_numerical else []

    categorical_heads = []
    if n_cat_cols > 0:
        # Categorical heads start right after the numerical block (or at column 0 if there is none).
        start = n_num_cols if has_numerical else 0
        for width in head_widths:
            stop = start + width
            categorical_heads.append(X_ohe[:, start:stop])
            start = stop

    return numerical_heads, categorical_heads
[docs]
def generate_numerical_condition(X_ohe: np.ndarray,
                                 feature_names: List[str],
                                 category_map: Dict[int, List[str]],
                                 ranges: Dict[str, List[float]],
                                 immutable_features: List[str],
                                 conditional: bool = True) -> np.ndarray:
    """
    Build the numerical part of the conditional vector.

    For every numerical feature two columns are produced: a non-positive lower coefficient and a non-negative
    upper coefficient. Each one is the feature's range bound (taken from `ranges`, defaulting to `-1` and `1`)
    multiplied by a per-row coefficient. The coefficient is sampled from `Beta(2, 2)` when `conditional` is
    ``True`` and is `1` otherwise. Features listed in `immutable_features` get `0` for both columns.

    Parameters
    ----------
    X_ohe
        One-hot encoding representation of the element(s) for which the conditional vector will be generated.
        Only its number of rows is used.
    feature_names
        List of feature names. This should be provided by the dataset.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values. Features whose index is a key are skipped.
    ranges
        Dictionary of ranges for numerical features, keyed by feature name. Each value is a list containing
        two elements, the first one non-positive and the second one non-negative.
    immutable_features
        List of names of the immutable features.
    conditional
        Boolean flag to generate a conditional vector. If ``False`` the conditional vector does not impose any
        restrictions on the feature value beyond the configured ranges.

    Returns
    -------
    Conditional vector for numerical features, with two columns per numerical feature.

    Raises
    ------
    ValueError
        If a lower bound is strictly positive or an upper bound is strictly negative.
    """
    n_rows = X_ohe.shape[0]

    def _coefficients() -> np.ndarray:
        # One coefficient per row, shaped as a column vector.
        if conditional:
            return np.random.beta(a=2, b=2, size=n_rows).reshape(-1, 1)
        return np.ones((n_rows, 1))

    columns = []
    for position, name in enumerate(feature_names):
        if position in category_map:
            # Categorical features are handled elsewhere.
            continue

        if name in immutable_features:
            low, high = 0., 0.
        elif name in ranges:
            low, high = ranges[name][0], ranges[name][1]
        else:
            low, high = -1., 1.

        # The allowed interval must contain zero.
        if low > 0:
            raise ValueError(f"Lower bound range for {name} should be negative.")
        if high < 0:
            raise ValueError(f"Upper bound range for {name} should be positive.")

        # Draw the lower coefficient first, then the upper one (keeps the random stream order stable).
        lower_coeff = _coefficients()
        upper_coeff = _coefficients()
        columns.extend([lower_coeff * low, upper_coeff * high])

    # Stack all lower/upper columns side by side.
    return np.concatenate(columns, axis=1)
[docs]
def generate_categorical_condition(X_ohe: np.ndarray,
                                   feature_names: List[str],
                                   category_map: Dict[int, List],
                                   immutable_features: List[str],
                                   conditional: bool = True) -> np.ndarray:
    """
    Build the categorical part of the conditional vector.

    For a categorical feature of cardinality `K` the condition is a binary mask of dimension `K` marking the
    allowed values. The mask starts as the feature's own one-hot encoding, so the original value is always
    allowed. For mutable features, uniform noise in `[0, 1)` is added and the result is thresholded at `0.5`,
    which switches every other category on with probability one half. With ``conditional=False`` all
    categories of a mutable feature are allowed. Immutable features keep only their original value, e.g.
    `[1, 0, 0]` for ``'married'`` given the ordering `[married, unmarried, divorced]`.

    Parameters
    ----------
    X_ohe
        One-hot encoding representation of the element(s) for which the conditional vector will be generated.
        The elements are required since the mask always contains the original feature value.
    feature_names
        List of feature names. This should be provided by the dataset.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values.
    immutable_features
        List of immutable features.
    conditional
        Boolean flag to generate a conditional vector. If ``False`` the conditional vector does not impose any
        restrictions on the feature value.

    Returns
    -------
    Conditional vector for categorical features (masks concatenated along the column axis, `float32`).
    """
    # One array per categorical feature, ordered by ascending column index.
    _, categorical_heads = split_ohe(X_ohe, category_map)

    # Names of the categorical features, in the same ascending column order as the heads.
    categorical_names = [name for position, name in enumerate(feature_names) if position in category_map]

    masks = []
    for head_index, name in enumerate(categorical_names):
        # Work on a copy so that the caller's one-hot encoding is left untouched.
        mask = categorical_heads[head_index].copy()

        if name not in immutable_features:
            noise = np.random.rand(*mask.shape) if conditional else np.ones_like(mask)
            mask += noise

        # Threshold into a binary mask.
        masks.append((mask > 0.5).astype(np.float32))

    return np.concatenate(masks, axis=1)
[docs]
def generate_condition(X_ohe: np.ndarray,
                       feature_names: List[str],
                       category_map: Dict[int, List[str]],
                       ranges: Dict[str, List[float]],
                       immutable_features: List[str],
                       conditional: bool = True) -> np.ndarray:
    """
    Generate the full conditional vector (numerical part followed by categorical part).

    Parameters
    ----------
    X_ohe
        One-hot encoding representation of the element(s) for which the conditional vector will be generated.
        This method assumes that the input array, `X_ohe`, has the first columns corresponding to the
        numerical features, and the rest are one-hot encodings of the categorical columns. The numerical and
        the categorical columns are ordered by the original column index (e.g., `numerical = (1, 4)`,
        `categorical = (0, 2, 3)`).
    feature_names
        List of feature names.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values.
    ranges
        Dictionary of ranges for numerical features. Each value is a list containing two elements, first one
        negative and the second one positive.
    immutable_features
        List of immutable features.
    conditional
        Boolean flag to generate a conditional vector. If ``False`` the conditional vector does not impose any
        restrictions on the feature value.

    Returns
    -------
    Conditional vector.
    """
    parts = []

    # There is at least one numerical feature when there are more features than categorical ones.
    has_numerical = len(feature_names) > len(category_map)
    if has_numerical:
        parts.append(generate_numerical_condition(X_ohe=X_ohe,
                                                  feature_names=feature_names,
                                                  category_map=category_map,
                                                  ranges=ranges,
                                                  immutable_features=immutable_features,
                                                  conditional=conditional))

    has_categorical = len(category_map) > 0
    if has_categorical:
        parts.append(generate_categorical_condition(X_ohe=X_ohe,
                                                    feature_names=feature_names,
                                                    category_map=category_map,
                                                    immutable_features=immutable_features,
                                                    conditional=conditional))

    # Numerical block first, categorical block second.
    return np.concatenate(parts, axis=1)
[docs]
def sample_numerical(X_hat_num_split: List[np.ndarray],
                     X_ohe_num_split: List[np.ndarray],
                     C_num_split: Optional[List[np.ndarray]],
                     stats: Dict[int, Dict[str, float]]) -> List[np.ndarray]:
    """
    Clamp the reconstructed numerical features according to the conditional vector and the data statistics.

    Each column is first clipped (when a conditional vector is given) to the interval
    `[x + c_low * (max - min), x + c_high * (max - min)]`, where `x` is the original value and `c_low`,
    `c_high` are the two conditional coefficients of that column. It is then clipped to the `[min, max]`
    interval observed in the training data. The reconstruction is modified in place.

    Parameters
    ----------
    X_hat_num_split
        List of reconstructed numerical heads from the auto-encoder. Only the first element is used, as all
        the numerical features are part of a single head.
    X_ohe_num_split
        List of original numerical heads. Only the first element is used.
    C_num_split
        List of conditional vectors for numerical heads, or ``None`` to skip the conditional clipping. Only
        the first element is used; columns `2 * i` and `2 * i + 1` hold the lower and upper coefficient of the
        `i`-th numerical column.
    stats
        Dictionary of statistic of the training data. Contains the minimum and maximum value of each numerical
        feature in the training set. Each key is an index of the column and each value is another dictionary
        containing ``'min'`` and ``'max'`` keys.

    Returns
    -------
    X_ohe_hat_num
        The same list that was passed as `X_hat_num_split`, with its first element clamped in place.
    """
    reconstruction = X_hat_num_split[0]
    n_columns = reconstruction.shape[1]

    # Pair the i-th numerical column with the i-th smallest column id present in `stats`.
    for position, column_id in enumerate(sorted(stats.keys())[:n_columns]):
        lowest = stats[column_id]["min"]
        highest = stats[column_id]["max"]

        if C_num_split is not None:
            span = highest - lowest
            original = X_ohe_num_split[0][:, position]
            # Bounds implied by the conditional vector, relative to the original value.
            lower_bound = original + C_num_split[0][:, 2 * position] * span
            upper_bound = original + C_num_split[0][:, 2 * position + 1] * span
            reconstruction[:, position] = np.clip(reconstruction[:, position],
                                                  a_min=lower_bound,
                                                  a_max=upper_bound)

        # Never leave the range observed in the training set.
        reconstruction[:, position] = np.clip(reconstruction[:, position], a_min=lowest, a_max=highest)

    return X_hat_num_split
[docs]
def sample_categorical(X_hat_cat_split: List[np.ndarray],
                       C_cat_split: Optional[List[np.ndarray]]) -> List[np.ndarray]:
    """
    Pick, for every categorical head, the most probable category allowed by the conditional mask.

    The logits of each head are turned into probabilities with a softmax, multiplied by the corresponding
    mask (if any), and the arg-max per row is returned as a one-hot vector.

    Parameters
    ----------
    X_hat_cat_split
        List of reconstructed categorical heads from the auto-encoder. The categorical columns contain logits.
    C_cat_split
        List of conditional vectors (masks) for categorical heads, or ``None`` for no masking.

    Returns
    -------
    X_ohe_hat_cat
        List of one-hot encoded vectors sampled according to the conditional vector.
    """
    # Row indices shared by all heads, used for the fancy-index assignment below.
    row_index = np.arange(X_hat_cat_split[0].shape[0])

    one_hot_heads = []
    for head_index, logits in enumerate(X_hat_cat_split):
        probabilities = softmax(logits, axis=1)
        if C_cat_split is not None:
            # Zero out the probability of disallowed categories.
            probabilities = probabilities * C_cat_split[head_index]

        winners = np.argmax(probabilities, axis=1)
        one_hot = np.zeros_like(probabilities)
        one_hot[row_index, winners] = 1
        one_hot_heads.append(one_hot)

    return one_hot_heads
[docs]
def sample(X_hat_split: List[np.ndarray],
           X_ohe: np.ndarray,
           C: Optional[np.ndarray],
           category_map: Dict[int, List[str]],
           stats: Dict[int, Dict[str, float]]) -> List[np.ndarray]:
    """
    Sample an instance from the given reconstruction according to the conditional vector and the dictionary
    of statistics.

    This method assumes that the input array, `X_ohe`, has the first columns corresponding to the numerical
    features, and the rest are one-hot encodings of the categorical columns.

    Parameters
    ----------
    X_hat_split
        List of reconstructed columns from the auto-encoder. The leading element(s) are the numerical head and
        the trailing elements are the categorical heads. The categorical columns contain logits.
    X_ohe
        One-hot encoded representation of the input.
    C
        Conditional vector, or ``None`` for unconditional sampling.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible values for a feature.
    stats
        Dictionary of statistic of the training data. Contains the minimum and maximum value of each numerical
        feature in the training set. Each key is an index of the column and each value is another dictionary
        containing ``'min'`` and ``'max'`` keys.

    Returns
    -------
    X_ohe_hat_split
        Most probable reconstruction sample according to the auto-encoder, sampled according to the
        conditional vector and the dictionary of statistics. Numerical head first, then categorical heads.
    """
    numerical_inputs, categorical_inputs = split_ohe(X_ohe, category_map)

    if C is None:
        numerical_conditions, categorical_conditions = None, None
    else:
        numerical_conditions, categorical_conditions = split_ohe(C, category_map)

    n_numerical_heads = len(numerical_inputs)
    n_categorical_heads = len(categorical_inputs)

    sampled_heads = []

    if n_numerical_heads > 0:
        # Numerical heads sit at the front of the reconstruction list.
        sampled_heads.extend(sample_numerical(X_hat_num_split=X_hat_split[:n_numerical_heads],
                                              X_ohe_num_split=numerical_inputs,
                                              C_num_split=numerical_conditions,
                                              stats=stats))

    if n_categorical_heads > 0:
        # Categorical heads sit at the back of the reconstruction list.
        sampled_heads.extend(sample_categorical(X_hat_cat_split=X_hat_split[-n_categorical_heads:],
                                                C_cat_split=categorical_conditions))

    return sampled_heads
[docs]
def get_he_preprocessor(X: np.ndarray,
                        feature_names: List[str],
                        category_map: Dict[int, List[str]],
                        feature_types: Optional[Dict[str, type]] = None
                        ) -> Tuple[Callable[[np.ndarray], np.ndarray], Callable[[np.ndarray], np.ndarray]]:
    """
    Heterogeneous dataset preprocessor. The numerical features are standardized and the categorical features
    are one-hot encoded.

    The transformed array has the numerical columns first, followed by the one-hot heads of the categorical
    columns in ascending column-index order. This is the layout that `split_ohe` relies on.

    Parameters
    ----------
    X
        Data to fit. Categorical columns are expected to hold integer codes `0 .. K-1`, where `K` is the
        number of values listed for that column in `category_map`.
    feature_names
        List of feature names. This should be provided by the dataset.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values. This should be provided by the dataset.
    feature_types
        Dictionary of type for the numerical features, keyed by feature name. Features that are not listed
        are cast to `float` by the inverse preprocessor.

    Returns
    -------
    preprocessor
        Data preprocessor.
    inv_preprocessor
        Inverse data preprocessor (e.g., `inv_preprocessor(preprocessor(x)) = x`). It returns an array of
        dtype `object` with the columns in the original order.
    """
    if feature_types is None:
        feature_types = dict()

    # Separate columns in numerical and categorical. The categorical ids are sorted so that the one-hot heads
    # come out in ascending column order regardless of the insertion order of `category_map`; both `split_ohe`
    # and the inverse permutation below assume that order.
    categorical_ids = sorted(category_map.keys())
    numerical_ids = [i for i in range(len(feature_names)) if i not in category_map]
    numerical_id_set = set(numerical_ids)

    # Define standard scaler and one-hot encoding transformations. The categories are listed in the same
    # (sorted) order as the columns handed to the encoder.
    num_transf = StandardScaler()
    cat_transf = OneHotEncoder(
        categories=[range(len(category_map[i])) for i in categorical_ids],
        handle_unknown="ignore"
    )

    # Define preprocessor. `sparse_threshold=0` forces a dense output.
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", num_transf, numerical_ids),
            ("cat", cat_transf, categorical_ids)
        ],
        sparse_threshold=0
    )
    preprocessor.fit(X)

    num_feat_ohe = len(numerical_ids)  # number of numerical columns
    cat_feat_ohe = sum(len(v) for v in category_map.values())  # number of categorical one-hot columns

    # `perm[j]` is the original column index of the j-th column produced by the inverse transforms
    # (numerical columns first, then categorical ones). `inv_perm` maps it back to the original order.
    perm = numerical_ids + categorical_ids
    inv_perm = [0] * len(perm)
    for position, original_column in enumerate(perm):
        inv_perm[original_column] = position

    # Define inverse preprocessor
    def get_inv_preprocessor(X_ohe: np.ndarray):
        X_inv = []

        if "num" in preprocessor.named_transformers_ and len(numerical_ids):
            fitted_num = preprocessor.named_transformers_["num"]
            X_ohe_num = X_ohe[:, :num_feat_ohe] if preprocessor.transformers[0][0] == "num" else \
                X_ohe[:, -num_feat_ohe:]
            X_inv.append(fitted_num.inverse_transform(X_ohe_num))

        if "cat" in preprocessor.named_transformers_ and len(categorical_ids):
            fitted_cat = preprocessor.named_transformers_["cat"]
            X_ohe_cat = X_ohe[:, :cat_feat_ohe] if preprocessor.transformers[0][0] == "cat" else \
                X_ohe[:, -cat_feat_ohe:]
            X_inv.append(fitted_cat.inverse_transform(X_ohe_cat))

        # Concatenate all columns. At this point the columns are not ordered correctly
        np_X_inv = np.concatenate(X_inv, axis=1)

        # Permute columns back to the original order and cast to object so that each column can hold
        # its own data type.
        np_X_inv = np_X_inv[:, inv_perm].astype(object)

        # Cast features to desired data types
        for i, fn in enumerate(feature_names):
            if i in numerical_id_set:
                ft_type = feature_types[fn] if fn in feature_types else float

                # Round `int` type features to the closest integer number to avoid rounding error when
                # casting to `int`. The casting to `np.float32` is due to previous casting to `object`
                # which raises an error when applying `np.rint` (i.e., 'TypeError: loop of ufunc does not
                # support argument 0 of type float which has no callable rint method')
                if ft_type == int:
                    np_X_inv[:, i] = np.rint(np_X_inv[:, i].astype(np.float32))
            else:
                ft_type = int  # for categorical features

            np_X_inv[:, i] = np_X_inv[:, i].astype(ft_type)

        return np_X_inv

    return preprocessor.transform, get_inv_preprocessor
[docs]
def get_statistics(X: np.ndarray,
                   preprocessor: Callable[[np.ndarray], np.ndarray],
                   category_map: Dict[int, List[str]]) -> Dict[int, Dict[str, float]]:
    """
    Compute the minimum and maximum of every numerical feature after preprocessing.

    Parameters
    ----------
    X
        Instances for which to compute statistic.
    preprocessor
        Data preprocessor. The preprocessor should standardize the numerical values and convert categorical
        ones into one-hot encoding representation. By convention, numerical features should be first,
        followed by the rest of categorical ones.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values. This should be provided by the dataset.

    Returns
    -------
    Dictionary of statistics. It is keyed by the original column index of each numerical feature and each
    value holds the ``'min'`` and ``'max'`` of that feature in the preprocessed data.
    """
    # Original column indexes of the numerical features, in ascending order.
    numerical_columns = [column for column in range(X.shape[1]) if column not in category_map]

    # Preprocess data (standardize + one-hot encoding); numerical columns come first in the output.
    X_ohe = preprocessor(X)

    return {
        column: {"min": np.min(X_ohe[:, position]), "max": np.max(X_ohe[:, position])}
        for position, column in enumerate(numerical_columns)
    }
[docs]
def get_numerical_conditional_vector(X: np.ndarray,
                                     condition: Dict[str, List[Union[float, str]]],
                                     preprocessor: Callable[[np.ndarray], np.ndarray],
                                     feature_names: List[str],
                                     category_map: Dict[int, List[str]],
                                     stats: Dict[int, Dict[str, float]],
                                     ranges: Optional[Dict[str, List[float]]] = None,
                                     immutable_features: Optional[List[str]] = None,
                                     diverse=False) -> List[np.ndarray]:
    """
    Generates the numerical part of a conditional vector. The condition is expressed as a delta change of
    the feature. If the ``'Age'`` feature is allowed to increase up to 10 more years, the delta change is
    [0, 10]. If the ``'Hours per week'`` is allowed to decrease down to -5 and increases up to +10, then the
    delta change is [-5, +10]. Note that the interval must include 0.

    Parameters
    ----------
    X
        Instances for which to generate the conditional vector in the original input format.
    condition
        Dictionary of conditions per feature, keyed by feature name. For numerical features it expects a
        `[low, high]` delta with `low <= 0 <= high`. Entries for categorical features are ignored here.
    preprocessor
        Data preprocessor. The preprocessor should standardize the numerical values and convert categorical
        ones into one-hot encoding representation. By convention, numerical features should be first,
        followed by the rest of categorical ones.
    feature_names
        List of feature names. This should be provided by the dataset.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values. This should be provided by the dataset.
    stats
        Dictionary of statistic of the training data. Contains the minimum and maximum value of each numerical
        feature in the training set. Each key is an index of the column and each value is another dictionary
        containing ``'min'`` and ``'max'`` keys.
    ranges
        Dictionary of ranges for numerical feature. Each value is a list containing two elements, first one
        negative and the second one positive. Missing features default to `[-1, 1]`.
    immutable_features
        List of immutable features. Their coefficients are fixed to `0`.
    diverse
        Whether to generate a diverse set of conditional vectors. A diverse set of conditional vector can
        generate a diverse set of counterfactuals for a given input instance.

    Returns
    -------
    List of conditional vectors for each numerical feature: two column vectors (lower, upper) per feature.

    Raises
    ------
    ValueError
        If a lower bound in `condition` is strictly positive or an upper bound is strictly negative.
    """
    if ranges is None:
        ranges = dict()

    if immutable_features is None:
        immutable_features = list()

    # Extract numerical features
    num_features_ids = [col for col in range(X.shape[1]) if col not in category_map]
    num_features_names = [feature_names[col] for col in num_features_ids]

    # Shift the inputs by the requested deltas in the original feature space. The shifted copies are
    # standardized below so that the deltas can be expressed in the preprocessed space.
    X_low, X_high = X.copy(), X.copy()

    for feature_id, feature_name in enumerate(feature_names):
        if feature_id in category_map:
            continue

        if feature_name in condition:
            # Compare as `float`: an `int` conversion truncates toward zero, which would let bounds such as
            # `0.5` (lower) or `-0.5` (upper) slip through the sign check. The conversion itself is needed
            # because the declared value type also admits `str`.
            if float(condition[feature_name][0]) > 0:
                raise ValueError(f"Lower bound on the conditional vector for {feature_name} should be negative.")

            if float(condition[feature_name][1]) < 0:
                raise ValueError(f"Upper bound on the conditional vector for {feature_name} should be positive.")

            X_low[:, feature_id] += condition[feature_name][0]
            X_high[:, feature_id] += condition[feature_name][1]

    # Preprocess the vectors (standardize + one-hot encoding)
    X_low_ohe = preprocessor(X_low)
    X_high_ohe = preprocessor(X_high)
    X_ohe = preprocessor(X)

    # Initialize conditional vector buffer.
    C = []

    # Express the deltas as fractions of the training range and add them to the conditional vector.
    # `i` is the position of the feature among the (leading) numerical columns of the preprocessed arrays.
    for i, (feature_id, feature_name) in enumerate(zip(num_features_ids, num_features_names)):
        if feature_name in immutable_features:
            range_low, range_high = 0., 0.
        elif feature_name in ranges:
            range_low, range_high = ranges[feature_name][0], ranges[feature_name][1]
        else:
            range_low, range_high = -1., 1.

        if (feature_name in condition) and (feature_name not in immutable_features):
            # Mutable feature with conditioning
            feat_min, feat_max = stats[feature_id]["min"], stats[feature_id]["max"]
            X_low_ohe[:, i] = (X_low_ohe[:, i] - X_ohe[:, i]) / (feat_max - feat_min)
            X_high_ohe[:, i] = (X_high_ohe[:, i] - X_ohe[:, i]) / (feat_max - feat_min)

            # Clip the lower coefficient into [range_low, 0] and the upper one into [0, range_high].
            X_low_ohe[:, i] = np.clip(X_low_ohe[:, i], a_min=range_low, a_max=0)
            X_high_ohe[:, i] = np.clip(X_high_ohe[:, i], a_min=0, a_max=range_high)
        else:
            # This means no conditioning
            X_low_ohe[:, i] = range_low
            X_high_ohe[:, i] = range_high

        if diverse:
            # Shrinking the interval by a random factor in [0, 1) keeps it inside the allowed one,
            # so this is still a feasible counterfactual.
            X_low_ohe[:, i] *= np.random.rand(*X_low_ohe[:, i].shape)
            X_high_ohe[:, i] *= np.random.rand(*X_high_ohe[:, i].shape)

        # Append feature conditioning
        C += [X_low_ohe[:, i].reshape(-1, 1), X_high_ohe[:, i].reshape(-1, 1)]

    return C
[docs]
def get_categorical_conditional_vector(X: np.ndarray,
                                       condition: Dict[str, List[Union[float, str]]],
                                       preprocessor: Callable[[np.ndarray], np.ndarray],
                                       feature_names: List[str],
                                       category_map: Dict[int, List[str]],
                                       immutable_features: Optional[List[str]] = None,
                                       diverse=False) -> List[np.ndarray]:
    """
    Generates the categorical part of a conditional vector. The condition is expressed as the list of values
    a feature may take. If the ``'Occupation'`` can change to ``'Blue-Collar'`` or ``'White-Collar'``, the
    condition is ``['Blue-Collar', 'White-Collar']``. Note that the original value is optional as it is
    included by default.

    Parameters
    ----------
    X
        Instances for which to generate the conditional vector in the original input format.
    condition
        Dictionary of conditions per feature, keyed by feature name. For categorical features it expects a
        list of allowed feature values. Each value is converted to `str` and looked up in `category_map`.
    preprocessor
        Data preprocessor. The preprocessor should standardize the numerical values and convert categorical
        ones into one-hot encoding representation. By convention, numerical features should be first,
        followed by the rest of categorical ones.
    feature_names
        List of feature names. This should be provided by the dataset.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values. This should be provided by the dataset.
    immutable_features
        List of immutable features. Only their original value is allowed.
    diverse
        Whether to generate a diverse set of conditional vectors. A diverse set of conditional vector can
        generate a diverse set of counterfactuals for a given input instance.

    Returns
    -------
    List of conditional vectors (integer binary masks), one per categorical feature.
    """
    if immutable_features is None:
        immutable_features = list()

    # Column indexes and names of the categorical features, in ascending column order.
    categorical_columns = [column for column in range(X.shape[1]) if column in category_map]
    categorical_names = [feature_names[column] for column in categorical_columns]

    # One one-hot head per categorical feature.
    X_ohe = preprocessor(X)
    _, categorical_heads = split_ohe(X_ohe, category_map)

    masks = []
    for head_index, (column, name) in enumerate(zip(categorical_columns, categorical_names)):
        original_one_hot = categorical_heads[head_index]
        allowed = np.zeros_like(original_one_hot)

        if name not in immutable_features:
            if name in condition:
                # Positions of the explicitly allowed values (str conversion as values may also be float).
                positions = [category_map[column].index(str(value)) for value in condition[name]]
                allowed[:, positions] = 1
            else:
                # No condition given: every value is allowed.
                allowed[:] = 1

        if diverse:
            # Randomly dropping allowed entries still yields a feasible counterfactual.
            allowed *= np.random.randint(low=0, high=2, size=allowed.shape)

        # The original value always remains a possibility.
        masks.append(((allowed + original_one_hot) > 0).astype(int))

    return masks
[docs]
def get_conditional_vector(X: np.ndarray,
                           condition: Dict[str, List[Union[float, str]]],
                           preprocessor: Callable[[np.ndarray], np.ndarray],
                           feature_names: List[str],
                           category_map: Dict[int, List[str]],
                           stats: Dict[int, Dict[str, float]],
                           ranges: Optional[Dict[str, List[float]]] = None,
                           immutable_features: Optional[List[str]] = None,
                           diverse=False) -> np.ndarray:
    """
    Generates a conditional vector. The condition is expressed as a delta change of the feature.

    For numerical features, if the ``'Age'`` feature is allowed to increase up to 10 more years, the delta
    change is [0, 10]. If the ``'Hours per week'`` is allowed to decrease down to -5 and increases up to +10,
    then the delta change is [-5, +10]. Note that the interval must include 0.

    For categorical feature, if the ``'Occupation'`` can change to ``'Blue-Collar'`` or ``'White-Collar'``,
    the delta change is ``['Blue-Collar', 'White-Collar']``. Note that the original value is optional as it
    is included by default.

    Parameters
    ----------
    X
        Instances for which to generate the conditional vector in the original input format. A 1D array is
        treated as a single instance.
    condition
        Dictionary of conditions per feature. For numerical features it expects a range that contains the
        original value. For categorical features it expects a list of feature values per features that
        includes the original value.
    preprocessor
        Data preprocessor. The preprocessor should standardize the numerical values and convert categorical
        ones into one-hot encoding representation. By convention, numerical features should be first,
        followed by the rest of categorical ones.
    feature_names
        List of feature names. This should be provided by the dataset.
    category_map
        Dictionary of category mapping. The keys are column indexes and the values are lists containing the
        possible feature values. This should be provided by the dataset.
    stats
        Dictionary of statistic of the training data. Contains the minimum and maximum value of each numerical
        feature in the training set. Each key is an index of the column and each value is another dictionary
        containing ``'min'`` and ``'max'`` keys.
    ranges
        Dictionary of ranges for numerical feature. Each value is a list containing two elements, first one
        negative and the second one positive.
    immutable_features
        List of immutable features.
    diverse
        Whether to generate a diverse set of conditional vectors. A diverse set of conditional vector can
        generate a diverse set of counterfactuals for a given input instance.

    Returns
    -------
    Conditional vector: numerical columns first, then the categorical masks.

    Raises
    ------
    ValueError
        If the number of columns of `X` differs from the number of feature names.
    """
    ranges = dict() if ranges is None else ranges
    immutable_features = list() if immutable_features is None else immutable_features

    # Promote a single instance to a batch of one.
    if len(X.shape) == 1:
        X = X.reshape(1, -1)

    # Check that the second dimension matches the number of features.
    if X.shape[1] != len(feature_names):
        raise ValueError(f"Unexpected number of features. The expected number "
                         f"is {len(feature_names)}, but the input has {X.shape[1]} features.")

    # Arguments shared by the numerical and the categorical generators.
    shared_kwargs = dict(X=X,
                         condition=condition,
                         preprocessor=preprocessor,
                         feature_names=feature_names,
                         category_map=category_map,
                         immutable_features=immutable_features,
                         diverse=diverse)

    numerical_parts = get_numerical_conditional_vector(stats=stats, ranges=ranges, **shared_kwargs)
    categorical_parts = get_categorical_conditional_vector(**shared_kwargs)

    # Concatenate all conditioning column-wise.
    return np.concatenate(numerical_parts + categorical_parts, axis=1)
[docs]
def apply_category_mapping(X: np.ndarray, category_map: Dict[int, List[str]]) -> np.ndarray:
    """
    Applies a category mapping for the categorical feature in the array. It transforms ints back to strings
    to be readable.

    Parameters
    ----------
    X
        Array containing the columns to be mapped. Categorical columns are expected to hold the integer codes
        `0 .. K-1` of the values listed in `category_map`.
    category_map
        Dictionary of category mapping. Keys are columns index, and values are list of feature values.

    Returns
    -------
    Transformed array. Codes that have no entry in the mapping are left unchanged.
    """
    pd_X = pd.DataFrame(X)

    for key, values in category_map.items():
        codes = list(range(len(values)))
        # Assign the result back instead of calling `replace(..., inplace=True)` on the column selection:
        # the chained in-place form acts on a temporary object and does not update the frame when pandas
        # Copy-on-Write is active.
        pd_X[key] = pd_X[key].replace(codes, list(values))

    return pd_X.to_numpy()
```