import numpy as np
from sklearn.manifold import MDS
from typing import Dict, Tuple, Callable, Optional, Union

def cityblock_batch(X: np.ndarray,
                    y: np.ndarray) -> np.ndarray:
    """
    Calculate the L1 distances between a batch of arrays `X` and an array of the same shape `y`.

    Parameters
    ----------
    X
        Batch of arrays to calculate the distances from.
    y
        Array to calculate the distance to.

    Returns
    -------
    Array of distances from each array in `X` to `y`.
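
    Examples
    --------
    A minimal illustrative call (shapes and values chosen arbitrarily): the L1 distance
    from each of three all-zeros `2x2` arrays to a single all-ones reference array is 4.

    >>> X = np.zeros((3, 2, 2))
    >>> y = np.ones((1, 2, 2))
    >>> cityblock_batch(X, y)
    array([[4.],
           [4.],
           [4.]])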
"""
X_dim = len(X.shape)
y_dim = len(y.shape)
if X_dim == y_dim:
assert y.shape[0] == 1, 'y must have batch size equal to 1'
else:
assert X.shape[1:] == y.shape, 'X and y must have matching shapes'
return np.abs(X - y).sum(axis=tuple(np.arange(1, X_dim))).reshape(X.shape[0], -1)

def mvdm(X: np.ndarray,
         y: np.ndarray,
         cat_vars: dict,
         alpha: int = 1) -> Dict[int, np.ndarray]:
    """
    Calculate the pair-wise distances between categories of a categorical variable using
    the Modified Value Difference Measure based on Cost & Salzberg (1993).
    https://link.springer.com/article/10.1023/A:1022664626993

    Parameters
    ----------
    X
        Batch of arrays.
    y
        Batch of labels or predictions.
    cat_vars
        Dict whose keys are the categorical columns and whose optional values are
        the number of categories per categorical variable.
    alpha
        Power of the absolute difference between the conditional probabilities.

    Returns
    -------
    Dict whose keys are the categorical columns and whose values are the pairwise
    distance matrices for the variables.
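
    Examples
    --------
    Illustrative sketch with a single binary categorical column (column `0`) and binary
    labels perfectly aligned with the categories, so the two categories end up at the
    maximum distance of 2 from each other (rounded for readability).

    >>> X = np.array([[0], [0], [1], [1]])
    >>> y = np.array([0, 0, 1, 1])
    >>> np.round(mvdm(X, y, cat_vars={0: 2})[0], 6)
    array([[0., 2.],
           [2., 0.]])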
"""
# TODO: handle triangular inequality
# infer number of categories per categorical variable
n_y = len(np.unique(y))
cat_cols = list(cat_vars.keys())
for col in cat_cols:
if cat_vars[col] is not None:
continue
cat_vars[col] = len(np.unique(X[:, col]))
# conditional probabilities and pairwise distance matrix
d_pair = {}
for col, n_cat in cat_vars.items():
d_pair_col = np.zeros([n_cat, n_cat])
p_cond_col = np.zeros([n_cat, n_y])
for i in range(n_cat):
idx = np.where(X[:, col] == i)[0]
for i_y in range(n_y):
p_cond_col[i, i_y] = np.sum(y[idx] == i_y) / (y[idx].shape[0] + 1e-12)
for i in range(n_cat):
j = 0
while j < i: # symmetrical matrix
d_pair_col[i, j] = np.sum(np.abs(p_cond_col[i, :] - p_cond_col[j, :]) ** alpha)
j += 1
d_pair_col += d_pair_col.T
d_pair[col] = d_pair_col
return d_pair

def abdm(X: np.ndarray,
         cat_vars: dict,
         cat_vars_bin: dict = dict()) -> Dict[int, np.ndarray]:
    """
    Calculate the pair-wise distances between categories of a categorical variable using
    the Association-Based Distance Metric based on Le and Ho (2005).
    http://www.jaist.ac.jp/~bao/papers/N26.pdf

    Parameters
    ----------
    X
        Batch of arrays.
    cat_vars
        Dict whose keys are the categorical columns and whose optional values are
        the number of categories per categorical variable.
    cat_vars_bin
        Dict whose keys are the binned numerical columns and whose optional values are
        the number of bins per variable.

    Returns
    -------
    Dict whose keys are the categorical columns and whose values are the pairwise
    distance matrices for the variables.
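
    Examples
    --------
    Illustrative sketch with two perfectly associated binary categorical columns; only
    the keys, shape and symmetry of the returned distance matrices are checked here.

    >>> X = np.array([[0, 0], [0, 0], [1, 1], [1, 1]])
    >>> d = abdm(X, cat_vars={0: 2, 1: 2})
    >>> sorted(d.keys())
    [0, 1]
    >>> d[0].shape
    (2, 2)
    >>> bool(np.allclose(d[0], d[0].T))
    True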
"""
# TODO: handle triangular inequality
# ensure numerical stability
eps = 1e-12
# infer number of categories per categorical variable
cat_cols = list(cat_vars.keys())
for col in cat_cols:
if cat_vars[col] is not None:
continue
cat_vars[col] = len(np.unique(X[:, col]))
# combine dict for categorical with binned features
cat_vars_combined = {**cat_vars, **cat_vars_bin}
d_pair: Dict = {}
X_cat_eq: Dict = {}
for col, n_cat in cat_vars.items():
X_cat_eq[col] = []
for i in range(n_cat): # for each category in categorical variable, store instances of each category
idx = np.where(X[:, col] == i)[0]
X_cat_eq[col].append(X[idx, :])
# conditional probabilities, also use the binned numerical features
p_cond = []
for col_t, n_cat_t in cat_vars_combined.items():
if col == col_t:
continue
p_cond_t = np.zeros([n_cat_t, n_cat])
for i in range(n_cat_t):
for j, X_cat_j in enumerate(X_cat_eq[col]):
idx = np.where(X_cat_j[:, col_t] == i)[0]
p_cond_t[i, j] = len(idx) / (X_cat_j.shape[0] + eps)
p_cond.append(p_cond_t)
# pairwise distance matrix
d_pair_col = np.zeros([n_cat, n_cat])
for i in range(n_cat):
j = 0
while j < i:
d_ij_tmp = 0
for p in p_cond: # loop over other categorical variables
for t in range(p.shape[0]): # loop over categories of each categorical variable
a, b = p[t, i], p[t, j]
d_ij_t = a * np.log((a + eps) / (b + eps)) + b * np.log((b + eps) / (a + eps)) # KL divergence
d_ij_tmp += d_ij_t
d_pair_col[i, j] = d_ij_tmp
j += 1
d_pair_col += d_pair_col.T
d_pair[col] = d_pair_col
return d_pair

def multidim_scaling(d_pair: dict,
                     feature_range: Tuple[np.ndarray, np.ndarray],
                     n_components: int = 2,
                     use_metric: bool = True,
                     standardize_cat_vars: bool = True,
                     smooth: float = 1.,
                     center: bool = True,
                     update_feature_range: bool = True) -> Tuple[dict, tuple]:
    """
    Apply multidimensional scaling to pairwise distance matrices.

    Parameters
    ----------
    d_pair
        Dict whose keys are the column indices of the categorical variables and whose
        values are the pairwise distance matrices for the categories of each variable.
    feature_range
        Tuple with `min` and `max` ranges to allow for perturbed instances. `Min` and `max` ranges are
        `numpy` arrays with dimension (`1 x nb of features`).
    n_components
        Number of dimensions in which to immerse the dissimilarities.
    use_metric
        If ``True``, perform metric MDS; otherwise, perform nonmetric MDS.
    standardize_cat_vars
        Standardize numerical values of categorical variables if ``True``.
    smooth
        Smoothing exponent between 0 and 1 for the distances. Values lower than 1 smooth out the
        difference in distance metric between different features.
    center
        Whether to center the scaled distance measures. If ``False``, the min distance for each feature
        except for the feature with the highest raw max distance will be the lower bound of the
        feature range, but the upper bound will be below the max feature range.
    update_feature_range
        Update the feature range with the scaled values.

    Returns
    -------
    Dict with the multidimensional scaled version of the pairwise distance matrices, and the
    (optionally updated) feature range.
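
    Examples
    --------
    Illustrative sketch for a single categorical column (index `0`) with three categories
    and an arbitrary `[0, 1]` feature range; each category is mapped to a scalar distance
    from an origin category.

    >>> d_pair = {0: np.array([[0., 1., 2.], [1., 0., 1.], [2., 1., 0.]])}
    >>> feature_range = (np.zeros((1, 1)), np.ones((1, 1)))
    >>> d_scaled, new_range = multidim_scaling(d_pair, feature_range)
    >>> d_scaled[0].shape
    (3,)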
"""
d_abs = {}
d_min, d_max = 1e10, 0
for k, v in d_pair.items():
# distance smoothening
v **= smooth
# fit multi-dimensional scaler
mds = MDS(n_components=n_components, max_iter=5000, eps=1e-9, random_state=0, n_init=4,
dissimilarity="precomputed", metric=use_metric)
d_fit = mds.fit(v)
emb = d_fit.embedding_ # coordinates in embedding space
# use biggest single observation Frobenius norm as origin
origin = np.argsort(np.linalg.norm(emb, axis=1))[-1]
# calculate distance from origin for each category
d_origin = np.linalg.norm(emb - emb[origin].reshape(1, -1), axis=1)
# assign to category
d_abs[k] = d_origin
# update min and max distance
d_min_k, d_max_k = d_origin.min(), d_origin.max()
d_min = d_min_k if d_min_k < d_min else d_min
d_max = d_max_k if d_max_k > d_max else d_max
d_abs_scaled = {}
new_feature_range = tuple([f.copy() for f in feature_range])
for k, v in d_abs.items():
if standardize_cat_vars: # scale numerical values for the category
d_scaled = (v - v.mean()) / (v.std() + 1e-12)
else: # scale by overall min and max
try:
rng = (feature_range[0][0, k], feature_range[1][0, k])
except TypeError:
raise TypeError('Feature-wise min and max ranges need to be specified.')
d_scaled = (v - d_min) / (d_max - d_min) * (rng[1] - rng[0]) + rng[0]
if center: # center the numerical feature values between the min and max feature range
d_scaled -= .5 * (d_scaled.max() + d_scaled.min())
if update_feature_range:
new_feature_range[0][0, k] = d_scaled.min()
new_feature_range[1][0, k] = d_scaled.max()
d_abs_scaled[k] = d_scaled # scaled distance from the origin for each category
if update_feature_range:
feature_range = new_feature_range # type: ignore
return d_abs_scaled, feature_range

def squared_pairwise_distance(x: np.ndarray,
                              y: np.ndarray,
                              a_min: float = 1e-7,
                              a_max: float = 1e30) -> np.ndarray:
    """
    `numpy` pairwise squared Euclidean distance between samples `x` and `y`.

    Parameters
    ----------
    x
        A batch of instances of shape `Nx x features`.
    y
        A batch of instances of shape `Ny x features`.
    a_min
        Lower bound to clip distance values.
    a_max
        Upper bound to clip distance values.

    Returns
    -------
    Pairwise squared Euclidean distance `Nx x Ny`.
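
    Examples
    --------
    Illustrative call on a single pair of 2-d points at Euclidean distance 5, so the
    squared distance is 25. Note that exact-zero distances are clipped to `a_min`.

    >>> x = np.array([[0., 0.]])
    >>> y = np.array([[3., 4.]])
    >>> squared_pairwise_distance(x, y)
    array([[25.]])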
"""
x2 = np.sum(x ** 2, axis=-1, keepdims=True)
y2 = np.sum(y ** 2, axis=-1, keepdims=True)
dist = x2 + np.transpose(y2, (1, 0)) - 2. * x @ np.transpose(y, (1, 0))
return np.clip(dist, a_min=a_min, a_max=a_max)

def batch_compute_kernel_matrix(x: Union[list, np.ndarray],
                                y: Union[list, np.ndarray],
                                kernel: Callable[[np.ndarray, np.ndarray], np.ndarray],
                                batch_size: int = int(1e10),
                                preprocess_fn: Optional[Callable[[Union[list, np.ndarray]], np.ndarray]] = None
                                ) -> np.ndarray:
    """
    Compute the kernel matrix between `x` and `y` by filling in blocks of size
    `batch_size x batch_size` at a time.

    Parameters
    ----------
    x
        The first list/`numpy` array of data instances.
    y
        The second list/`numpy` array of data instances.
    kernel
        Kernel function to be used for the kernel matrix computation.
    batch_size
        Batch size used to fill in the kernel matrix block by block.
    preprocess_fn
        Optional preprocessing function applied to each batch.

    Returns
    -------
    Kernel matrix in the form of a `numpy` array.
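
    Examples
    --------
    Illustrative sketch with a simple linear kernel (an assumption made for the example;
    any kernel with the same signature works): computing the matrix in blocks yields the
    same result as a single direct call.

    >>> x = np.arange(6, dtype=float).reshape(3, 2)
    >>> linear_kernel = lambda a, b: a @ b.T
    >>> k_mat = batch_compute_kernel_matrix(x, x, linear_kernel, batch_size=2)
    >>> k_mat.shape
    (3, 3)
    >>> bool(np.allclose(k_mat, x @ x.T))
    True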
"""
if type(x) != type(y): # noqa: E721
raise ValueError("x and y should be of the same type")
n_x, n_y = len(x), len(y)
n_batch_x, n_batch_y = int(np.ceil(n_x / batch_size)), int(np.ceil(n_y / batch_size))
k_is = []
for i in range(n_batch_x):
istart, istop = i * batch_size, min((i + 1) * batch_size, n_x)
x_batch = x[istart:istop]
if preprocess_fn is not None:
x_batch = preprocess_fn(x_batch)
k_ijs = []
for j in range(n_batch_y):
jstart, jstop = j * batch_size, min((j + 1) * batch_size, n_y)
y_batch = y[jstart:jstop]
if preprocess_fn is not None:
y_batch = preprocess_fn(y_batch)
k_ijs.append(kernel(x_batch, y_batch)) # type: ignore
k_is.append(np.concatenate(k_ijs, axis=1))
k_mat = np.concatenate(k_is, axis=0)
return k_mat