Source code for alibi.confidence.model_linearity

import logging
import string
from time import time
from typing import Callable, List, Optional, Tuple, Union

import numpy as np
from numpy.linalg import norm
from sklearn.neighbors import NearestNeighbors

logger = logging.getLogger(__name__)

def _linear_superposition(alphas, vecs, shape):
    """Calculate the tensor superposition along axis 1 of vecs with coefficient alphas using einsum.

        Coefficients of the superposition.
        Tensors of the superposition.
        Shape of each tensor.

    Linear tensor superposition.
    input_str = string.ascii_lowercase[2: 2 + len(shape)]
    einstr = 'a,ba{}->b{}'.format(input_str, input_str)
    return np.einsum(einstr, alphas, vecs)

def _calculate_global_linearity(predict_fn: Callable, input_shape: Tuple, X_samples: np.ndarray,
                                model_type: str, alphas: np.ndarray) -> np.ndarray:
    """Calculates the norm of the difference between the output of a linear superposition of vectors and the
    linear superposition of the outputs for each individual vector.

        Model prediction function.
        Shape of the input.
        Array of feature vectors in the linear superposition.
        Supported values: ``'classifier'`` | ``'regressor'``.
        Array of coefficients in the linear superposition.

    Linearity score.

    ss = X_samples.shape[:2]  # X_samples shape=(nb_instances, nb_samples, nb_features)
    X_samples = X_samples.reshape((X_samples.shape[0] * X_samples.shape[1],) + input_shape)

    t_0 = time()
    if model_type == 'classifier':
        outs = np.log(predict_fn(X_samples) + 1e-10)
        outs_shape = outs.shape[1:]
        outs = outs.reshape(ss + outs_shape)  # shape=(nb_instances, nb_samples, nb_classes)
    elif model_type == 'regressor':
        outs = predict_fn(X_samples)
        outs_shape = outs.shape[1:]
        if len(outs.shape) == 1:
            outs = outs.reshape(ss + (1,))  # shape=(nb_instances, nb_samples, 1)
        else:  # if regression on multiple targets
            outs = outs.reshape(ss + outs_shape)  # shape=(nb_instances, nb_samples, nb_targets)
        raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")
    t_f = time() - t_0
    logger.debug('predict time {}'.format(t_f))

    if len(outs_shape) == 0:
        sum_out = np.matmul(alphas, outs)
        sum_out = _linear_superposition(alphas, outs, outs_shape)

    X_samples = X_samples.reshape(ss + input_shape)
    summ = _linear_superposition(alphas, X_samples, input_shape)

    if model_type == 'classifier':
        out_sum = np.log(predict_fn(summ) + 1e-10)
        out_sum_shape = out_sum.shape[1:]
    elif model_type == 'regressor':
        out_sum = predict_fn(summ)
        out_sum_shape = out_sum.shape[1:]
        if len(out_sum.shape) == 1:
            out_sum = out_sum.reshape((ss[0],) + (1,))
            out_sum = out_sum.reshape((ss[0],) + out_sum_shape)
        raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")

    diff = out_sum - sum_out
    linearity_score = norm(diff.reshape(diff.shape[0], -1), axis=1)

    return linearity_score

