import random
from io import BytesIO
from typing import List, Tuple, Union
import cv2
import numpy as np
import skimage as sk
from alibi_detect.utils.data import Bunch
from alibi_detect.utils.discretizer import Discretizer
from alibi_detect.utils.distance import abdm, multidim_scaling
from alibi_detect.utils.mapping import ohe2ord
from PIL import Image
from scipy.ndimage import zoom
from scipy.ndimage.interpolation import map_coordinates
from skimage.filters import gaussian
[docs]
def apply_mask(X: np.ndarray,
mask_size: tuple = (4, 4),
n_masks: int = 1,
coord: tuple = None,
channels: list = [0, 1, 2],
mask_type: str = 'uniform',
noise_distr: tuple = (0, 1),
noise_rng: tuple = (0, 1),
clip_rng: tuple = (0, 1)
) -> Tuple[np.ndarray, np.ndarray]:
"""
Mask images. Can zero out image patches or add normal or uniformly distributed noise.
Parameters
----------
X
Batch of instances to be masked.
mask_size
Tuple with the size of the mask.
n_masks
Number of masks applied for each instance in the batch X.
coord
Upper left (x,y)-coordinates for the mask.
channels
Channels of the image to apply the mask to.
mask_type
Type of mask. One of 'uniform', 'random' (both additive noise) or 'zero' (zero values for mask).
noise_distr
Mean and standard deviation for noise of 'random' mask type.
noise_rng
Min and max value for noise of 'uniform' type.
clip_rng
Min and max values for the masked instances.
Returns
-------
Tuple with masked instances and the masks.
"""
X_shape = X.shape
# initialize mask
if mask_type != 'zero':
mask = np.zeros((n_masks,) + X_shape[1:])
elif mask_type == 'zero':
mask = np.ones((n_masks,) + X_shape[1:])
else:
raise ValueError('Only `normal`, `uniform` and `zero` masking available.')
# create noise for mask
if mask_type == 'normal':
noise = np.random.normal(loc=noise_distr[0], scale=noise_distr[1], size=(n_masks,) + mask_size)
elif mask_type == 'uniform':
noise = np.random.uniform(low=noise_rng[0], high=noise_rng[1], size=(n_masks,) + mask_size)
# find upper left coordinate for mask
if coord is None:
x_start = np.random.randint(0, X_shape[1] - mask_size[0], n_masks)
y_start = np.random.randint(0, X_shape[2] - mask_size[1], n_masks)
else:
x_start, y_start = coord
# update masks
for _ in range(x_start.shape[0]):
if mask_type == 'zero':
update_val: Union[float, np.ndarray] = 0.0
else:
update_val = noise[_]
for c in channels:
mask[
_,
x_start[_]:x_start[_] + mask_size[0],
y_start[_]:y_start[_] + mask_size[1],
c
] = update_val
# apply masks to instances
X_masks = []
for _ in range(X_shape[0]):
if mask_type == 'zero':
X_mask_ = X[_].reshape((1,) + X_shape[1:]) * mask
else:
X_mask_ = np.clip(X[_].reshape((1,) + X_shape[1:]) + mask, clip_rng[0], clip_rng[1])
X_masks.append(X_mask_)
X_mask = np.concatenate(X_masks, axis=0)
return X_mask, mask
[docs]
def inject_outlier_ts(X: np.ndarray,
perc_outlier: int,
perc_window: int = 10,
n_std: float = 2.,
min_std: float = 1.
) -> Bunch:
"""
Inject outliers in both univariate and multivariate time series data.
Parameters
----------
X
Time series data to perturb (inject outliers).
perc_outlier
Percentage of observations which are perturbed to outliers. For multivariate data,
the percentage is evenly split across the individual time series.
perc_window
Percentage of the observations used to compute the standard deviation used in the perturbation.
n_std
Number of standard deviations in the window used to perturb the original data.
min_std
Minimum number of standard deviations away from the current observation. This is included because
of the stochastic nature of the perturbation which could lead to minimal perturbations without a floor.
Returns
-------
Bunch object with the perturbed time series and the outlier labels.
"""
n_dim = len(X.shape)
if n_dim == 1:
X = X.reshape(-1, 1)
n_samples, n_ts = X.shape
X_outlier = X.copy()
is_outlier = np.zeros(n_samples)
# one sided window used to compute mean and stdev from
window = int(perc_window * n_samples * .5 / 100)
# distribute outliers evenly over different time series
n_outlier = int(n_samples * perc_outlier * .01 / n_ts)
if n_outlier == 0:
return Bunch(data=X_outlier, target=is_outlier, target_names=['normal', 'outlier'])
for s in range(n_ts):
outlier_idx = np.sort(random.sample(range(n_samples), n_outlier))
window_idx = [
np.maximum(outlier_idx - window, 0),
np.minimum(outlier_idx + window, n_samples)
]
stdev = np.array([X_outlier[window_idx[0][i]:window_idx[1][i], s].std() for i in range(len(outlier_idx))])
rnd = np.random.normal(size=n_outlier)
X_outlier[outlier_idx, s] += np.sign(rnd) * np.maximum(np.abs(rnd * n_std), min_std) * stdev
is_outlier[outlier_idx] = 1
if n_dim == 1:
X_outlier = X_outlier.reshape(n_samples, )
return Bunch(data=X_outlier, target=is_outlier, target_names=['normal', 'outlier'])
[docs]
def inject_outlier_tabular(X: np.ndarray,
cols: List[int],
perc_outlier: int,
y: np.ndarray = None,
n_std: float = 2.,
min_std: float = 1.
) -> Bunch:
"""
Inject outliers in numerical tabular data.
Parameters
----------
X
Tabular data to perturb (inject outliers).
cols
Columns of X that are numerical and can be perturbed.
perc_outlier
Percentage of observations which are perturbed to outliers. For multiple numerical features,
the percentage is evenly split across the features.
y
Outlier labels.
n_std
Number of feature-wise standard deviations used to perturb the original data.
min_std
Minimum number of standard deviations away from the current observation. This is included because
of the stochastic nature of the perturbation which could lead to minimal perturbations without a floor.
Returns
-------
Bunch object with the perturbed tabular data and the outlier labels.
"""
n_dim = len(X.shape)
if n_dim == 1:
X = X.reshape(-1, 1)
n_samples, n_features = X.shape
X_outlier = X.astype(np.float32).copy()
if y is None:
is_outlier = np.zeros(n_samples)
else:
is_outlier = y
n_cols = len(cols)
# distribute outliers evenly over different columns
n_outlier = int(n_samples * perc_outlier * .01 / n_cols)
if n_outlier == 0:
return Bunch(data=X_outlier, target=is_outlier, target_names=['normal', 'outlier'])
# add perturbations
stdev = X_outlier.std(axis=0)
for col in cols:
outlier_idx = np.sort(random.sample(range(n_samples), n_outlier))
rnd = np.random.normal(size=n_outlier)
X_outlier[outlier_idx, col] += np.sign(rnd) * np.maximum(np.abs(rnd * n_std), min_std) * stdev[col]
is_outlier[outlier_idx] = 1
if n_dim == 1:
X_outlier = X_outlier.reshape(n_samples, )
return Bunch(data=X_outlier, target=is_outlier, target_names=['normal', 'outlier'])
[docs]
def inject_outlier_categorical(X: np.ndarray,
cols: List[int],
perc_outlier: int,
y: np.ndarray = None,
cat_perturb: dict = None,
X_fit: np.ndarray = None,
disc_perc: list = [25, 50, 75],
smooth: float = 1.
) -> Bunch:
"""
Inject outliers in categorical variables of tabular data.
Parameters
----------
X
Tabular data with categorical variables to perturb (inject outliers).
cols
Columns of X that are categorical and can be perturbed.
perc_outlier
Percentage of observations which are perturbed to outliers. For multiple numerical features,
the percentage is evenly split across the features.
y
Outlier labels.
cat_perturb
Dictionary mapping each category in the categorical variables to their furthest neighbour.
X_fit
Optional data used to infer pairwise distances from.
disc_perc
List with percentiles used in binning of numerical features used for the 'abdm' pairwise distance measure.
smooth
Smoothing exponent between 0 and 1 for the distances.
Lower values will smooth the difference in distance metric between different features.
Returns
-------
Bunch object with the perturbed tabular data, outlier labels and \
a dictionary used to map categories to their furthest neighbour.
"""
if cat_perturb is None:
# transform the categorical variables into numerical ones via
# pairwise distances computed with abdm and multidim scaling
X_fit = X.copy() if X_fit is None else X_fit
# find number of categories for each categorical variable
cat_vars = {k: None for k in cols}
for k in cols:
cat_vars[k] = len(np.unique(X_fit[:, k])) # type: ignore
# TODO: extend method for OHE
ohe = False
if ohe:
X_ord, cat_vars_ord = ohe2ord(X, cat_vars)
else:
X_ord, cat_vars_ord = X, cat_vars
# bin numerical features to compute the pairwise distance matrices
n_ord = X_ord.shape[1]
if len(cols) != n_ord:
fnames = [str(_) for _ in range(n_ord)]
disc = Discretizer(X_ord, cols, fnames, percentiles=disc_perc)
X_bin = disc.discretize(X_ord)
cat_vars_bin = {k: len(disc.names[k]) for k in range(n_ord) if k not in cols}
else:
X_bin = X_ord
cat_vars_bin = {}
# pairwise distances for categorical variables
d_pair = abdm(X_bin, cat_vars_ord, cat_vars_bin)
# multidim scaling
feature_range = (np.ones((1, n_ord)) * -1e10, np.ones((1, n_ord)) * 1e10)
d_abs = multidim_scaling(d_pair,
n_components=2,
use_metric=True,
standardize_cat_vars=True,
smooth=smooth,
feature_range=feature_range,
update_feature_range=False)[0]
# find furthest category away for each category in the categorical variables
cat_perturb = {k: np.zeros(len(v)) for k, v in d_abs.items()}
for k, v in d_abs.items():
for i in range(len(v)):
cat_perturb[k][i] = np.argmax(np.abs(v[i] - v))
else:
d_abs = None
n_dim = len(X.shape)
if n_dim == 1:
X = X.reshape(-1, 1)
n_samples, n_features = X.shape
X_outlier = X.astype(np.float32).copy()
if y is None:
is_outlier = np.zeros(n_samples)
else:
is_outlier = y
n_cols = len(cols)
# distribute outliers evenly over different columns
n_outlier = int(n_samples * perc_outlier * .01 / n_cols)
for col in cols:
outlier_idx = np.sort(random.sample(range(n_samples), n_outlier))
col_cat = X_outlier[outlier_idx, col].astype(int)
col_map = np.tile(cat_perturb[col], (n_outlier, 1))
X_outlier[outlier_idx, col] = np.diag(col_map.T[col_cat])
is_outlier[outlier_idx] = 1
if n_dim == 1:
X_outlier = X_outlier.reshape(n_samples, )
return Bunch(data=X_outlier,
target=is_outlier,
cat_perturb=cat_perturb,
d_abs=d_abs,
target_names=['normal', 'outlier'])
# Note: the perturbation functions below are adopted from
# https://github.com/hendrycks/robustness/blob/master/ImageNet-C/imagenet_c/imagenet_c/corruptions.py
# and used in Dan Hendrycks and Thomas Dietterich, "Benchmarking Neural Network Robustness to Common
# Corruptions and Perturbations" (ICLR 2019).
# TODO: add proper batch support
[docs]
def scale_minmax(x: np.ndarray, xrange: tuple = None) -> Tuple[np.ndarray, bool]:
"""
Minmax scaling to [0,1].
Parameters
----------
x
Numpy array to be scaled.
xrange
Tuple with min and max data range.
Returns
-------
Scaled array and boolean whether the array is actually scaled.
"""
scale_back = False
if isinstance(xrange, tuple):
scale_back = True
x = (x - xrange[0]) / (xrange[1] - xrange[0])
return x, scale_back
# Noise
[docs]
def gaussian_noise(x: np.ndarray, stdev: float, xrange: tuple = None) -> np.ndarray:
"""
Inject Gaussian noise.
Parameters
----------
x
Instance to be perturbed.
stdev
Standard deviation of noise.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
x_gn = x + np.random.normal(size=x.shape, scale=stdev)
if scale_back:
x_gn = x_gn * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_gn, xrange[0], xrange[1])
else:
return x_gn
[docs]
def shot_noise(x: np.ndarray, lam: float, xrange: tuple = None) -> np.ndarray:
"""
Inject Poisson noise.
Parameters
----------
x
Instance to be perturbed.
lam
Scalar for the lambda parameter determining the expectation of the interval.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
x_sn = np.random.poisson(x * lam) / float(lam)
if scale_back:
x_sn = x_sn * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_sn, xrange[0], xrange[1])
else:
return x_sn
[docs]
def speckle_noise(x: np.ndarray, stdev: float, xrange: tuple = None) -> np.ndarray:
"""
Inject speckle noise.
Parameters
----------
x
Instance to be perturbed.
stdev
Standard deviation of noise.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
x_sp = x * (1 + np.random.normal(size=x.shape, scale=stdev))
if scale_back:
x_sp = x_sp * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_sp, xrange[0], xrange[1])
else:
return x_sp
[docs]
def impulse_noise(x: np.ndarray, amount: float, xrange: tuple = None) -> np.ndarray:
"""
Inject salt & pepper noise.
Parameters
----------
x
Instance to be perturbed.
amount
Proportion of pixels to replace with noise.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
if isinstance(xrange, tuple):
xmin, xmax = xrange[0], xrange[1]
else:
xmin, xmax = x.min(), x.max()
x_sc = (x - xmin) / (xmax - xmin) # scale to [0,1]
x_in = sk.util.random_noise(x_sc, mode='s&p', amount=amount) # inject noise
x_in = x_in * (xmax - xmin) + xmin # scale back
if isinstance(xrange, tuple):
return np.clip(x_in, xrange[0], xrange[1])
else:
return x_in
# Blur
[docs]
def gaussian_blur(x: np.ndarray, sigma: float, channel_axis: int = -1, xrange: tuple = None) -> np.ndarray:
"""
Apply Gaussian blur.
Parameters
----------
x
Instance to be perturbed.
sigma
Standard deviation determining the strength of the blur.
channel_axis
Denotes the axis of the colour channel. If `None` the image is assumed to be grayscale.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
x_gb = gaussian(x, sigma=sigma, channel_axis=channel_axis)
if scale_back:
x_gb = x_gb * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_gb, xrange[0], xrange[1])
else:
return x_gb
[docs]
def clipped_zoom(x: np.ndarray, zoom_factor: float) -> np.ndarray:
"""
Helper function for zoom blur.
Parameters
----------
x
Instance to be perturbed.
zoom_factor
Zoom strength.
Returns
-------
Cropped and zoomed instance.
"""
h = x.shape[0]
ch = int(np.ceil(h / float(zoom_factor))) # ceil crop height(= crop width)
top = (h - ch) // 2
x = zoom(x[top:top + ch, top:top + ch], (zoom_factor, zoom_factor, 1), order=1)
trim_top = (x.shape[0] - h) // 2 # trim off any extra pixels
return x[trim_top:trim_top + h, trim_top:trim_top + h]
[docs]
def zoom_blur(x: np.ndarray, max_zoom: float, step_zoom: float, xrange: tuple = None) -> np.ndarray:
"""
Apply zoom blur.
Parameters
----------
x
Instance to be perturbed.
max_zoom
Max zoom strength.
step_zoom
Step size to go from 1 to `max_zoom` strength.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
zoom_factors = np.arange(1, max_zoom, step_zoom)
out = np.zeros_like(x)
for zoom_factor in zoom_factors:
out += clipped_zoom(x, zoom_factor)
x_z = (x + out) / (len(zoom_factors) + 1)
if scale_back:
x_z = x_z * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_z, xrange[0], xrange[1])
else:
return x_z
[docs]
def glass_blur(x: np.ndarray, sigma: float, max_delta: int, iterations: int, xrange: tuple = None) -> np.ndarray:
"""
Apply glass blur.
Parameters
----------
x
Instance to be perturbed.
sigma
Standard deviation determining the strength of the Gaussian perturbation.
max_delta
Maximum pixel range for the blurring.
iterations
Number of blurring iterations.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
nrows, ncols = x.shape[:2]
if not isinstance(xrange, tuple):
xrange = (x.min(), x.max())
if xrange[0] != 0 or xrange[1] != 255:
x = (x - xrange[0]) / (xrange[1] - xrange[0]) * 255
x = gaussian(x, sigma=sigma, channel_axis=-1).astype(np.uint8) # assume [h, w, c] image layout
for i in range(iterations):
for h in range(nrows - max_delta, max_delta, -1):
for w in range(ncols - max_delta, max_delta, -1):
dx, dy = np.random.randint(-max_delta, max_delta, size=(2,))
h_prime, w_prime = h + dy, w + dx
x[h, w], x[h_prime, w_prime] = x[h_prime, w_prime], x[h, w]
x_gb = gaussian(x / 255, sigma=sigma, channel_axis=-1)
x_gb = x_gb * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_gb, xrange[0], xrange[1])
else:
return x_gb
[docs]
def disk(radius: float, alias_blur: float = 0.1, dtype=np.float32) -> np.ndarray:
"""
Helper function for defocus blur.
Parameters
----------
radius
Radius for the Gaussian kernel.
alias_blur
Standard deviation for the Gaussian kernel in both X and Y directions.
dtype
Data type.
Returns
-------
Kernel used for Gaussian blurring.
"""
if radius <= 8.:
L = np.arange(-8., 8. + 1)
ksize = (3, 3)
else:
L = np.arange(-radius, radius + 1)
ksize = (5, 5)
X, Y = np.meshgrid(L, L)
aliased_disk = np.array((X ** 2 + Y ** 2) <= radius ** 2, dtype=dtype)
aliased_disk /= np.sum(aliased_disk)
# supersample disk to antialias
return cv2.GaussianBlur(aliased_disk, ksize=ksize, sigmaX=alias_blur)
[docs]
def defocus_blur(x: np.ndarray, radius: int, alias_blur: float, xrange: tuple = None) -> np.ndarray:
"""
Apply defocus blur.
Parameters
----------
x
Instance to be perturbed.
radius
Radius for the Gaussian kernel.
alias_blur
Standard deviation for the Gaussian kernel in both X and Y directions.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
kernel = disk(radius=radius, alias_blur=alias_blur)
channels = []
for d in range(3):
channels.append(cv2.filter2D(x[:, :, d], -1, kernel))
x_db = np.array(channels).transpose((1, 2, 0))
if scale_back:
x_db = x_db * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_db, xrange[0], xrange[1])
else:
return x_db
[docs]
def plasma_fractal(mapsize: int = 256, wibbledecay: float = 3.) -> np.ndarray:
"""
Helper function to apply fog to instance.
Generates a heightmap using diamond-square algorithm.
Returns a square 2d array, side length 'mapsize', of floats in range 0-255.
'mapsize' must be a power of two.
"""
assert (mapsize & (mapsize - 1) == 0)
maparray = np.empty((mapsize, mapsize), dtype=np.float_)
maparray[0, 0] = 0
stepsize = mapsize
wibble = 100.
def wibbledmean(array):
return array / 4 + wibble * np.random.uniform(-wibble, wibble, array.shape)
def fillsquares():
"""For each square of points stepsize apart,
calculate middle value as mean of points + wibble"""
cornerref = maparray[0:mapsize:stepsize, 0:mapsize:stepsize]
squareaccum = cornerref + np.roll(cornerref, shift=-1, axis=0)
squareaccum += np.roll(squareaccum, shift=-1, axis=1)
maparray[stepsize // 2:mapsize:stepsize, stepsize // 2:mapsize:stepsize] = wibbledmean(squareaccum)
def filldiamonds():
"""For each diamond of points stepsize apart,
calculate middle value as mean of points + wibble"""
mapsize = maparray.shape[0]
drgrid = maparray[stepsize // 2:mapsize:stepsize, stepsize // 2:mapsize:stepsize]
ulgrid = maparray[0:mapsize:stepsize, 0:mapsize:stepsize]
ldrsum = drgrid + np.roll(drgrid, 1, axis=0)
lulsum = ulgrid + np.roll(ulgrid, -1, axis=1)
ltsum = ldrsum + lulsum
maparray[0:mapsize:stepsize, stepsize // 2:mapsize:stepsize] = wibbledmean(ltsum)
tdrsum = drgrid + np.roll(drgrid, 1, axis=1)
tulsum = ulgrid + np.roll(ulgrid, -1, axis=0)
ttsum = tdrsum + tulsum
maparray[stepsize // 2:mapsize:stepsize, 0:mapsize:stepsize] = wibbledmean(ttsum)
while stepsize >= 2:
fillsquares()
filldiamonds()
stepsize //= 2
wibble /= wibbledecay
maparray -= maparray.min()
return maparray / maparray.max()
[docs]
def fog(x: np.ndarray, fractal_mult: float, wibbledecay: float, xrange: tuple = None) -> np.ndarray:
"""
Apply fog to instance.
Parameters
----------
x
Instance to be perturbed.
fractal_mult
Strength applied to `plasma_fractal` output.
wibbledecay
Decay factor for size of noise that is applied.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
max_val = x.max()
nrows, ncols = x.shape[:2]
x_fo = x + fractal_mult * plasma_fractal(wibbledecay=wibbledecay)[:nrows, :ncols][..., np.newaxis]
x_fo = x_fo * max_val / (max_val + fractal_mult)
if scale_back:
x_fo = x_fo * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_fo, xrange[0], xrange[1])
else:
return x_fo
# Digital
[docs]
def contrast(x: np.ndarray, strength: float, xrange: tuple = None) -> np.ndarray:
"""
Change contrast of image.
Parameters
----------
x
Instance to be perturbed.
strength
Strength of contrast change. Lower is actually more contrast.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
means = np.mean(x, axis=(0, 1), keepdims=True)
x_co = (x - means) * strength + means
if scale_back:
x_co = x_co * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_co, xrange[0], xrange[1])
else:
return x_co
[docs]
def brightness(x: np.ndarray, strength: float, xrange: tuple = None) -> np.ndarray:
"""
Change brightness of image.
Parameters
----------
x
Instance to be perturbed.
strength
Strength of brightness change.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
x = sk.color.rgb2hsv(x)
x[:, :, 2] = np.clip(x[:, :, 2] + strength, xrange[0], xrange[1])
x_br = sk.color.hsv2rgb(x)
if scale_back:
x_br = x_br * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_br, xrange[0], xrange[1])
else:
return x_br
[docs]
def saturate(x: np.ndarray, strength: tuple, xrange: tuple = None) -> np.ndarray:
"""
Change colour saturation of image.
Parameters
----------
x
Instance to be perturbed.
strength
Strength of saturation change. Tuple consists of (multiplier, shift) of the perturbation.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
x, scale_back = scale_minmax(x, xrange)
x = sk.color.rgb2hsv(x)
x[:, :, 1] = x[:, :, 1] * strength[0] + strength[1]
if isinstance(xrange, tuple):
x[:, :, 1] = np.clip(x[:, :, 1], xrange[0], xrange[1])
x_sa = sk.color.hsv2rgb(x)
if scale_back:
x_sa = x_sa * (xrange[1] - xrange[0]) + xrange[0]
if isinstance(xrange, tuple):
return np.clip(x_sa, xrange[0], xrange[1])
else:
return x_sa
[docs]
def pixelate(x: np.ndarray, strength: float, xrange: tuple = None) -> np.ndarray:
"""
Change coarseness of pixels for an image.
Parameters
----------
x
Instance to be perturbed.
strength
Strength of pixelation (<1). Lower is actually more pixelated.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
rows, cols = x.shape[:2]
if not isinstance(xrange, tuple):
xrange = (x.min(), x.max())
if xrange[0] != 0 or xrange[1] != 255:
x = (x - xrange[0]) / (xrange[1] - xrange[0]) * 255
im = Image.fromarray(x.astype('uint8'), mode='RGB')
im = im.resize((int(rows * strength), int(cols * strength)), Image.BOX)
im = im.resize((rows, cols), Image.BOX)
x_pi = np.array(im, dtype=np.float32) / 255
x_pi = x_pi * (xrange[1] - xrange[0]) + xrange[0]
return x_pi
[docs]
def jpeg_compression(x: np.ndarray, strength: float, xrange: tuple = None) -> np.ndarray:
"""
Simulate changes due to JPEG compression for an image.
Parameters
----------
x
Instance to be perturbed.
strength
Strength of compression (>1). Lower is actually more compressed.
xrange
Tuple with min and max data range.
Returns
-------
Perturbed instance.
"""
if not isinstance(xrange, tuple):
xrange = (x.min(), x.max())
if xrange[0] != 0 or xrange[1] != 255:
x = (x - xrange[0]) / (xrange[1] - xrange[0]) * 255
x = Image.fromarray(x.astype('uint8'), mode='RGB')
output = BytesIO()
x.save(output, 'JPEG', quality=strength) # type: ignore[attr-defined] # TODO: allow redefinition
x = Image.open(output)
x_jpeg = np.array(x, dtype=np.float32) / 255
x_jpeg = x_jpeg * (xrange[1] - xrange[0]) + xrange[0]
return x_jpeg