import logging
import string
from time import time
from typing import Callable, List, Optional, Tuple, Union
import numpy as np
from numpy.linalg import norm
from sklearn.neighbors import NearestNeighbors
logger = logging.getLogger(__name__)
def _linear_superposition(alphas, vecs, shape):
"""Calculate the tensor superposition along axis 1 of vecs with coefficient alphas using einsum.
Parameters
----------
alphas
Coefficients of the superposition.
vecs
Tensors of the superposition.
shape
Shape of each tensor.
Returns
-------
Linear tensor superposition.
"""
input_str = string.ascii_lowercase[2: 2 + len(shape)]
einstr = 'a,ba{}->b{}'.format(input_str, input_str)
return np.einsum(einstr, alphas, vecs)
def _calculate_global_linearity(predict_fn: Callable, input_shape: Tuple, X_samples: np.ndarray,
model_type: str, alphas: np.ndarray) -> np.ndarray:
"""Calculates the norm of the difference between the output of a linear superposition of vectors and the
linear superposition of the outputs for each individual vector.
Parameters
----------
predict_fn
Model prediction function.
input_shape
Shape of the input.
X_samples
Array of feature vectors in the linear superposition.
model_type
Supported values: ``'classifier'`` | ``'regressor'``.
alphas
Array of coefficients in the linear superposition.
Returns
-------
Linearity score.
"""
ss = X_samples.shape[:2] # X_samples shape=(nb_instances, nb_samples, nb_features)
X_samples = X_samples.reshape((X_samples.shape[0] * X_samples.shape[1],) + input_shape)
t_0 = time()
if model_type == 'classifier':
outs = np.log(predict_fn(X_samples) + 1e-10)
outs_shape = outs.shape[1:]
outs = outs.reshape(ss + outs_shape) # shape=(nb_instances, nb_samples, nb_classes)
elif model_type == 'regressor':
outs = predict_fn(X_samples)
outs_shape = outs.shape[1:]
if len(outs.shape) == 1:
outs = outs.reshape(ss + (1,)) # shape=(nb_instances, nb_samples, 1)
else: # if regression on multiple targets
outs = outs.reshape(ss + outs_shape) # shape=(nb_instances, nb_samples, nb_targets)
else:
raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")
t_f = time() - t_0
logger.debug('predict time {}'.format(t_f))
if len(outs_shape) == 0:
sum_out = np.matmul(alphas, outs)
else:
sum_out = _linear_superposition(alphas, outs, outs_shape)
X_samples = X_samples.reshape(ss + input_shape)
summ = _linear_superposition(alphas, X_samples, input_shape)
if model_type == 'classifier':
out_sum = np.log(predict_fn(summ) + 1e-10)
out_sum_shape = out_sum.shape[1:]
elif model_type == 'regressor':
out_sum = predict_fn(summ)
out_sum_shape = out_sum.shape[1:]
if len(out_sum.shape) == 1:
out_sum = out_sum.reshape((ss[0],) + (1,))
else:
out_sum = out_sum.reshape((ss[0],) + out_sum_shape)
else:
raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")
logger.debug(out_sum.shape)
logger.debug(sum_out.shape)
diff = out_sum - sum_out
linearity_score = norm(diff.reshape(diff.shape[0], -1), axis=1)
return linearity_score
def _calculate_pairwise_linearity(predict_fn: Callable, x: np.ndarray, input_shape: Tuple, X_samples: np.ndarray,
model_type: str, alphas: np.ndarray) -> np.ndarray:
"""Calculates the norm of the difference between the output of a linear superposition of a test vector `x` and
vectors in `X_samples` and the linear superposition of the outputs, averaged over all the vectors in `X_samples`.
Parameters
----------
predict_fn
Model prediction function.
x
Test instance for which to calculate the linearity measure.
input_shape
Shape of the input.
X_samples
Array of feature vectors in the linear superposition.
model_type
Supported values: ``'classifier'`` | ``'regressor'``.
alphas
Array of coefficients in the linear superposition.
Returns
-------
Linearity score.
"""
ss = X_samples.shape[:2] # X_samples shape=(nb_instances, nb_samples, nb_features)
X_samples = X_samples.reshape((X_samples.shape[0] * X_samples.shape[1],) + input_shape)
t_0 = time()
if model_type == 'classifier':
outs = np.log(predict_fn(X_samples) + 1e-10)
outs_shape = outs.shape[1:]
x_out = np.log(predict_fn(x) + 1e-10) # shape=(nb_instances, nb_classes)
outs = outs.reshape(ss + outs_shape) # shape=(nb_instances, nb_samples, nb_classes)
elif model_type == 'regressor':
outs = predict_fn(X_samples)
outs_shape = outs.shape[1:]
x_out = predict_fn(x)
if len(outs.shape) == 1:
outs = outs.reshape(ss + (1,)) # shape=(nb_instances, nb_samples, 1)
x_out = x_out.reshape(x_out.shape + (1,)) # shape=(nb_instances, 1)
else: # if regression on multiple targets
outs = outs.reshape(ss + outs_shape) # shape=(nb_instances, nb_samples, nb_targets)
else:
raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")
t_f = time() - t_0
logger.debug('predict time', t_f)
x_out_stack = np.repeat(x_out.reshape((x_out.shape[0], 1,) + (x_out.shape[1:])), outs.shape[1], axis=1)
# linear superposition of the outputs
sum_out = np.matmul(np.array([x_out_stack, outs]).T, alphas).T # shape=(nb_instances,nb_samples,nb_targets)
X_samples = X_samples.reshape(ss + input_shape)
x_stack = np.repeat(x.reshape((x.shape[0], 1,) + (x.shape[1:])), X_samples.shape[1], axis=1)
# linear superposition of the inputs
summ = np.matmul(np.array([x_stack, X_samples]).T, alphas).T # shape=(nb_instances,nb_samples,input_shape)
if model_type == 'classifier':
# output of the linear superposition of inputs
out_sum = np.log(predict_fn(summ.reshape((summ.shape[0] * summ.shape[1],) + summ.shape[2:])) + 1e-10)
out_sum_shape = out_sum.shape[1:]
out_sum = out_sum.reshape(ss + out_sum_shape)
elif model_type == 'regressor':
out_sum = predict_fn(summ.reshape((summ.shape[0] * summ.shape[1],) + summ.shape[2:]))
out_sum_shape = out_sum.shape[1:]
if len(out_sum.shape) == 1:
out_sum = out_sum.reshape(ss + (1,))
else:
out_sum = out_sum.reshape(ss + out_sum_shape)
else:
raise ValueError("Passed 'model_type' not supported. Supported model types: 'classifier', 'regressor'")
logger.debug(out_sum.shape)
logger.debug(sum_out.shape)
diff = out_sum - sum_out
linearity_score = norm(diff.reshape(diff.shape[0], diff.shape[1], -1), axis=2).mean(axis=1)
return linearity_score
def _sample_knn(x: np.ndarray, X_train: np.ndarray, nb_samples: int = 10) -> np.ndarray:
"""Samples data points from a training set around instance x using k-nearest neighbours.
Parameters
----------
x
Central instance for sampling.
X_train
Training set.
nb_samples
Number of samples to generate.
Returns
-------
Sampled vectors.
"""
x = x.reshape(x.shape[0], -1)
nb_instances = x.shape[0]
X_sampled = []
for i in range(nb_instances):
X_train = X_train.reshape(X_train.shape[0], -1)
X_stack = np.stack([x[i] for _ in range(X_train.shape[0])], axis=0)
X_stack = X_stack.reshape(X_stack.shape[0], -1)
nbrs = NearestNeighbors(n_neighbors=nb_samples, algorithm='ball_tree').fit(X_train)
distances, indices = nbrs.kneighbors(X_stack)
distances, indices = distances[0], indices[0]
X_sampled_tmp = X_train[indices]
X_sampled.append(X_sampled_tmp)
return np.asarray(X_sampled) # shape=(nb_instances, nb_samples, nb_features)
def _sample_grid(x: np.ndarray, feature_range: np.ndarray, epsilon: float = 0.04,
nb_samples: int = 10, res: int = 100) -> np.ndarray:
"""Samples data points uniformly from an interval centered at `x` and with size `epsilon * delta`,
with `delta = f_max - f_min` the features ranges.
Parameters
----------
x
Instance of interest.
feature_range
Array with min and max values for each feature.
epsilon
Size of the sampling region around central instance as percentage of features range.
nb_samples
Number of samples to generate.
Returns
-------
Sampled vectors.
"""
nb_instances = x.shape[0]
x = x.reshape(x.shape[0], -1)
dim = x.shape[1] # number of features
size = np.round(epsilon * res).astype(int)
if size <= 2:
size = 2
deltas = (np.abs(feature_range[:, 1] - feature_range[:, 0]) / float(res)) # shape=(nb_features)
rnd_sign = 2 * (np.random.randint(2, size=(nb_instances, nb_samples, dim))) - 1
rnd = np.random.randint(size, size=(nb_instances, nb_samples, dim)) + 1
rnd = rnd_sign * rnd # shape=(nb_instances, nb_samples, nb_features)
vprime = rnd * deltas
X_sampled = x.reshape(x.shape[0], 1, x.shape[1]) + vprime # shape=(nb_instances, nb_samples, nb_features)
return X_sampled
def _linearity_measure(predict_fn: Callable,
x: np.ndarray,
X_train: Optional[np.ndarray] = None,
feature_range: Optional[Union[List, np.ndarray]] = None,
method: Optional[str] = None,
epsilon: float = 0.04,
nb_samples: int = 10,
res: int = 100,
alphas: Optional[np.ndarray] = None,
model_type: str = 'classifier',
agg: str = 'global') -> np.ndarray:
"""Calculate the linearity measure of the model around an instance of interest `x`.
Parameters
----------
predict_fn
Model prediction function.
x
Instance of interest.
X_train
Training set.
feature_range
Array with min and max values for each feature.
method
Method for sampling. Supported values: ``'knn'`` | ``'grid'``.
epsilon
Size of the sampling region around the central instance as a percentage of feature range.
nb_samples
Number of samples to generate.
res
Resolution of the grid. Number of intervals in which the feature range is discretized.
alphas
Array of coefficients in the superposition.
model_type
Type of task. Supported values: ``'regressor'`` | ``'classifier'``.
agg
Aggregation method. Supported values: ``'global'`` | ``'pairwise'``.
Returns
-------
Linearity score.
"""
input_shape = x.shape[1:]
if method == 'knn':
assert X_train is not None, "The 'knn' method requires X_train != None"
X_sampled = _sample_knn(x, X_train, nb_samples=nb_samples)
elif method == 'grid':
assert feature_range is not None, "The 'grid' method requires feature_range != None."
if isinstance(feature_range, list):
feature_range = np.asarray(feature_range)
X_sampled = _sample_grid(x, feature_range=feature_range, epsilon=epsilon,
nb_samples=nb_samples, res=res)
else:
raise ValueError('Method not understood. Supported methods: "knn", "grid"')
logger.debug(x.shape)
logger.debug(X_sampled.shape)
if agg == 'pairwise':
if alphas is None:
alphas = np.array([0.5, 0.5])
score = _calculate_pairwise_linearity(predict_fn, x, input_shape, X_sampled, model_type, alphas)
elif agg == 'global':
if alphas is None:
alphas = np.array([1 / float(nb_samples) for _ in range(nb_samples)])
score = _calculate_global_linearity(predict_fn, input_shape, X_sampled, model_type, alphas)
else:
raise ValueError('Aggregation argument supported values: "global" or "pairwise "')
return score
[docs]
def infer_feature_range(X_train: np.ndarray) -> np.ndarray:
"""Infers the feature range from the training set.
Parameters
----------
X_train
Training set.
Returns
-------
Feature range.
"""
X_train = X_train.reshape(X_train.shape[0], -1)
return np.vstack((X_train.min(axis=0), X_train.max(axis=0))).T
[docs]
class LinearityMeasure:
[docs]
def __init__(self,
method: str = 'grid',
epsilon: float = 0.04,
nb_samples: int = 10,
res: int = 100,
alphas: Optional[np.ndarray] = None,
model_type: str = 'classifier',
agg: str = 'pairwise',
verbose: bool = False) -> None:
"""
Parameters
----------
method
Method for sampling. Supported methods: ``'knn'`` | ``'grid'``.
epsilon
Size of the sampling region around the central instance as a percentage of the features range.
nb_samples
Number of samples to generate.
res
Resolution of the grid. Number of intervals in which the feature range is discretized.
alphas
Coefficients in the superposition.
agg
Aggregation method. Supported values: ``'global'`` | ``'pairwise'``.
model_type
Type of task. Supported values: ``'regressor'`` | ``'classifier'``.
"""
self.method = method
self.epsilon = epsilon
self.nb_samples = nb_samples
self.res = res
self.alphas = alphas
self.model_type = model_type
self.agg = agg
self.verbose = verbose
self.is_fit = False
[docs]
def fit(self, X_train: np.ndarray) -> None:
"""
Parameters
----------
X_train
Training set.
"""
self.X_train = X_train
self.feature_range = infer_feature_range(X_train)
self.input_shape = X_train.shape[1:]
self.is_fit = True
[docs]
def score(self, predict_fn: Callable, x: np.ndarray) -> np.ndarray:
"""
Parameters
----------
predict_fn
Prediction function.
x
Instance of interest.
Returns
-------
Linearity measure.
"""
input_shape = x.shape[1:]
if self.is_fit:
assert input_shape == self.input_shape
if self.method == 'knn':
if not self.is_fit:
raise ValueError("Method 'knn' cannot be used without calling fit().")
lin = _linearity_measure(predict_fn, x, X_train=self.X_train, feature_range=None, method=self.method,
nb_samples=self.nb_samples, res=self.res, epsilon=self.epsilon, alphas=self.alphas,
model_type=self.model_type, agg=self.agg)
elif self.method == 'grid':
if not self.is_fit:
self.feature_range = np.asarray([[0, 1]] * x.shape[1]) # hardcoded (e.g. from 0 to 1)
lin = _linearity_measure(predict_fn, x, X_train=None, feature_range=self.feature_range,
method=self.method, nb_samples=self.nb_samples, res=self.res, epsilon=self.epsilon,
alphas=self.alphas, model_type=self.model_type, agg=self.agg)
else:
raise ValueError('Method not understood. Supported methods: "knn", "grid"')
return lin
[docs]
def linearity_measure(predict_fn: Callable,
x: np.ndarray,
feature_range: Optional[Union[List, np.ndarray]] = None,
method: str = 'grid',
X_train: Optional[np.ndarray] = None,
epsilon: float = 0.04,
nb_samples: int = 10,
res: int = 100,
alphas: Optional[np.ndarray] = None,
agg: str = 'global',
model_type: str = 'classifier') -> np.ndarray:
"""Calculate the linearity measure of the model around an instance of interest x.
Parameters
----------
predict_fn
Predict function.
x
Instance of interest.
feature_range
Array with min and max values for each feature.
method
Method for sampling. Supported values: ``'knn'`` | ``'grid'``.
X_train
Training set.
epsilon
Size of the sampling region as a percentage of the feature range.
nb_samples
Number of samples to generate.
res
Resolution of the grid. Number of intervals in which the features range is discretized.
alphas
Coefficients in the superposition.
agg
Aggregation method. Supported values: ``'global'`` | ``'pairwise'``.
model_type
Type of task. Supported values: ``'regressor'`` | ``'classifier'``.
Returns
-------
Linearity measure.
"""
if method == 'knn':
assert X_train is not None, " Method 'knn' requires X_train != None"
lin = _linearity_measure(predict_fn, x, X_train=X_train, feature_range=None, method=method,
nb_samples=nb_samples, res=res, epsilon=epsilon, alphas=alphas,
model_type=model_type, agg=agg)
elif method == 'grid':
assert feature_range is not None or X_train is not None, "Method 'grid' requires " \
"feature_range != None or X_train != None"
if X_train is not None and feature_range is None:
feature_range = infer_feature_range(X_train) # infer from dataset
elif feature_range is not None:
feature_range = np.asarray(feature_range)
lin = _linearity_measure(predict_fn, x, X_train=None, feature_range=feature_range, method=method,
nb_samples=nb_samples, res=res, epsilon=epsilon, alphas=alphas,
model_type=model_type, agg=agg)
else:
raise ValueError('Method not understood. Supported methods: "knn", "grid"')
return lin