def _calculate_pairwise_linearity(predict_fn: Callable, x: np.ndarray, input_shape: Tuple, X_samples: np.ndarray,
                                  model_type: str, alphas: np.ndarray) -> np.ndarray:
    """Calculates the norm of the difference between the output of a linear superposition of a test vector `x` and
    vectors in `X_samples` and the linear superposition of the outputs, averaged over all the vectors in `X_samples`.

        Model prediction function.
        Test instance for which to calculate the linearity measure.
        Shape of the input.
        Array of feature vectors in the linear superposition.
        Supported values: ``'classifier'`` | ``'regressor'``.
        Array of coefficients in the linear superposition.

    Linearity score.

    ss = X_samples.shape[:2]  # X_samples shape=(nb_instances, nb_samples, nb_features)
    X_samples = X_samples.reshape((X_samples.shape[0] * X_samples.shape[1],) + input_shape)

    t_0 = time()
    if model_type == 'classifier':
        outs = np.log(predict_fn(X_samples) + 1e-10)
        outs_shape = outs.shape[1:]
        x_out = np.log(predict_fn(x) + 1e-10)  # shape=(nb_instances, nb_classes)
        outs = outs.reshape(ss + outs_shape)  # shape=(nb_instances, nb_samples, nb_classes)
    elif model_type == 'regressor':
        outs = predict_fn(X_samples)
        outs_shape = outs.shape[1:]
        x_out = predict_fn(x)
        if len(outs.shape) == 1:
            outs = outs.reshape(ss + (1,))  # shape=(nb_instances, nb_samples, 1)
            x_out = x_out.reshape(x_out.shape + (1,))  # shape=(nb_instances, 1)
        else:  # if regression on multiple targets
            outs = outs.reshape(ss + outs_shape)  # shape=(nb_instances, nb_samples, nb_targets)
        raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")
    t_f = time() - t_0
    logger.debug('predict time', t_f)

    x_out_stack = np.repeat(x_out.reshape((x_out.shape[0], 1,) + (x_out.shape[1:])), outs.shape[1], axis=1)

    # linear superposition of the outputs
    sum_out = np.matmul(np.array([x_out_stack, outs]).T, alphas).T  # shape=(nb_instances,nb_samples,nb_targets)

    X_samples = X_samples.reshape(ss + input_shape)
    x_stack = np.repeat(x.reshape((x.shape[0], 1,) + (x.shape[1:])), X_samples.shape[1], axis=1)

    # linear superposition of the inputs
    summ = np.matmul(np.array([x_stack, X_samples]).T, alphas).T  # shape=(nb_instances,nb_samples,input_shape)
    if model_type == 'classifier':
        # output of the linear superposition of inputs
        out_sum = np.log(predict_fn(summ.reshape((summ.shape[0] * summ.shape[1],) + summ.shape[2:])) + 1e-10)
        out_sum_shape = out_sum.shape[1:]
        out_sum = out_sum.reshape(ss + out_sum_shape)
    elif model_type == 'regressor':
        out_sum = predict_fn(summ.reshape((summ.shape[0] * summ.shape[1],) + summ.shape[2:]))
        out_sum_shape = out_sum.shape[1:]
        if len(out_sum.shape) == 1:
            out_sum = out_sum.reshape(ss + (1,))
            out_sum = out_sum.reshape(ss + out_sum_shape)
        raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")

    diff = out_sum - sum_out
    linearity_score = norm(diff.reshape(diff.shape[0], diff.shape[1], -1), axis=2).mean(axis=1)

    return linearity_score

def _sample_knn(x: np.ndarray, X_train: np.ndarray, nb_samples: int = 10) -> np.ndarray:
    """Samples data points from a training set around instance x using k-nearest neighbours.

        Central instance for sampling.
        Training set.
        Number of samples to generate.

    Sampled vectors.

    x = x.reshape(x.shape[0], -1)
    nb_instances = x.shape[0]
    X_sampled = []
    for i in range(nb_instances):
        X_train = X_train.reshape(X_train.shape[0], -1)
        X_stack = np.stack([x[i] for _ in range(X_train.shape[0])], axis=0)
        X_stack = X_stack.reshape(X_stack.shape[0], -1)

        nbrs = NearestNeighbors(n_neighbors=nb_samples, algorithm='ball_tree').fit(X_train)
        distances, indices = nbrs.kneighbors(X_stack)
        distances, indices = distances[0], indices[0]

        X_sampled_tmp = X_train[indices]

    return np.asarray(X_sampled)  # shape=(nb_instances, nb_samples, nb_features)

def _sample_grid(x: np.ndarray, feature_range: np.ndarray, epsilon: float = 0.04,
                 nb_samples: int = 10, res: int = 100) -> np.ndarray:
    """Samples data points uniformly from an interval centered at `x` and with size `epsilon * delta`,
    with `delta = f_max - f_min` the features ranges.

        Instance of interest.
        Array with min and max values for each feature.
        Size of the sampling region around central instance as percentage of features range.
        Number of samples to generate.

    Sampled vectors.

    nb_instances = x.shape[0]
    x = x.reshape(x.shape[0], -1)
    dim = x.shape[1]  # number of features

    size = np.round(epsilon * res).astype(int)
    if size <= 2:
        size = 2

    deltas = (np.abs(feature_range[:, 1] - feature_range[:, 0]) / float(res))  # shape=(nb_features)

    rnd_sign = 2 * (np.random.randint(2, size=(nb_instances, nb_samples, dim))) - 1
    rnd = np.random.randint(size, size=(nb_instances, nb_samples, dim)) + 1
    rnd = rnd_sign * rnd  # shape=(nb_instances, nb_samples, nb_features)

    vprime = rnd * deltas
    X_sampled = x.reshape(x.shape[0], 1, x.shape[1]) + vprime  # shape=(nb_instances, nb_samples, nb_features)

    return X_sampled

def _linearity_measure(predict_fn: Callable,
                       x: np.ndarray,
                       X_train: Optional[np.ndarray] = None,
                       feature_range: Optional[Union[List, np.ndarray]] = None,
                       method: Optional[str] = None,
                       epsilon: float = 0.04,
                       nb_samples: int = 10,
                       res: int = 100,
                       alphas: Optional[np.ndarray] = None,
                       model_type: str = 'classifier',
                       agg: str = 'global') -> np.ndarray:
    """Calculate the linearity measure of the model around an instance of interest `x`.

        Model prediction function.
        Instance of interest.
        Training set.
        Array with min and max values for each feature.
        Method for sampling. Supported values: ``'knn'`` | ``'grid'``.
        Size of the sampling region around the central instance as a percentage of feature range.
        Number of samples to generate.
        Resolution of the grid. Number of intervals in which the feature range is discretized.
        Array of coefficients in the superposition.
        Type of task. Supported values: ``'regressor'`` | ``'classifier'``.
        Aggregation method. Supported values: ``'global'`` | ``'pairwise'``.

    Linearity score.

    input_shape = x.shape[1:]

    if method == 'knn':
        assert X_train is not None, "The 'knn' method requires X_train != None"
        X_sampled = _sample_knn(x, X_train, nb_samples=nb_samples)
    elif method == 'grid':
        assert feature_range is not None, "The 'grid' method requires feature_range != None."
        if isinstance(feature_range, list):
            feature_range = np.asarray(feature_range)
        X_sampled = _sample_grid(x, feature_range=feature_range, epsilon=epsilon,
                                 nb_samples=nb_samples, res=res)
        raise ValueError('Method not understood. Supported methods: "knn", "grid"')

    if agg == 'pairwise':
        if alphas is None:
            alphas = np.array([0.5, 0.5])
        score = _calculate_pairwise_linearity(predict_fn, x, input_shape, X_sampled, model_type, alphas)
    elif agg == 'global':
        if alphas is None:
            alphas = np.array([1 / float(nb_samples) for _ in range(nb_samples)])
        score = _calculate_global_linearity(predict_fn, input_shape, X_sampled, model_type, alphas)
        raise ValueError('Aggregation argument supported values: "global" or "pairwise "')

    return score

[docs] def infer_feature_range(X_train: np.ndarray) -> np.ndarray: """Infers the feature range from the training set. Parameters ---------- X_train Training set. Returns ------- Feature range. """ X_train = X_train.reshape(X_train.shape[0], -1) return np.vstack((X_train.min(axis=0), X_train.max(axis=0))).T
[docs] class LinearityMeasure:
[docs] def __init__(self, method: str = 'grid', epsilon: float = 0.04, nb_samples: int = 10, res: int = 100, alphas: Optional[np.ndarray] = None, model_type: str = 'classifier', agg: str = 'pairwise', verbose: bool = False) -> None: """ Parameters ---------- method Method for sampling. Supported methods: ``'knn'`` | ``'grid'``. epsilon Size of the sampling region around the central instance as a percentage of the features range. nb_samples Number of samples to generate. res Resolution of the grid. Number of intervals in which the feature range is discretized. alphas Coefficients in the superposition. agg Aggregation method. Supported values: ``'global'`` | ``'pairwise'``. model_type Type of task. Supported values: ``'regressor'`` | ``'classifier'``. """ self.method = method self.epsilon = epsilon self.nb_samples = nb_samples self.res = res self.alphas = alphas self.model_type = model_type self.agg = agg self.verbose = verbose self.is_fit = False
[docs] def fit(self, X_train: np.ndarray) -> None: """ Parameters ---------- X_train Training set. """ self.X_train = X_train self.feature_range = infer_feature_range(X_train) self.input_shape = X_train.shape[1:] self.is_fit = True
[docs] def score(self, predict_fn: Callable, x: np.ndarray) -> np.ndarray: """ Parameters ---------- predict_fn Prediction function. x Instance of interest. Returns ------- Linearity measure. """ input_shape = x.shape[1:] if self.is_fit: assert input_shape == self.input_shape if self.method == 'knn': if not self.is_fit: raise ValueError("Method 'knn' cannot be used without calling fit().") lin = _linearity_measure(predict_fn, x, X_train=self.X_train, feature_range=None, method=self.method, nb_samples=self.nb_samples, res=self.res, epsilon=self.epsilon, alphas=self.alphas, model_type=self.model_type, agg=self.agg) elif self.method == 'grid': if not self.is_fit: self.feature_range = np.asarray([[0, 1]] * x.shape[1]) # hardcoded (e.g. from 0 to 1) lin = _linearity_measure(predict_fn, x, X_train=None, feature_range=self.feature_range, method=self.method, nb_samples=self.nb_samples, res=self.res, epsilon=self.epsilon, alphas=self.alphas, model_type=self.model_type, agg=self.agg) else: raise ValueError('Method not understood. Supported methods: "knn", "grid"') return lin
[docs] def linearity_measure(predict_fn: Callable, x: np.ndarray, feature_range: Optional[Union[List, np.ndarray]] = None, method: str = 'grid', X_train: Optional[np.ndarray] = None, epsilon: float = 0.04, nb_samples: int = 10, res: int = 100, alphas: Optional[np.ndarray] = None, agg: str = 'global', model_type: str = 'classifier') -> np.ndarray: """Calculate the linearity measure of the model around an instance of interest x. Parameters ---------- predict_fn Predict function. x Instance of interest. feature_range Array with min and max values for each feature. method Method for sampling. Supported values: ``'knn'`` | ``'grid'``. X_train Training set. epsilon Size of the sampling region as a percentage of the feature range. nb_samples Number of samples to generate. res Resolution of the grid. Number of intervals in which the features range is discretized. alphas Coefficients in the superposition. agg Aggregation method. Supported values: ``'global'`` | ``'pairwise'``. model_type Type of task. Supported values: ``'regressor'`` | ``'classifier'``. Returns ------- Linearity measure. """ if method == 'knn': assert X_train is not None, " Method 'knn' requires X_train != None" lin = _linearity_measure(predict_fn, x, X_train=X_train, feature_range=None, method=method, nb_samples=nb_samples, res=res, epsilon=epsilon, alphas=alphas, model_type=model_type, agg=agg) elif method == 'grid': assert feature_range is not None or X_train is not None, "Method 'grid' requires " \ "feature_range != None or X_train != None" if X_train is not None and feature_range is None: feature_range = infer_feature_range(X_train) # infer from dataset elif feature_range is not None: feature_range = np.asarray(feature_range) lin = _linearity_measure(predict_fn, x, X_train=None, feature_range=feature_range, method=method, nb_samples=nb_samples, res=res, epsilon=epsilon, alphas=alphas, model_type=model_type, agg=agg) else: raise ValueError('Method not understood. Supported methods: "knn", "grid"') return lin